运行环境centos7.6 lnmp
1,安装redis扩展和安装topthink/think-queue(fastadmin自带)
2,新增配置文件application/extra/queue.php
3,创建推送方法(往redis推送队列信息)
4,创建执行队列方法(往redis取出信息并处理:\application\index\job\Hello.php)
5,supervisor多进程处理(含有有些注意事项)
1,不展开细说

2,配置内容如下:

<?php
return [
    'connector'  => 'Redis',            // Redis 驱动
    'expire'     => 60,                // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    'default'    => 'default',        // 默认的队列名称
    'host'       => '127.0.0.1',        // redis 主机ip
    'port'       => 6379,            // redis 端口
    'password'   => '',                // redis 密码
    'select'     => 0,                // 使用哪一个 db,默认为 db0
    'timeout'    => 0,                // redis连接的超时时间
    'persistent' => false,            // 是否是长连接    //    'connector' => 'Database',   // 数据库驱动
    //    'expire'    => 60,           // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    //    'default'   => 'default',    // 默认的队列名称
    //    'table'     => 'jobs',       // 存储消息的表名,不带前缀
    //    'dsn'       => [],    //    'connector'   => 'Topthink',    // ThinkPHP内部的队列通知服务平台 ,本文不作介绍
    //    'token'       => '',
    //    'project_id'  => '',
    //    'protocol'    => 'https',
    //    'host'        => 'qns.topthink.com',
    //    'port'        => 443,
    //    'api_version' => 1,
    //    'max_retries' => 3,
    //    'default'     => 'default',    //    'connector'   => 'Sync',        // Sync 驱动,该驱动的实际作用是取消消息队列,还原为同步执行
];

3.1,推送方法没有太多要求:主要是把信息推送到redis队列里面
本人使用fastadmin,实例:/application/admin/controller/Tbag.php(也可以放在api/command等其他地方)

<?php
namespace app\admin\controller;use app\index\job\Hello;
use think\Db;
use think\debug\Html;
use think\Request;
use think\Console;
use think\Queue;class Tbag extends \think\Controller
{
    /*
     * 测试队列action 
     * */
    public function index(){
        // 1.当前任务将由哪个类来负责处理。
        //$jobHandlerClassName  = 'application\index\job\Hello';//直接用 Hello::class 来替代即可
        // 2.队列名称
        $jobQueueName        = "helloJobQueue";
        // 3.业务逻辑
        $id = mt_rand(1000,9999);
		$jobData = [ 'something'=> 'xxoo', 'id' => $id ] ;
		// 4.推送进队列逻辑
		$isPushed = Queue::push( Hello::class , $jobData , $jobQueueName );//注释:Hello是取出redis队列数据后进行处理的类,在4会提到
	    if( $isPushed !== false ){
	         echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the Redis:".json_encode($jobData)."<br>";
	    }else{
	         echo 'Oops, something went wrong.';
	    }
    }
}

3.2推送完了,进行验证(如果是宝塔安装的redis直接输入redis-cli登录)
①登录redis②查看队列信息(LRANGE queues:helloJobQueue 0 -1)
代码如下:

[root@VM-0-7-centos ~]# redis-cli
127.0.0.1:6379> LRANGE queues:helloJobQueue 0 -1

4创建执行队列方法(注:方法不能随便取,要和3.1对应起来!!!)
代码文件:/application/index/job/Hello.php 代码如下:

<?php
/**
 * 文件路径: \application\index\job\Hello.php
 * 这是一个消费者类,用于处理 helloJobQueue 队列中的任务
 */
namespace app\index\job;use think\Db;
use think\queue\Job;class Hello {    /**
     * fire方法是消息队列默认调用的方法
     * @param Job            $job      当前的任务对象
     * @param array|mixed    $data     发布任务时自定义的数据
     */
    public function fire(Job $job,$data){
        // 如有必要,可以根据业务需求和数据库中的最新数据,判断该任务是否仍有必要执行.
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if(!$isJobStillNeedToBeDone){
            $job->delete();
            return;
        }        $isJobDone = $this->doHelloJob($data);        if ($isJobDone) {
            //如果任务执行成功, 记得删除任务
            $job->delete();
            print("<info>Hello Job has been done and deleted"."</info>\n");
        }else{
            if ($job->attempts() > 3) {
                //通过这个方法可以检查这个任务已经重试了几次了
                print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");
                $job->delete();
            }
        }
    }    /**
     * 有些消息在到达消费者时,可能已经不再需要执行了
     * @param array|mixed    $data     发布任务时自定义的数据
     * @return boolean                 任务执行的结果
     */
    private function checkDatabaseToSeeIfJobNeedToBeDone($data){
        return true;
    }    /**
     * 根据消息中的数据进行实际的业务处理
     * @param array|mixed    $data     发布任务时自定义的数据
     * @return boolean                 任务执行的结果
     */
    private function doHelloJob($data) {
        // 根据消息中的数据进行实际的业务处理...
        if(isset($data['id']) && $data['id'] && isset($data['something'])){
            Db::table('xxoo')->where('id',$data['id'])->update(['something'=>$data['something']]);
        }
        sleep(8);
        return true;
    }
}

5运用supervisor多进程处理
5.1安装supervisor

yum install -y supervisor
cd /etc/supervisord.d/
vi upxxoo.ini

下面是upxxoo.ini配置信息

[program:uplikes] ;程序名称,在 supervisorctl 中通过这个值来对程序进行一系列的操作
command=php /项目目录/think queue:listen --queue helloJobQueue
autostart=true ;在 supervisord 启动的时候也自动启动
autorestart=true ; 程序异常退出后自动重启
user=root ;用哪个用户启动
process_name=%(process_num)02d
numprocs=2;进程数跟CPU核心数有关系
redirect_stderr=true ;把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes=20MB ;stdout 日志文件大小,默认 50MB
stdout_logfile_backups=20 ;stdout 日志文件备份数
stderr_logfile=/www/wwwlogs/worker_err.log ; 错误日志文件
stdout_logfile=/www/wwwlogs/worker.log  ;输出日志文件

5.2启动supervisor

supervisord -c /etc/supervisord.conf

5.3查看状态,停止运行

supervisorctl status
supervisorctl shutdown

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信小程序

微信扫一扫体验

立即
投稿

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部