最新动态 > 详情

RabbitMq发布订阅demo

发布时间:2021-01-22 17:54:18

在thinkcmf系统下做了一个插件,完成几个功能,这里主要说非thinkcmf插件形式的发布和订阅,至于插件,优化后将上传thinkcmf插件市场。

一、接口的形式发布消息

二、接口的形式读取一条队列消息

三、封装一个Rabbitmq抽象类,方便消费者直接写消费端逻辑

封装一个RabbitMq类RabbitMq.php

 

 

 


 
<?php
/**
* 自定义RabbitMq抽象类
*/
abstract class RabbitMq{
public $config = array(
'host'=>'192.168.3.216', //host
'port'=> 5672, //端口
'login'=>'admin', //账号
'password'=>'admin', //密码
'vhost'=>'/' //虚拟主机
);
public $conn;
public $channel;
public $exchange;
public $queue;
public $exchangeName = ''; //交换机
public $queueName = ''; //队列名
public $routeKey = ''; //路由键
public $exchangeType = ''; //交换机类型
public function __construct($config=array(),$exchangeName,$routeKey,$queueName,$exchangeType)
{
$this->exchangeName = $exchangeName;
$this->queueName = $queueName;
$this->routeKey = $routeKey;
$this->exchangeType = $exchangeType;
if(empty($config)){
$this->connection($this->config);
}else{
if(!is_array($config)){
throw new Exception('config非数组');
}
foreach($config as $key => $val){
$this->config[$key] = $val;
}
$this->connection($this->config);
}
}
/**
* 创建连接与信道
*/
public function connection()
{
$this->conn = new \AMQPConnection($this->config);
if (!$this->conn->connect()) {
die("Cannot connect to the broker!\n");
}
$this->channel = new \AMQPChannel($this->conn);
//创建通道成功后创建交换机
$this->setExchange();
}
/**
* 创建交换机
*/
public function setExchange()
{
$this->exchange = new \AMQPExchange($this->channel);
$this->exchange->setName($this->exchangeName);
$this->exchange->setType($this->exchangeType);
$this->exchange->setFlags(AMQP_DURABLE);//持久化
}
/**
* 创建队列并绑定到交换机路由
*/
public function setQueue()
{
$this->queue = new \AMQPQueue($this->channel);
$this->queue->setName($this->queueName);
$this->queue->setFlags(AMQP_DURABLE);//持久化
$this->queue->bind($this->exchangeName,$this->routeKey);
}
/**
* 发布
*/
public function publish($msg)
{
$this->exchange->publish($msg,$this->routeKey, AMQP_NOPARAM, array('delivery_mode' => 2));
}
/**
  * 消费回调函数
  * 处理消息
  * *需要注意的地方是:queue对象有两个方法可用于取消息: consume 和 get 。
  *前者是阻塞的,无消息时会被挂起,适合循环中使用;后者则是非阻塞的,
  *取消息时有则取,无则返回false。
*/
public function processMessage($envelope, $queue) {
$msg = $envelope->getBody();
echo $msg."\n"; //处理消息
$queue->ack($envelope->getDeliveryTag());//手动应答
return $msg;
}
/**
* 订阅消费
*/
public function subscribe()
{
$this->setQueue();
$msg = $this->queue->get();
if(!empty($msg)){
// echo $msg;
$this->queue->ack($msg->getDeliveryTag());//手动应答
return $msg->getBody();
}
}
}

 

 

编辑一个消费者,订阅队列中的消息

 

 

 


 
<?php
/**
* 订阅消息模板
*/
require_once('../RabbitMq.php');
class Consumer extends RabbitMq{
public function __construct()
{
$config = array(
'host'=>'192.168.3.216', //host
'port'=> 5672, //端口
'login'=>'admin', //账号
'password'=>'admin', //密码
'vhost'=>'/' //虚拟主机
);
$exchangeName = 'e_linvo1';
$routerKey = 'key_1'; //路由键
$queueName = 'q_ttt';
parent::__construct($config,$exchangeName,$routerKey,$queueName,AMQP_EX_TYPE_DIRECT);
}
}
$consumer = new Consumer();
//监听信道,订阅信息
while(true){
$msg = $consumer->subscribe();
//TODO
if(!empty($msg)){
echo $msg."\n";
}
}

 

 

我们来测试一下

1.查看连接:我们打开rabbitmq后台

此时没有连接

我们执行一下创建的消费者

再次查看rabbitmq后台Connections

此时消费者端已经监听了一个队列,信息如下:

$exchangeName = 'e_linvo1';//交换机
$routerKey = 'key_1'; //路由键
$queueName = 'q_ttt';

 

2.发布和消费消息

我们使用插件接口发布一条信息“helloRabbit”:

查看终端输出:

 

3.查看队列消息

RabbitMq后台新增一个队列,但是由于消息被消费了,所以q_ttt消息为0

 

我们终止终端订阅者,再次使用接口发布一条消息 “helloRabbit2"

因为没有消费掉,所以后台显示一条Message

再次终端机PHP命令执行消费者文件

 

上一篇: SpringBoot 快速入门搭建项目流程--单元测试和打包

下一篇:VSCode终端集成git-bash,实现cmd和bash切换