php rabbitmq的开发体验(二)

一、前言

在上一篇rabbitmq开发体验,我们大致介绍和安装上了rabbitmq和php扩展和界面操作rabbitmq的方法,下面正是正式的用我们php来操作消息队列的生产和消费。附上参考的网站:

  • rabbitmq官网 https://www.rabbitmq.com/ 全英文看起来吃力 官方入门文档:https://www.rabbitmq.com/getstarted.html
  • 官方入门文档 https://www.cnblogs.com/grimm/p/5728736.html 此博客主对官方php案例的解释
  • RabbitMQ发布订阅实战-实现延时重试队列 代码项目:https://github.com/mylxsw/rabbitmq-pubsub-php
  • rabbitmq高级特性 B站视频 https://www.bilibili.com/video/BV1S5411H7ef?from=search&seid=8055368004001009131

二、开发经历

对于rabbitmq的php类库,我开发是使用PHP amqplib,composer解决依赖管理。

添加composer.json:

{
    "require": { "php-amqplib/php-amqplib": ">=2.6.1" } }
composer install # 或者 直接运行包引入 composer require php-amqplib/php-amqplib

我的php开发框架是yii1.1,代码如下

1.rabbitmq的连接底层类
<?php

include_once(ROOT_PATH . 'protected/extensions/rabbitmq/autoload.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * rabbitmq工具类
 */
class RabbitMq
{
    protected $connection;
    protected $channel;
    protected $exchange_name;
    protected $query_name;
    protected $route_key_name;

    /**
     * 构造器
     */
    public function __construct()
    {
        //读取文件会导致并发性高时连接失败故写在配置文件
        $config = Yii::app()->params['rabbitmq_config'];

        if (!$config)
            throw new \AMQPConnectionException('config error!');

        $this->connection = new AMQPStreamConnection($config['host'], $config['port'], $config['username'], $config['password'], $config['vhost']);
        if (!$this->connection) {
            throw \AMQPConnectionException("Cannot connect to the broker!\n");
        }
     //链接后的信道,生产方和消费方都需要,消息的通道
$this->channel = $this->connection->channel(); } /** * close link */ public function close() { $this->channel->close(); $this->connection->close(); } /** * RabbitMQ destructor */ public function __destruct() { $this->close(); } }

2.消息的封装类

<?php

include_once(ROOT_PATH . 'protected/extensions/rabbitmq/autoload.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class SubMessage
{
    private $message;
    private $routingKey;
    private $params;

    /**
     * SubMessage constructor.
     *
     * @param AMQPMessage $message
     * @param string      $routingKey
     * @param array       $params
     */
    public function __construct(AMQPMessage $message, $routingKey, $params = [])
    {
        $this->params = $params; //额外的参数这里主要存储重试的次数
        $this->message = $message;
        $this->routingKey = $routingKey;
    }

    /**
     * Get AMQP Message
     *
     * @return AMQPMessage
     */
    public function getAMQPMessage()
    {
        return $this->message;
    }

    /**
     * Get original Message
     *
     * @return Message
     */
    public function getMessage()
    {
        return $this->message->body;
    }

    /**
     * Get meta params
     *
     * @return array
     */
    public function getParams()
    {
        return is_array($this->params) ? $this->params : [];
    }

    /**
     * Get meta param
     *
     * @param string $key
     *
     * @return mixed|null
     */
    public function getParam(string $key)
    {
        return isset($this->params[$key]) ? $this->params[$key] : null;
    }

    /**
     * Get routing key
     *
     * @return string
     */
    public function getRoutingKey()
    {
        return $this->routingKey;
    }


}
3.消息的核心类
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
/**
 * vm独立站推送
 */
class VmMq extends RabbitMq
{
    protected $exchange_name = 'master'; //主Exchange,发布消息时发布到该Exchange
    protected $exchange_retry_name = 'master.retry'; //重试Exchange,消息处理失败时(3次以内),将消息重新投递给该Exchange
    protected $exchange_failed_name = 'master.failed'; //失败Exchange,超过三次重试失败后,消息投递到该Exchange
    protected $query_name = 'query_vm'; //消费服务需要declare三个队列[queue_name] 队列名称,格式符合 [服务名称]@订阅服务标识
    protected $query_retry_name = 'query_vm@retry';
    protected $query_failed_name = 'query_vm@fail';
    protected $route_key_name = 'route_key_vm';
    /**
     * 构造器
     */
    public function __construct()
    {
        parent::__construct();

       
        $this->channel->basic_qos(null, 10, false);
        //声明topic类型交换器
        //指定交换机持久第四个参数设置为true:
        passive如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,仍然可以调用queue.declare,只不过需要将参数passive设为true,传给queue.declare,如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。
        $this->channel->exchange_declare($this->exchange_name, 'topic', false, true, false);
        $this->channel->exchange_declare($this->exchange_retry_name, 'topic', false, true, false);
        $this->channel->exchange_declare($this->exchange_failed_name, 'topic', false, true, false);
    }

    /**
     * 生产消息
     */
    public function product($data){

        $unique_messageId = $this->create_guid(); //生成消息的唯一标识,用来幂等性
        if(!is_array($data)){
            $data = array('msg' => $data);
        }
        $data['unique_messageId'] = $unique_messageId;
        $data = json_encode($data,JSON_UNESCAPED_UNICODE);

        //存入到表中,保证生产的消息100%到mq队列
        $newModel = DynamicAR::model('nt_vm_message_idempotent');
        $newModel->message_id = $unique_messageId;
        $newModel->message_content = $data;
        $newModel->product_status = 0;
        $newModel->consume_status = 0;
        $newModel->create_time = $newModel->update_time = time();
        $newModel->isNewRecord = true;
        if(!$newModel->save()){
            file_put_contents(ROOT_PATH.'runtime/vm_product_failed.log', '数据库保存失败' . $data .PHP_EOL, FILE_APPEND);
        }
        //推送成功的ack回调
        $this->channel->set_ack_handler(
            function(AMQPMessage $msg){
                $msgBody = json_decode($msg->getBody(),true);
                if(!isset($msgBody['unique_messageId']) || !$msgBody['unique_messageId']){
                    file_put_contents(ROOT_PATH.'runtime/vm_product_failed.log', '获取消费ID为空!' . $msg->getBody().PHP_EOL, FILE_APPEND);
                    return;
                }
                $unique_messageId = $msgBody['unique_messageId'];
                $criteria = new CDbCriteria;
                $criteria->addCondition("message_id = '".$unique_messageId."'");
                $messageIdempotent = DynamicAR::model('nt_vm_message_idempotent')->find($criteria);
                if (!$messageIdempotent) {
                    file_put_contents(ROOT_PATH.'runtime/vm_product_failed.log', '该消息数据库里不存在' . $msg->getBody().PHP_EOL, FILE_APPEND);
                    return;
                }else{
                    $connection = Yii::app()->db;
                    $command = $connection->createCommand("
                        UPDATE nt_vm_message_idempotent SET product_status=1 WHERE message_id = '$unique_messageId'
                    ");
                    $re = $command->execute();
                    if($re) {
//                        file_put_contents(ROOT_PATH.'runtime/vm_product_log.log', $messageIdempotent->message_id . $msg->getBody() .PHP_EOL, FILE_APPEND);
                        return;
                    }else{
                        file_put_contents(ROOT_PATH.'runtime/vm_product_failed.log', '数据库保存失败' . $msg->getBody() .PHP_EOL, FILE_APPEND);
                        return;
                    }
                }
            }
        );
        //推送失败的nack回调
        $this->channel->set_nack_handler(
            function(AMQPMessage $message){
                file_put_contents(ROOT_PATH.'runtime/vm_product_failed.log', "消息生产到mq nack ".$message->body.PHP_EOL, FILE_APPEND);
            }
        );
        //监听交换机或者路由键是否存在
        $returnListener = function (
            $replyCode,
            $replyText,
            $exchange,
            $routingKey,
            $message
        ) {
            file_put_contents(ROOT_PATH.'runtime/vm.log', 'replyCode ='.$replyCode.';replyText='.$replyText.';exchange='.$exchange.';routingKey='.$routingKey.';body='.$message->body.PHP_EOL, FILE_APPEND);
        };
        //开启发送消息的return机制
        $this->channel->set_return_listener($returnListener);
        //开启发送消息的ack回调
        $this->channel->confirm_select();

        //设置消息持久化。AMQPMessage的属性delivery_mode =2
        $msg = new AMQPMessage($data,array(
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ));
        $msg->set('application_headers', new AMQPTable([]));
        $this->channel->basic_publish($msg, $this->exchange_name,$this->query_name,true);
        $this->channel->wait_for_pending_acks();
        $this->close();
    }

    protected function create_guid($namespace = '') {
        static $guid = '';
        $uid = uniqid("", true);
        $data = $namespace;
        $data .= $_SERVER['REQUEST_TIME'];
        $data .= $_SERVER['HTTP_USER_AGENT'];
        $data .= $_SERVER['SERVER_ADDR'];
        $data .= $_SERVER['SERVER_PORT'];
        $data .= $_SERVER['REMOTE_ADDR'];
        $data .= $_SERVER['REMOTE_PORT'];
        $hash = strtoupper(hash('ripemd128', $uid . $guid . md5($data)));
        $guid = '{' .
            substr($hash, 0, 8) .
            '-' .
            substr($hash, 8, 4) .
            '-' .
            substr($hash, 12, 4) .
            '-' .
            substr($hash, 16, 4) .
            '-' .
            substr($hash, 20, 12) .
            '}';
        return $guid;
    }


    /**
     * 消费消息
     */
    public function consume(\Closure $callback,\Closure $shouldExitCallback = null){
        $this->declareRetryQueue();
        $this->declareConsumeQueue();
        $this->declareFailedQueue();

        $queueName = $this->query_name;
        $exchangeRetryName = $this->exchange_retry_name;
        $exchangeFailedName = $this->exchange_failed_name;
        // 发起延时重试
        $publishRetry = function ($msg) use ($queueName,$exchangeRetryName) {

            /** @var AMQPTable $headers */
            if ($msg->has('application_headers')) {
                $headers = $msg->get('application_headers');
            } else {
                $headers = new AMQPTable();
            }

            $headers->set('x-orig-routing-key', $this->getOrigRoutingKey($msg));

            $properties = $msg->get_properties();
            $properties['application_headers'] = $headers;
            $newMsg = new AMQPMessage($msg->getBody(), $properties);

            $this->channel->basic_publish(
                $newMsg,
                $exchangeRetryName,
                $queueName
            );
            //发送ack信息应答当前消息处理完成
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };

        // 将消息发送到失败队列
        $publishFailed = function ($msg) use ($queueName,$exchangeFailedName) {
            $this->channel->basic_publish(
                $msg,
                $exchangeFailedName,
                $queueName
            );
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };

        $this->channel->basic_consume(
            $this->query_name,
            '',     //customer_tag
            false,  //no_local
            false,  //no_ack
            false,   //exclusive 排他消费者,即这个队列只能由一个消费者消费.适用于任务不允许进行并发处理的情况下.比如系统对接
            false,  //nowait
            function(AMQPMessage $msg) use ($callback, $publishRetry, $publishFailed) {
                /*
                 * 需要注意的是:在消费消息之前,先获取消息ID,然后根据ID去数据库中查询是否存在主键为消息ID的记录,如果存在的话,
                 * 说明这条消息之前应该是已经被消费过了,那么就不处理这条消息;如果不存在消费记录的话,则消费者进行消费,消费完成发送确认消息,
                 * 并且将消息记录进行入库。
                 */
                $msgBody = json_decode($msg->getBody(),true);
                if(!isset($msgBody['unique_messageId']) || !$msgBody['unique_messageId']){
                    file_put_contents(ROOT_PATH.'runtime/vm_consume_failed.log', '获取消费ID为空!' . $msg->getBody().PHP_EOL, FILE_APPEND);
                    return;
                }
                $unique_messageId = $msgBody['unique_messageId'];
                $criteria = new CDbCriteria;
                $criteria->addCondition("message_id = '".$unique_messageId."' and consume_status = 0 ");
                $messageIdempotent = DynamicAR::model('nt_vm_message_idempotent')->find($criteria);
                //如果找不到,则进行消费此消息
                if ($messageIdempotent) {
                    $callback($msg, $publishRetry, $publishFailed);
                } else {
                    //如果根据消息ID(作为主键)查询出有已经消费过的消息,那么则不进行消费;
                    file_put_contents(ROOT_PATH.'runtime/vm_consume_failed.log', '该消息已消费,无须重复消费!' . $msg->getBody().PHP_EOL, FILE_APPEND);
                    return;
                }
            }
        );
        while (count($this->channel->callbacks)) {

            if ($shouldExitCallback()) {
                return;
            }

            try {
                $this->channel->wait();
            } catch (AMQPTimeoutException $e) {
            } catch (AMQPIOWaitException $e) {
            }
        }


        $this->close();
    }


    /**
     * 重试失败的消息
     * 注意: 该方法会堵塞执行
     * @param \Closure $callback 回调函数,可以为空,返回true则重新发布,false则丢弃
     */
    public function retryFailed($callback = null)
    {
        $this->declareConsumeQueue();
        $this->declareFailedQueue();

        $queueName = $this->query_name;
        $exchangeName = $this->exchange_name;
        $this->channel->basic_consume(
            $this->query_failed_name,
            '',     //customer_tag
            false,  //no_local
            false,  //no_ack
            true,   //exclusive
            false,  //nowait
            function ($msg) use ($queueName, $exchangeName, $callback) {
                if (is_null($callback) || $callback($msg)) {
                    // 重置header中的x-death属性
                    $msg->set('application_headers', new AMQPTable([
                        'x-orig-routing-key' => $this->getOrigRoutingKey($msg),
                    ]));
                    $this->channel->basic_publish(
                        $msg,
                        $exchangeName,
                        $queueName
                    );
                }

                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            }
        );
        while (count($this->channel->callbacks)) {
            try {
                $this->channel->wait();
            } catch (AMQPTimeoutException $e) {
                return;
            } catch (AMQPIOWaitException $e) {
            }
        }
    }

    /**
     * 获取重发的次数
     */
    protected function getOrigRoutingKey($msg){
        $retry = 0;
        if ($msg->has('application_headers')) {
            $headers = $msg->get('application_headers')->getNativeData();
            if (isset($headers['x-death'][0]['count'])) {
                $retry = $headers['x-death'][0]['count'];
            }
        }

        return (int)$retry;
    }

    /**
     * 声明重试队列
     */
    private function declareRetryQueue()
    {
        $this->channel->queue_declare($this->query_retry_name, false, true, false, false, false,new AMQPTable(array(
            'x-dead-letter-exchange' => $this->exchange_name,
            'x-dead-letter-routing-key' => $this->query_name,
            'x-message-ttl'          => 30 * 1000,
        )));
        $this->channel->queue_bind($this->query_retry_name, $this->exchange_retry_name, $this->query_name);
    }

    /**
     * 声明消费队列
     */
    private function declareConsumeQueue()
    {
        //声明队列
        $this->channel->queue_declare(
            $this->query_name, //队列名称
            false,  //passive
            true,   //durable
            false,   //exclusive
            false,  //auto_delete
            false   //nowait
        );
        //绑定交换机和队列
        $this->channel->queue_bind($this->query_name, $this->exchange_name, $this->route_key_name);
        $this->channel->queue_bind($this->query_name, $this->exchange_name, $this->query_name);
    }

    /**
     * 声明消费失败队列
     */
    private function declareFailedQueue()
    {
        $this->channel->queue_declare($this->query_failed_name, false, true, false, false, false);
        $this->channel->queue_bind($this->query_failed_name, $this->exchange_failed_name, $this->query_name);
    }
}
 

我们将会实现如下功能

  • 结合RabbitMQ的Topic模式和Work Queue模式实现生产方产生消息,消费方按需订阅,消息投递到消费方的队列之后,多个worker同时对消息进行消费
  • 结合RabbitMQ的 Message TTL 和 Dead Letter Exchange 实现消息的延时重试功能
  • 消息达到最大重试次数之后,将其投递到失败队列,等待人工介入处理bug后,重新将其加入队列消费

具体流程见下图

  1. 生产者发布消息到主Exchange
  2. 主Exchange根据Routing Key将消息分发到对应的消息队列
  3. 多个消费者的worker进程同时对队列中的消息进行消费,因此它们之间采用“竞争”的方式来争取消息的消费
  4. 消息消费后,不管成功失败,都要返回ACK消费确认消息给队列,避免消息消费确认机制导致重复投递,同时,如果消息处理成功,则结束流程,否则进入重试阶段
  5. 如果重试次数小于设定的最大重试次数(3次),则将消息重新投递到Retry Exchange的重试队列
  6. 重试队列不需要消费者直接订阅,它会等待消息的有效时间过期之后,重新将消息投递给Dead Letter Exchange,我们在这里将其设置为主Exchange,实现延时后重新投递消息,这样消费者就可以重新消费消息
  7. 如果三次以上都是消费失败,则认为消息无法被处理,直接将消息投递给Failed Exchange的Failed Queue,这时候应用可以触发报警机制,以通知相关责任人处理
  8. 等待人工介入处理(解决bug)之后,重新将消息投递到主Exchange,这样就可以重新消费了

外部确认消息表结构

CREATE TABLE `nt_vm_message_idempotent` (
  `message_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT ” COMMENT ‘消息ID’,
  `message_content` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT ” COMMENT ‘消息内容’,
  `product_status` tinyint(1) UNSIGNED NOT NULL DEFAULT 0 COMMENT ‘是否生产成功到mq’,
  `consume_status` tinyint(1) NOT NULL COMMENT ‘是否消费成功’,
  `create_time` int(10) UNSIGNED NOT NULL DEFAULT 0,
  `update_time` int(10) UNSIGNED NOT NULL DEFAULT 0,
  `retry_time` int(10) UNSIGNED NOT NULL DEFAULT 0 COMMENT ‘重新发送次数’,
  PRIMARY KEY (`message_id`) USING BTREE,
  UNIQUE INDEX `unique_message_id`(`message_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

4.消息的消费脚本
<?php

include_once(ROOT_PATH . 'protected/extensions/rabbitmq/autoload.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Created by PhpStorm.
 * User: tangkeji
 * Date: 21-4-26
 * Time: 下午3:31
 */

class VmMqCommand extends CConsoleCommand {

    private function _Output($data, $isEnd = 0) {
        if (is_array($data) || is_object($data)) {
            var_dump($data);
            echo "\n";
        } else {
            echo $data . "\n";
        }

        if ($isEnd) {
            Yii::app()->end();
        }
    }


    /**
     * 消息进程
     */
    public function actionRun(){
        $stopped = false;
        $autoExitCounter = 200;

        $mq = new VmMq();
        $callback = function (AMQPMessage $msg, $publishRetry, $publishFailed) use (
            &$autoExitCounter
        ) {
            $retry = $this->getRetryCount($msg);

            try {
                $routingKey = $this->getOrigRoutingKey($msg);
                $subMessage = new SubMessage($msg, $routingKey , [
                    'retry_count' => $retry, // 重试次数
                ]);

                $this->subscribe($subMessage);

                // 发送确认消息
                $msg->delivery_info['channel']->basic_ack(
                    $msg->delivery_info['delivery_tag']
                );

            } catch (\Exception $ex) {
if ($retry > 3) { // 超过最大重试次数,消息无法处理 $publishFailed($msg); return; } // 消息处理失败,稍后重试 $publishRetry($msg); } }; $mq->consume( $callback, function () use (&$stopped, &$autoExitCounter) { return $stopped || $autoExitCounter < 1; } ); } /** * 获取消息重试次数 * * @param AMQPMessage $msg * * @return int */ protected function getRetryCount($msg) { $retry = 0; if ($msg->has('application_headers')) { $headers = $msg->get('application_headers')->getNativeData(); if (isset($headers['x-death'][0]['count'])) { $retry = $headers['x-death'][0]['count']; } } return (int)$retry; } /** * 订阅消息处理 * * @param \Aicode\RabbitMQ\SubMessage $msg * * @return bool 处理成功返回true(返回true后将会对消息进行处理确认),失败throw 异常 */ public function subscribe($msg) { // TODO 业务逻辑实现      //throw new Exception("消费异常!!!"); //消费失败需要抛出异常来重新处理此消息 echo sprintf( "subscriber:<%s> %s\n", $msg->getRoutingKey(), $msg->getMessage() ); echo "----------------------------------------\n"; //存入到表中,标识该消息已消费 $msgBody = json_decode($msg->getMessage(),true); if(!isset($msgBody['unique_messageId']) || !$msgBody['unique_messageId']){ file_put_contents(ROOT_PATH.'runtime/vm_consume_failed.log', '获取消费ID为空!' . $msg->getMessage().PHP_EOL, FILE_APPEND); return true; } $unique_messageId = $msgBody['unique_messageId']; $criteria = new CDbCriteria; $criteria->addCondition("message_id = '".$unique_messageId."' and consume_status = 0 "); $messageIdempotent = DynamicAR::model('nt_vm_message_idempotent')->find($criteria); //如果找不到,则进行消费此消息 if (!$messageIdempotent) { //如果根据消息ID(作为主键)查询出有已经消费过的消息,那么则不进行消费; file_put_contents(ROOT_PATH . 'runtime/vm_consume_failed.log', '该消息已消费,无须重复消费!' . $msg->getMessage() . PHP_EOL, FILE_APPEND); } else { $update_time = time(); $connection = Yii::app()->db; $command = $connection->createCommand(" UPDATE nt_vm_message_idempotent SET consume_status=1,update_time='$update_time' WHERE message_id = '$unique_messageId' "); $re = $command->execute(); if($re) { // file_put_contents(ROOT_PATH.'runtime/vm_consume_log.log', $messageIdempotent->message_id . $msg->getMessage() .PHP_EOL, FILE_APPEND); return; }else{ file_put_contents(ROOT_PATH.'runtime/vm_consume_failed.log', '数据库保存失败' . $msg->getMessage() .PHP_EOL, FILE_APPEND); return; } } return true; } private function getOrigRoutingKey(AMQPMessage $msg) { $retry = null; if ($msg->has('application_headers')) { $headers = $msg->get('application_headers')->getNativeData(); if (isset($headers['x-orig-routing-key'])) { $retry = $headers['x-orig-routing-key']; } } return $retry?$retry:$msg->get('routing_key'); } }
 

三、总结

以上是我的rabbitmq从0到有的经历,可能里面有不完美或者错误请大家指出,必会好好纠正,主要我这个消息要保证消息的可靠性,不容许丢失。里面用到rabbitmq的高级特性如ack确认机制,幂等性,限流机制,重回机制,ttl,死信队列(相当于失败消息的回收站)。

 

原文地址:https://www.cnblogs.com/xia-na/p/14781203.html