RabbitMQ学习-高可用

hign available

Posted by zwtisme on May 19, 2018

介绍RabbitMQ中,如何来实现高可用。

架构图

image

服务器

为了保证RabbitMQ服务器的可用性,线上环境一般都使用镜像集群,当集群中某些节点不可用时,集群还是可以工作的。

镜像集群搭建方法

生产者

1.交换器持久化

正常的业务交换器定义好后一般都会一直使用,即使服务器重启也不会消失。

#在定义交换器时(一般在消费者处),控制如下2个参数
durable:true
auto_delete:false

2.队列持久化

正常的业务队列定义好后一般都会一直使用,即使服务器重启也不会消失。

#在定义队列时(一般在消费者处),控制如下2个参数
durable:true
auto_delete:false

3.消息持久化

为了确保消息在服务器出问题的时候也不会丢失,需要将消息持久到磁盘。

消息真正的持久化实际需要依赖以下几点:

  • 消息投递模式设置为2(持久)
  • 消息发送到了持久化交换器
  • 消息最终到达持久化队列
#在定义消息时增加属性
properties:['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]

4.备用交换器

官方文档

为了确保能跟踪到所有发送给交换器的消息,在我们预想接收的消息外,可以设置备用交换器,来捕获其他未被路由的消息。

//备用交换器参数
$arrArgument = new Wire\AMQPTable([
    'alternate-exchange' => $strAeExchangeName
]);
//创建交换器,增加备用交换器设置
$this->getChannel()->exchange_declare($strExchangeName, $strExchangeType, false, true, false, false, false, $arrArgument);

5.生产者确认

官方文档

为了确保生产者能成功的将消息推送给RabbitMQ服务器,需要使用生产者确认。

//开启信道确认模式
$this->getChannel()->confirm_select();
//设置信道回调方法
$this->getChannel()->set_ack_handler(function(AMQPMessage $objMessage) {
    $this->ackHandler($objMessage);
});
$this->getChannel()->set_nack_handler(function(AMQPMessage $objMessage) {
    $this->nackHandler($objMessage);
});

6.代码示例

生产者

消费者

1.消费者不下线

在大部分业务中,消费者都是启动之后就不停止的,但是如果RabbitMQ服务器异常导致连接不上,就不能正常消费队列中的消息,这时消费者需要能够自动切换其他可连接的服务器。

while (1) {
    try {
        while (1) {
            $this->getChannel()->wait();
        }
    } catch (\Exception $e) {
        //日志记录
        //重建
        $this->reset();
        if (!$this->build($this->arrInitParam)) {
            //日志记录
            break;
        }
    }
}

2.消费者正常下线

当消费者逻辑需要进行更新时,就需要停止消费者的运行,如果直接将守护进程关闭,可能会导致逻辑处理到一半被终止了,从而产生不可预知的问题。

可在消费者的回调方法中,增加对标识键的判断,如果存在标识键则处理完此条消息后停止消费消息,同时关闭信道与连接。

/**
 * 消费者是否已停止
 */
private function isConsumerStop() {
    $this->blnConsumerStop = Cache::exec('exists', $this->arrInitParam['redis_key']);
    return $this->blnConsumerStop;
}
/**
 * 信息处理
 */
private function dealMessage(AMQPMessage $objMessage) {
    //消息消费失败是否重进队列
    $blnIsRequeue = isset($this->arrInitParam['is_requeue']) ? $this->arrInitParam['is_requeue'] : false;
    //业务确认是否成功
    $blnAck = $this->receiveMessage($objMessage->body);
    if ($blnAck) {
        $objMessage->delivery_info['channel']->basic_ack($objMessage->delivery_info['delivery_tag']);
    } else {
        //$objMessage->delivery_info['channel']->basic_nack($objMessage->delivery_info['delivery_tag'], false, $blnReQueue);
        $objMessage->delivery_info['channel']->basic_reject($objMessage->delivery_info['delivery_tag'], $blnIsRequeue);
    }
    //消费者是否需要停止
    if ($this->isConsumerStop()) {
        $objMessage->delivery_info['channel']->basic_cancel($objMessage->delivery_info['consumer_tag']);
    }
}
/**
 * 开始运行
 */
public function run() {
    while (1) {
        try {
            while (1) {
                if ($this->blnConsumerStop) {
                    //消费者已停止
                    $this->reset();
                    throw new Exception('consumer stop');
                } else {
                    $this->getChannel()->wait();
                }
            }
        } catch (Exception $e) {
            //消费者已停止
            if ($this->blnConsumerStop) {
                break;
            }
            //设置错误信息
            $strErrorMsg = $e->getMessage();
            $strErrorMsg = sprintf("type:%s\r\n param:%s\r\n error:%s\r\n", $this->getType() . '_run', json_encode($this->arrInitParam), $strErrorMsg);
            //错误日志记录
            Log::log($strErrorMsg, Config::get('const.Log.LOG_MQERR'));
            //重建
            $this->reset();
            if (!$this->build($this->arrInitParam)) {
                //日志记录
                break;
            }
        }
    }
}

3.消费者逻辑变更

标记值变化,处理完消息后就不处理,杀死进程

4.单条信息获取

为了确保消费者在消费消息时能够进行确认成功消费,每次只能队列中获取一条消息。

//每次只接受一条信息
$this->getChannel()->basic_qos(null, 1, null);

5.死信队列

官方文档

当消费者在消费某个消息失败后,如果将消息重新投入队列,则此消息会被其它消费者依次接收到,如果一直不能成功消费,则会阻碍其他消息的消费。

可以将消费失败的消息,投入到死信队列,通过其他逻辑对它们进行处理,从而不影响正常的功能。

//死信交换器参数
$arrArgument = new Wire\AMQPTable([
    'x-dead-letter-exchange' => $strDqExchangeName,
    'x-dead-letter-routing-key' => $strDqRouteKey
]);
//创建队列,增加死信队列设置
$this->getChannel()->queue_declare($strQueueName, false, true, false, false, false, $arrArgument);

6.消费者确认

官方文档

为了确保消费者成功的消费的消息,从而从队列中删除此消息,需要使用消费者确认模式。

//接收消息
$this->getChannel()->basic_consume($arrQueue['queue_name'], '', false, false, false, false, function(AMQPMessage $objMessage) {
    $this->dealMessage($objMessage);
});
//业务确认是否成功
$blnAck = $this->receiveMessage($objMessage->body);
if ($blnAck) {
    $objMessage->delivery_info['channel']->basic_ack($objMessage->delivery_info['delivery_tag']);
} else {
    $objMessage->delivery_info['channel']->basic_reject($objMessage->delivery_info['delivery_tag'], $blnIsRequeue);
}

7.代码示例

消费者

性能

硬件配置

主要包括网络配置、磁盘管理、处理器核心数等。

软件配置

通过不同AMQP参数的配置,来合理控制性能。

消息持久化

在消息投递时,如果设置消息持久化存储的话,性能大致会降低1/2左右(降低后投递速度也有4000/s左右)。要根据实际使用场景来觉得是否需要使用持久化存储(是否能接受消息的丢失)。

消息确认

如果在订阅队列时设置no-ack=true,服务器在将消息投递后就可以无须关注,消费者在处理完消息后也无须再发送确认信息给服务器,这样就可以加快消费者消费的速度。带来的问题是,如果消息消费失败,则消息会用永久丢失了。

路由算法和绑定规则

通常情况下,topic交换器上的绑定相比direct或fanout来说,会占用更多内存。

消息投递

参考如下消息流程图,RabbitMQ内部被优化为尽快的将消息投递给消费者。在制定容量规划与消息进出速率的时,应该尽可能让队列保持为空的状态,这样消息可以不经过内存或磁盘存储直接到消费者,从而提高效率。

Tips:需要时刻关注队列中消息数,提高消费者处理速度,避免消息大批量滞留。

image