RabbitMQ 中消息确认与返回机制概述
1. 前言
Hello,大家好。本小节会为大家介绍 RabbitMQ 中的消息确认机制与消息返回机制,这两个机制是针对消息来说的不同场景而诞生的,其目的就是为了保证 RabbitMQ 中消息的可靠性投递,以及消息在发生特定异常时的补偿策略。
这两种机制是 RabbitMQ 自带的补偿机制,在我们开发应用程序时,如果遇到了对应的业务场景,我们直接来用就可以了,就不需要去使用额外的工具来弥补了。
本节主要内容:
- 消息确认机制概述;
- 消息返回机制概述;
2. 消息确认机制概述
基础概念:
消息确认机制,是描述消息与 RabbitMQ Server 之间的关系的一种保障机制,其主要内容就是用来监听,当我们应用程序中的数据,即消息,被发送到 RabbitMQ Server 中之后返回给生产端的一种消息监听机制。消息确认机制描述了一种消息是否已经被发送到 RabbitMQ Server 中以及 RabbitMQ Server 与生产端之间的关系。
从上述消息确认机制的基本概念可以得出,消息确认机制的作用就是:监听生产端的消息是否已经发送到了 RabbitMQ Server 中,如果消息没有被发送到 RabbitMQ Server 中,则消息确认机制不会给生产端返回任何确认应答,相反,如果消息被成功发送到了 RabbitMQ Server 中,则消息确认机制会给生产端返回一个确认应答,以通知生产端,消息已经发送到了 RabbitMQ Server 中,概念图如下所示:
根据上图,消息在被成功发送到 RabbitMQ Server 中之后,RabbitMQ Server 就会给生产端返回一个确认应答,这个确认应答会包含两种结果,一种就是消息发送到了 RabbitMQ Server ,RabbitMQ Server 收到了该消息,这时会给生产者返回 ack 的确认应答, 表示消息已经被接收。
另一种就是消息没有发送到 RabbitMQ Server ,RabbitMQ Server 没有收到该消息,这时会给生产者返回一个 nack 的确认应答,即 no ack , 表示没有接收到该消息。
我们在了解了消息确认机制的基础概念和作用之后,我们还需要了解在 RabbitMQ 中,如何通过代码来实现 RabbitMQ 的消息确认机制。
代码实现:
实现消息确认机制,只需要在生产端进行配置即可,代码如下:
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("xx");
connectionFactory.setPort("5672");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChanel();
channel.confirmSelect();
channel.basicPublish(exchangeName, routingKey, false, null, msg.getBytes());
channel.addConfirmListener(new ConfirmListener(){
@Override
public void handleAck(long l, boolean b) throws IOException{
// do something...
}
@Override
public void handleNack(long l, boolean b) throws IOException{
// do something...
}
});
代码解释:
第 1-5 行,我们使用 ConnectionFactory 创建了一个客户端连接 RabbitMQ Server 的连接。
第 6 行,我们使用建立好的连接,来创建了一个频道 channel 。
第 7 行,我们使用 channel 的 basicPublish 方法来将我们的消息发送到 RabbitMQ Server 中。
第 8 行,我们为 channel 绑定了一个消息确认机制的监听器,即 addConfirmListener ,且我们通过 new ConfirmListener 匿名内部类的方式,来重写了消息确认监听器中的 handleAck 方法和 handleNack 方法,其中,handleAck 方法表示消息已经被 RabbitMQ Server 接收可以返回 ack 的确认应答,handleNack 方法则表示方法没有被 RabbitMQ Server 接收可以返回 nack 的确认应答。
Tips: 1. 配置消息确认机制我们需要先配置 confirmSelect 方法来声明消息确认机制,接着我们需要为 channel 添加 addConfirmListener 消息确认监听器,并重写其中的 handleAck 和 handleNack 方法,最后需要根据 RabbitMQ Server 返回的确认应答在上述两个方法中完成我们需要处理的业务逻辑;
2. 如果需要启用消息确认机制,那么我们就不能自主的去关闭频道 channel 和 连接 connection,因为消息确认机制的返回结果是异步返回的,如果我们在向 RabbitMQ Server 发送了消息之后,就关闭了对应的 channel 和 connection ,那么我们就收不到任何消息确认的结果了。
3. 消息返回机制概述
基础概念:
消息返回机制,是描述不可达的消息与生产者之间的一种保障策略,其主要内容就是用来监听,RabbitMQ Server 中是否存在不可达的消息,并根据监听结果返回给生产端的一种监听机制,即消息返回机制描述了一种 RabbitMQ Server 中的不可达消息与生产端的关系。
从上述消息返回机制的基本概念可以得出,消息返回机制的作用就是:监听生产端发动到 RabbitMQ Server 中的消息是否可达,如果消息不可达,则返回一个信号通知生产端,相反,如果消息可达,则不会通知生产端任何信号,概念图如下所示:
根据上图,消息在被成功发送到 RabbitMQ Server 中之后,如果消息在经过当前配置的 exchangeName 或 routingKey 没有找到指定的交换机,或没有匹配到对应的消息队列,那么这个消息就被称为不可达的消息,如果此时配置了消息返回机制,那么此时 RabbitMQ Server 会返回给生产端一个信号,信号中包括消息不可达的原因,以及消息本身的内容。
我们在了解了消息返回机制的基础概念和作用之后,我们还需要了解在 RabbitMQ 中,如何通过代码来实现 RabbitMQ 的消息返回机制。
代码实现:
实现消息返回机制,也是只需要在生产端进行配置即可,代码如下:
// 省略客户端连接 RabbitMQ Server 的过程
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChanel();
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
// do something...
);
}
});
channel.basicPublish(exchangeName, routingKey + "01", true, null, msg.getBytes());
代码解释:
第 1-2 行,我们创建了客户端连接 RabbitMQ Server 的连接,并且创建了一个 channel 。
第 3 行,我们为 channel 添加了 addReturnListener 消息返回的监听器,并且通过 new ReturnListener 匿名内部类的方式来重写 handleReturn 方法。
第 12 行,我们通过 channel 的 basicPublish 方法,将我们的消息发送出去,其中,在 basicPublish 方法中,我们只需要了解前三个参数即可:第一个参数表示交换机的名称,第二个参数表示路由 Key 的名称,第三个参数是 mandatory 属性,表示是否开启消息返回机制,如果这个属性被置为 false ,则消息返回监听器就不会生效。
Tips: 1. 要使用消息返回机制,就一定要配置 basicPublish 方法中的第三个参数的值为 true ,否则,即使添加了 addReturnListener 监听器,不可达的消息也不会被监听到;
2. 如果我们没有配置第三个参数的属性为 true ,那么,当 RabbitMQ Server 中存在不可达消息时,RabbitMQ 就会自动将该消息删除。
4. 小结
本小节为各位同学介绍了 RabbitMQ 中的消息确认机制,以及消息返回机制。从消息确认机制和消息返回机制的基础概念开始,到不同机制的代码实现结束,详细介绍了什么是消息确认机制,以及什么是消息返回机制,且通过不同机制的代码实现,分别阐述了如何通过代码来对两种机制进行配置。希望同学们可以完全理解两种机制的基础概念和实现方式。
消息确认机制和消息返回机会的基本内容就全部介绍完毕了,但是有一种现象同学们可以课下尝试一下,那就是当两种机制都进行配置之后,如果我们的消息不可达了,那么消息确认的监听器还会监听到吗?