我欲乘风九万里
梦的起点
心灵之声
珊瑚礁
编码之舞
梦中的微风
项目编码
聚合登录文档
建议
AI助手
消息队列之rabbitmq
#### 前期准备工作 rabbitmq,supervisor 安装完毕 ### 框架 webman作为演示,laravel、thinkphp是一样的流程 安装对应的包 ```php composer require php-amqplib/php-amqplib ``` #### 在config里增加rabbitmq的配置内容 ```php <?php return [ 'host' => getenv('rabbitmq_host','1.116.128.127'), //ip 'port' => getenv('rabbitmq_port','5672'), //端口号 'user' => 'XXX', //账户 'password' => 'XXX', //密码 'vhost' => '/', // 虚拟主机 'queue' => env('rabbitmq_queue','rq.msg'), // 队列的标识 'exchange' => env('rabbitmq_exchange','amq.direct'), // 交换机 ]; ``` #### 在config里面增加message.php ```php <?php ##消息处理配置 ##消息routing_key=>class return [ 'rr.msg.test' => \app\common\message\ProductBxhSync::class, ]; ``` #### 消息队列链接代码封装 ```php <?php namespace lib\rabbitmq; use PhpAmqpLib\Connection\AMQPStreamConnection; class MqConnection extends AMQPStreamConnection { public static function __make(){ $conf = config('rabbitmq'); return new parent( $conf['host'], $conf['port'], $conf['user'], $conf['password'], $conf['vhost'], false, 'AMQPLAIN', null, 'en_US', 3.0, 3.0, null, false, 0, 3.0 ); } } ``` #### 生产者代码封装 ```php <?php namespace lib\rabbitmq; use PhpAmqpLib\Message\AMQPMessage; class MqProductor { /** * 生产消息 * @param $key * @param $msg * @param $rags * @return void */ public function publish($key,$msg,$rags = []){ $channel = MqConnection::__make()->channel(); $AMQPMessage = new AMQPMessage(is_string($msg) ? $msg : json_encode($msg)); //设置队列属性 if(!empty($rags)){ foreach ($rags as $k =>$rag) { $AMQPMessage->set($k,$rag); } } $channel->basic_publish($AMQPMessage,config('rabbitmq.exchange'),$key); } } ``` #### 生产者示例代码 ```php public function index() { try{ $mq_message = [ "shop_id" => 111, "no" => 222, "type" => 3333, ]; return (new mqProductor())->publish('rr.msg.test',$mq_message); }catch (\Exception $e){ Log::error('消息队列发送失败'); } } ``` #### 安装命令行 ```php composer require webman/console ``` #### 创建消费者统一入口 MessageCommand.php ```php php webman make:command message:command ``` ```php <?php namespace app\command; use lib\exception\RedoException; use lib\rabbitmq\MessageHandle; use lib\rabbitmq\MqConnection; use PhpAmqpLib\Message\AMQPMessage; use support\Log; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Output\OutputInterface; class MessageCommand extends Command { protected static $defaultName = 'message:command'; protected static $defaultDescription = 'message command'; /** * @return void */ protected function configure() { $this->addArgument('is_task', InputArgument::OPTIONAL, 'Name description'); } /** * @param InputInterface $input * @param OutputInterface $output * @return int */ protected function execute(InputInterface $input, OutputInterface $output): int { try{ # 获取队列 $queue = config('rabbitmq.queue'); $is_task = $input->getArgument('is_task'); if (!empty($is_task)){ if($is_task == 'true'){ $queue = $queue.'.task'; }else{ $queue = $queue.'.'.$is_task.'.task'; } } # 输出 配置信息 $output->writeln('['.date('Y-m-d H:i:s').'] rabbitmq.queue_message running...'."\n\t".'Listening on '.$queue); # 消息队列配置 $conf = config('message'); $channel = MqConnection::__make()->channel(); //信道 # prefetch_size=0 每个消费者允许未确认的消息的最大字节数 prefetch_count=1 每个消费者最多可以同时处理 4 条未确认的消息 $channel->basic_qos(null,4,null); #声明一个消费者 当消息到达队列时,RabbitMQ 将调用回调函数,并传递消息、消息属性和消息信封。回调函数负责处理消息。 $channel->basic_consume($queue,'', false, false, false, false, function (AMQPMessage $message)use ($output,$channel,$conf){ $output->writeln(json_encode($message->delivery_info)); if (isset($conf[$message->delivery_info['routing_key']])){ $headers=[]; if ($message->has('application_headers')){ $headers=$message->get('application_headers')->getNativeData(); } if (isset($headers['x-death'])&&count($headers['x-death'])>4){ $output->writeln('this message has been republish over 4 times ,system dropped this message'); $channel->basic_ack($message->getDeliveryTag()); return; } if (is_string($conf[$message->delivery_info['routing_key']])){ try{ /** * @var MessageHandle $messageHandle */ $output->writeln($conf[$message->delivery_info['routing_key']]); $messageHandle=new $conf[$message->delivery_info['routing_key']]; $messageHandle->handle($output,$message->delivery_info['routing_key'],$message->getBody()); }catch (RedoException $redoException){ #消息重发 $channel->basic_publish($message, 'ttl_ex', $message->delivery_info['routing_key']); } catch (\Exception $e) { $errMsg = '[' . date('Y-m-d H:i:s') . '] got a unexpected exception:' . $e->getMessage( ) . $e->getFile() . ':' . $e->getLine() . ':' . json_encode( $e->getTrace() ) . ',system has dropped this message(' . $message->delivery_info['routing_key'] . '). message content:' . "\n\t" . $message->getBody( ); $output->writeln($errMsg); } } }else{ $output->writeln('got a message but has no handle,system has dropped this message:'.$message->delivery_info['routing_key']); } $channel->basic_ack($message->getDeliveryTag()); }); while ($channel->is_consuming()){ $channel->wait(); } }catch (\Exception $exception){ #消息重发 Log::info('mqerror',$exception->getMessage()); $output->writeln($exception->getMessage()); } return self::SUCCESS; } } ``` 然后supervisor配置执行命令
ysian
我雾化科所
1
聚合登录之创建应用篇
2024-07-30
644
0
2
php函数之match
2024-07-29
582
0
3
聚合登录之常见问题
2024-07-30
489
0
4
聚合登录之前后端示例代码
2024-07-30
483
0
5
聚合登录之获取登录链接
2024-07-30
422
0
6
服务器瘦身
2023-11-01
413
5
7
centos下升级openssl
2023-11-30
412
0
8
我的第一篇博客
2023-09-28
402
0
9
supervisor的安装与使用
2023-11-02
383
0
10
editor.md 代码块没有下拉框
2023-10-06
352
2
PHP [9]
linux [4]
sh [1]
闲谈 [1]
css [2]
redis [2]
Cluster集群 [1]
git [1]
centos [1]
webman [2]
工具 [1]
OAuth2.0 [1]
项目 [1]
初始化 [1]
thinkphp [1]
Nginx [1]
往事 [1]
Mysql [1]
支付宝 [2]
抖音 [2]
qq [2]
聚合 [1]
登录 [1]
聚合登录 [4]
cache [1]
html [1]
queue [1]
Workerman版本
4.1.15
Webman版本
1.5.9
PHP版本
8.3.6
MYSQL版本
8.2.24
操作系统
CENTOS