运行环境centos7.6 lnmp
1,安装redis扩展和安装topthink/think-queue(fastadmin自带)
2,新增配置文件application/extra/queue.php
3,创建推送方法(往redis推送队列信息)
4,创建执行队列方法(往redis取出信息并处理:\application\index\job\Hello.php)
5,supervisor多进程处理(含有有些注意事项)
1,不展开细说
2,配置内容如下:
<?php
return [
'connector' => 'Redis', // Redis 驱动
'expire' => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
'default' => 'default', // 默认的队列名称
'host' => '127.0.0.1', // redis 主机ip
'port' => 6379, // redis 端口
'password' => '', // redis 密码
'select' => 0, // 使用哪一个 db,默认为 db0
'timeout' => 0, // redis连接的超时时间
'persistent' => false, // 是否是长连接 // 'connector' => 'Database', // 数据库驱动
// 'expire' => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
// 'default' => 'default', // 默认的队列名称
// 'table' => 'jobs', // 存储消息的表名,不带前缀
// 'dsn' => [], // 'connector' => 'Topthink', // ThinkPHP内部的队列通知服务平台 ,本文不作介绍
// 'token' => '',
// 'project_id' => '',
// 'protocol' => 'https',
// 'host' => 'qns.topthink.com',
// 'port' => 443,
// 'api_version' => 1,
// 'max_retries' => 3,
// 'default' => 'default', // 'connector' => 'Sync', // Sync 驱动,该驱动的实际作用是取消消息队列,还原为同步执行
];
3.1,推送方法没有太多要求:主要是把信息推送到redis队列里面
本人使用fastadmin,实例:/application/admin/controller/Tbag.php(也可以放在api/command等其他地方)
<?php
namespace app\admin\controller;use app\index\job\Hello;
use think\Db;
use think\debug\Html;
use think\Request;
use think\Console;
use think\Queue;class Tbag extends \think\Controller
{
/*
* 测试队列action
* */
public function index(){
// 1.当前任务将由哪个类来负责处理。
//$jobHandlerClassName = 'application\index\job\Hello';//直接用 Hello::class 来替代即可
// 2.队列名称
$jobQueueName = "helloJobQueue";
// 3.业务逻辑
$id = mt_rand(1000,9999);
$jobData = [ 'something'=> 'xxoo', 'id' => $id ] ;
// 4.推送进队列逻辑
$isPushed = Queue::push( Hello::class , $jobData , $jobQueueName );//注释:Hello是取出redis队列数据后进行处理的类,在4会提到
if( $isPushed !== false ){
echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the Redis:".json_encode($jobData)."<br>";
}else{
echo 'Oops, something went wrong.';
}
}
}
3.2推送完了,进行验证(如果是宝塔安装的redis直接输入redis-cli登录)
①登录redis②查看队列信息(LRANGE queues:helloJobQueue 0 -1)
代码如下:
[root@VM-0-7-centos ~]# redis-cli
127.0.0.1:6379> LRANGE queues:helloJobQueue 0 -1
4创建执行队列方法(注:方法不能随便取,要和3.1对应起来!!!)
代码文件:/application/index/job/Hello.php 代码如下:
<?php
/**
* 文件路径: \application\index\job\Hello.php
* 这是一个消费者类,用于处理 helloJobQueue 队列中的任务
*/
namespace app\index\job;use think\Db;
use think\queue\Job;class Hello { /**
* fire方法是消息队列默认调用的方法
* @param Job $job 当前的任务对象
* @param array|mixed $data 发布任务时自定义的数据
*/
public function fire(Job $job,$data){
// 如有必要,可以根据业务需求和数据库中的最新数据,判断该任务是否仍有必要执行.
$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
if(!$isJobStillNeedToBeDone){
$job->delete();
return;
} $isJobDone = $this->doHelloJob($data); if ($isJobDone) {
//如果任务执行成功, 记得删除任务
$job->delete();
print("<info>Hello Job has been done and deleted"."</info>\n");
}else{
if ($job->attempts() > 3) {
//通过这个方法可以检查这个任务已经重试了几次了
print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");
$job->delete();
}
}
} /**
* 有些消息在到达消费者时,可能已经不再需要执行了
* @param array|mixed $data 发布任务时自定义的数据
* @return boolean 任务执行的结果
*/
private function checkDatabaseToSeeIfJobNeedToBeDone($data){
return true;
} /**
* 根据消息中的数据进行实际的业务处理
* @param array|mixed $data 发布任务时自定义的数据
* @return boolean 任务执行的结果
*/
private function doHelloJob($data) {
// 根据消息中的数据进行实际的业务处理...
if(isset($data['id']) && $data['id'] && isset($data['something'])){
Db::table('xxoo')->where('id',$data['id'])->update(['something'=>$data['something']]);
}
sleep(8);
return true;
}
}
5运用supervisor多进程处理
5.1安装supervisor
yum install -y supervisor
cd /etc/supervisord.d/
vi upxxoo.ini
下面是upxxoo.ini配置信息
[program:uplikes] ;程序名称,在 supervisorctl 中通过这个值来对程序进行一系列的操作
command=php /项目目录/think queue:listen --queue helloJobQueue
autostart=true ;在 supervisord 启动的时候也自动启动
autorestart=true ; 程序异常退出后自动重启
user=root ;用哪个用户启动
process_name=%(process_num)02d
numprocs=2;进程数跟CPU核心数有关系
redirect_stderr=true ;把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes=20MB ;stdout 日志文件大小,默认 50MB
stdout_logfile_backups=20 ;stdout 日志文件备份数
stderr_logfile=/www/wwwlogs/worker_err.log ; 错误日志文件
stdout_logfile=/www/wwwlogs/worker.log ;输出日志文件
5.2启动supervisor
supervisord -c /etc/supervisord.conf
5.3查看状态,停止运行
supervisorctl status
supervisorctl shutdown
发表评论 取消回复