介绍RabbitMQ中,如何来实现高可用。
架构图
服务器
为了保证RabbitMQ服务器的可用性,线上环境一般都使用镜像集群,当集群中某些节点不可用时,集群还是可以工作的。
生产者
1.交换器持久化
正常的业务交换器定义好后一般都会一直使用,即使服务器重启也不会消失。
#在定义交换器时(一般在消费者处),控制如下2个参数
durable:true
auto_delete:false
2.队列持久化
正常的业务队列定义好后一般都会一直使用,即使服务器重启也不会消失。
#在定义队列时(一般在消费者处),控制如下2个参数
durable:true
auto_delete:false
3.消息持久化
为了确保消息在服务器出问题的时候也不会丢失,需要将消息持久到磁盘。
消息真正的持久化实际需要依赖以下几点:
- 消息投递模式设置为2(持久)
- 消息发送到了持久化交换器
- 消息最终到达持久化队列
#在定义消息时增加属性
properties:['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
4.备用交换器
为了确保能跟踪到所有发送给交换器的消息,在我们预想接收的消息外,可以设置备用交换器,来捕获其他未被路由的消息。
//备用交换器参数
$arrArgument = new Wire\AMQPTable([
'alternate-exchange' => $strAeExchangeName
]);
//创建交换器,增加备用交换器设置
$this->getChannel()->exchange_declare($strExchangeName, $strExchangeType, false, true, false, false, false, $arrArgument);
5.生产者确认
为了确保生产者能成功的将消息推送给RabbitMQ服务器,需要使用生产者确认。
//开启信道确认模式
$this->getChannel()->confirm_select();
//设置信道回调方法
$this->getChannel()->set_ack_handler(function(AMQPMessage $objMessage) {
$this->ackHandler($objMessage);
});
$this->getChannel()->set_nack_handler(function(AMQPMessage $objMessage) {
$this->nackHandler($objMessage);
});
6.代码示例
消费者
1.消费者不下线
在大部分业务中,消费者都是启动之后就不停止的,但是如果RabbitMQ服务器异常导致连接不上,就不能正常消费队列中的消息,这时消费者需要能够自动切换其他可连接的服务器。
while (1) {
try {
while (1) {
$this->getChannel()->wait();
}
} catch (\Exception $e) {
//日志记录
//重建
$this->reset();
if (!$this->build($this->arrInitParam)) {
//日志记录
break;
}
}
}
2.消费者正常下线
当消费者逻辑需要进行更新时,就需要停止消费者的运行,如果直接将守护进程关闭,可能会导致逻辑处理到一半被终止了,从而产生不可预知的问题。
可在消费者的回调方法中,增加对标识键的判断,如果存在标识键则处理完此条消息后停止消费消息,同时关闭信道与连接。
/**
* 消费者是否已停止
*/
private function isConsumerStop() {
$this->blnConsumerStop = Cache::exec('exists', $this->arrInitParam['redis_key']);
return $this->blnConsumerStop;
}
/**
* 信息处理
*/
private function dealMessage(AMQPMessage $objMessage) {
//消息消费失败是否重进队列
$blnIsRequeue = isset($this->arrInitParam['is_requeue']) ? $this->arrInitParam['is_requeue'] : false;
//业务确认是否成功
$blnAck = $this->receiveMessage($objMessage->body);
if ($blnAck) {
$objMessage->delivery_info['channel']->basic_ack($objMessage->delivery_info['delivery_tag']);
} else {
//$objMessage->delivery_info['channel']->basic_nack($objMessage->delivery_info['delivery_tag'], false, $blnReQueue);
$objMessage->delivery_info['channel']->basic_reject($objMessage->delivery_info['delivery_tag'], $blnIsRequeue);
}
//消费者是否需要停止
if ($this->isConsumerStop()) {
$objMessage->delivery_info['channel']->basic_cancel($objMessage->delivery_info['consumer_tag']);
}
}
/**
* 开始运行
*/
public function run() {
while (1) {
try {
while (1) {
if ($this->blnConsumerStop) {
//消费者已停止
$this->reset();
throw new Exception('consumer stop');
} else {
$this->getChannel()->wait();
}
}
} catch (Exception $e) {
//消费者已停止
if ($this->blnConsumerStop) {
break;
}
//设置错误信息
$strErrorMsg = $e->getMessage();
$strErrorMsg = sprintf("type:%s\r\n param:%s\r\n error:%s\r\n", $this->getType() . '_run', json_encode($this->arrInitParam), $strErrorMsg);
//错误日志记录
Log::log($strErrorMsg, Config::get('const.Log.LOG_MQERR'));
//重建
$this->reset();
if (!$this->build($this->arrInitParam)) {
//日志记录
break;
}
}
}
}
3.消费者逻辑变更
标记值变化,处理完消息后就不处理,杀死进程
4.单条信息获取
为了确保消费者在消费消息时能够进行确认成功消费,每次只能队列中获取一条消息。
//每次只接受一条信息
$this->getChannel()->basic_qos(null, 1, null);
5.死信队列
当消费者在消费某个消息失败后,如果将消息重新投入队列,则此消息会被其它消费者依次接收到,如果一直不能成功消费,则会阻碍其他消息的消费。
可以将消费失败的消息,投入到死信队列,通过其他逻辑对它们进行处理,从而不影响正常的功能。
//死信交换器参数
$arrArgument = new Wire\AMQPTable([
'x-dead-letter-exchange' => $strDqExchangeName,
'x-dead-letter-routing-key' => $strDqRouteKey
]);
//创建队列,增加死信队列设置
$this->getChannel()->queue_declare($strQueueName, false, true, false, false, false, $arrArgument);
6.消费者确认
为了确保消费者成功的消费的消息,从而从队列中删除此消息,需要使用消费者确认模式。
//接收消息
$this->getChannel()->basic_consume($arrQueue['queue_name'], '', false, false, false, false, function(AMQPMessage $objMessage) {
$this->dealMessage($objMessage);
});
//业务确认是否成功
$blnAck = $this->receiveMessage($objMessage->body);
if ($blnAck) {
$objMessage->delivery_info['channel']->basic_ack($objMessage->delivery_info['delivery_tag']);
} else {
$objMessage->delivery_info['channel']->basic_reject($objMessage->delivery_info['delivery_tag'], $blnIsRequeue);
}
7.代码示例
性能
硬件配置
主要包括网络配置、磁盘管理、处理器核心数等。
软件配置
通过不同AMQP参数的配置,来合理控制性能。
消息持久化
在消息投递时,如果设置消息持久化存储的话,性能大致会降低1/2左右(降低后投递速度也有4000/s左右)。要根据实际使用场景来觉得是否需要使用持久化存储(是否能接受消息的丢失)。
消息确认
如果在订阅队列时设置no-ack=true
,服务器在将消息投递后就可以无须关注,消费者在处理完消息后也无须再发送确认信息给服务器,这样就可以加快消费者消费的速度。带来的问题是,如果消息消费失败,则消息会用永久丢失了。
路由算法和绑定规则
通常情况下,topic交换器上的绑定相比direct或fanout来说,会占用更多内存。
消息投递
参考如下消息流程图,RabbitMQ内部被优化为尽快的将消息投递给消费者。在制定容量规划与消息进出速率的时,应该尽可能让队列保持为空的状态,这样消息可以不经过内存或磁盘存储直接到消费者,从而提高效率。
Tips:需要时刻关注队列中消息数,提高消费者处理速度,避免消息大批量滞留。