最新动态 > 详情

Hyperf rabbitmq 削峰平谷限流、死信队列配置方案

发布时间:2023-02-21 14:07:35

使用消息队列限流的主要配置在于rabbitmq队列长度的限制。

hyperf 的注解和方法没有提供直接配置队列参数的配置项,所以当我们写好限流消费者代码

和死信消费者代码,这两个消费者代码就是普通的hyperf消费者,我们将自动消费设置为false,以便测试。

 

 

 

public function isEnable(): bool{    return false; }

 

限流消费者代码正常写:

 

 

 

<?phpdeclare(strict_types=1);namespace App\Amqp\Consumer;use Hyperf\Amqp\Result;use Hyperf\Amqp\Annotation\Consumer;use Hyperf\Amqp\Message\ConsumerMessage;use PhpAmqpLib\Message\AMQPMessage;/** * 邮件服务限流150, 短信不限流 * @Consumer(exchange="test", routingKey="sms", queue="email", name="EmailConsumer", nums=1) */#[Consumer(exchange: 'hyperf', routingKey: 'hyperf', queue: 'hyperf', name: "EmailConsumer", nums: 1)]class EmailConsumer extends ConsumerMessage{    public function isEnable() : bool    {        //默认情况下,使用了 @Consumer 注解后,框架会自动创建子进程启动消费者,并且会在子进程异常退出后,重新拉起。 如果出于开发阶段,进行消费者调试时,可能会因为消费其他消息而导致调试不便.注解中配置 enable=false (默认为 true 跟随服务启动)或者在对应的消费者中重写类方法 isEnable() 返回 false 即可        return false;    }    public function consumeMessage($data, AMQPMessage $message): string    {        echo 'EmailConsumer:ACK'.PHP_EOL.microtime(true);//        var_dump($data);        return Result::ACK;    }}

 

<?phpdeclare(strict_types=1);namespace App\Amqp\Consumer;use Hyperf\Amqp\Result;use Hyperf\Amqp\Annotation\Consumer;use Hyperf\Amqp\Message\ConsumerMessage;use PhpAmqpLib\Message\AMQPMessage;/** * 死信队列 * @Consumer(exchange="dead.test", routingKey="*", queue="dead.sms.test", name="deadSmsConsumer", nums=1) */#[Consumer(exchange: 'hyperf', routingKey: 'hyperf', queue: 'hyperf', name: "deadSmsConsumer", nums: 1)]class deadSmsConsumer extends ConsumerMessage{    public function isEnable(): bool    {        return false; // TODO: Change the autogenerated stub    }    public function consumeMessage($data, AMQPMessage $message): string    {        echo "deadSmsConsumer 获取死信!";        return Result::ACK;    }}

 

 

然后执行下面手动操作:

 

1.手动创建一个消息队列并配置响应的参数

注意配置x-ha-policy=all项,否则启动hyperf会出现以下报错:

“PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'email' in vhost '/': received none but current is the value 'dead.test' of type 'longstr'(406)”

 

2.绑定队列到对应的Exchange

 

3.启动hyperf,生产者生产消息

此时消息队列满了以后接受的信息会存入到死信队列

 

上一篇: docker redis集群配置3主3从hash槽分配

下一篇:windows IDEA开发工具连接linux服务器Docker,适用于Phpstorm配置,IntelliJ可作为参考。