【RabbitMQ】RabbitMq消息丢失、重复消费以及消费顺序性的解决方案

技术 · 2024-11-14

一、消息丢失 (Message Loss)

消息丢失可能发生在三个阶段:生产者 -> BrokerBroker 内部Broker -> 消费者。我们需要在每个阶段都确保消息的可靠性。

1. 生产者 -> Broker 阶段

原因:
生产者发送消息后,可能因为网络抖动、Broker 宕机等原因,消息并未成功到达 Broker。而生产者对此并不知情,以为已经发送成功。

解决方案:
使用 RabbitMQ 提供的 Publisher Confirms (生产者确认) 机制。

  • 工作原理: 生产者将信道 (Channel) 设置为 confirm 模式。此后,生产者发出的每条消息都会被分配一个唯一的 ID。当消息被 Broker 正确接收后,Broker 会向生产者发送一个 Ack (Positive Acknowledgment)。如果消息因 Broker 内部问题(如队列满了)未能处理,Broker 会发送一个 Nack (Negative Acknowledgment)。
  • 实现方式:

    • 同步确认: 发送一条消息后,阻塞等待 Broker 的 Ack。这种方式简单,但吞吐量极低,不推荐在生产环境使用。
    • 异步确认: 主流的使用方式。生产者发送消息后不阻塞,而是提供一个回调函数。当收到 Broker 的 AckNack 时,触发回调。

      • 在回调中,如果是 Ack,则证明消息已成功发送。
      • 如果是 Nack,或长时间未收到 Ack (通过超时机制判断),则需要进行重试记录日志/告警

代码示例 (伪代码):

// 开启 confirm 模式
channel.confirmSelect();

// 准备一个容器来存储待确认的消息
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

// 添加异步确认监听器
channel.addConfirmListener((sequenceNumber, multiple) -> {
    // Ack 回调
    if (multiple) {
        // 如果 multiple 为 true,表示小于等于 sequenceNumber 的消息都已确认
        outstandingConfirms.headMap(sequenceNumber, true).clear();
    } else {
        outstandingConfirms.remove(sequenceNumber);
    }
    System.out.println("消息 " + sequenceNumber + " 已确认");
}, (sequenceNumber, multiple) -> {
    // Nack 回调,需要重发或记录错误
    String messageBody = outstandingConfirms.get(sequenceNumber);
    System.err.println("消息 " + messageBody + " (ID: " + sequenceNumber + ") 未被确认,需要重试!");
    // ...重试逻辑...
});

// 发送消息
long nextSeqNo = channel.getNextPublishSeqNo();
outstandingConfirms.put(nextSeqNo, message);
channel.basicPublish(exchange, routingKey, props, message.getBytes());

2. Broker 内部阶段

原因:
消息已经到达 Broker,但 Broker 在将消息存入磁盘前突然宕机或重启,内存中的消息将会丢失。

解决方案:
持久化 (Durability)。为了确保消息在 Broker 重启后依然存在,需要对三样东西进行持久化设置:

  1. Exchange (交换机) 持久化: 在声明交换机时,设置 durable = true
  2. Queue (队列) 持久化: 在声明队列时,设置 durable = true
  3. Message (消息) 持久化: 在发送消息时,设置消息属性 delivery_mode = 2 (or MessageProperties.PERSISTENT_TEXT_PLAIN)。

注意: 只有当消息被发送到持久化的交换机,并路由到持久化的队列,同时消息本身也被标记为持久化时,消息才会在 Broker 重启后保留。三者缺一不可。

3. Broker -> 消费者阶段

原因:
消费者获取到消息后,还没来得及处理,就意外宕机了。默认情况下,Broker 在发送消息后会立即将其从队列中删除(auto-ack 模式),导致消息丢失。

解决方案:
使用 Consumer Acknowledgements (消费者确认) 机制。

  • 工作原理: 将消费模式设置为手动确认 (autoAck = false)。

    • 消费者成功处理完业务逻辑后,手动调用 channel.basicAck() 方法,通知 Broker 该消息已成功处理,可以从队列中删除了。
    • 如果消费者处理失败,可以调用 channel.basicNack()channel.basicReject(),并选择是否将消息重新入队 (requeue = true)。
  • 关键点: 务必在业务逻辑完全处理成功之后,再发送 ack

总结:如何保证消息不丢失?

生产者确认机制 + Broker端持久化 + 消费者手动ACK = 全链路可靠性保证。

二、重复消费 (Duplicate Consumption)

根本原因:
消息不丢失的保障机制(如网络重传、消费者未ack导致重投)本身就会引入重复消息的可能性。

  • 场景1 (生产者重试): 生产者发送消息后,因为网络问题,没收到 Broker 的 confirm。生产者会重发消息,但实际上第一条消息可能已经成功到达 Broker。这就造成了重复消息。
  • 场景2 (消费者重投): 消费者处理完消息,但在发送 ack 之前宕机了。Broker 没有收到 ack,会认为消费者没处理成功,于是将该消息重新投递给其他消费者。

解决方案:
消费端必须保证幂等性 (Idempotence)。

幂等性是指对于同一个操作,执行一次和执行多次的结果是完全相同的。既然无法完全避免重复消息,那么就应该让消费端的业务逻辑能够“优雅”地处理重复消息。

实现幂等性的常用方法:

  1. 唯一ID + 业务逻辑判断

    • 数据库唯一键约束: 在消息中包含一个全局唯一的业务ID(如订单ID、支付流水号)。在消费时,先去数据库查询这个ID是否存在,如果不存在,则插入数据并执行业务逻辑;如果已存在,则说明是重复消息,直接ack并忽略。利用数据库的唯一键约束可以有效防止重复插入。
    • 示例流程:

      BEGIN TRANSACTION;
      SELECT * FROM processed_messages WHERE message_id = ?;
      IF (result is empty) {
          // 核心业务逻辑
          INSERT INTO ...
          UPDATE ...
          
          // 记录已处理的消息ID
          INSERT INTO processed_messages (message_id) VALUES (?);
      }
      COMMIT;
      // 成功后发送 ack
      channel.basicAck(...);
  2. 使用分布式锁/缓存 (如 Redis)

    • 利用 Redis 的 SETNX (SET if Not eXists) 命令。消费时,使用消息的唯一ID作为 key。
    • 示例流程:

      String messageId = message.getUniqueId();
      // 尝试将 messageId 写入 Redis,如果成功(返回1),说明是第一次处理
      if (redis.setnx("mq_processed:" + messageId, "1")) {
          // 设置一个合理的过期时间,防止因消费者宕机导致锁无法释放
          redis.expire("mq_processed:" + messageId, 60); 
      
          // 执行核心业务逻辑...
          
          // 成功后发送 ack
          channel.basicAck(...);
      } else {
          // key 已存在,说明是重复消息,直接 ack 并忽略
          System.out.println("检测到重复消息:" + messageId);
          channel.basicAck(...);
      }

总结:如何解决重复消费?

核心思想是让消费逻辑具备幂等性。 RabbitMQ 本身不解决这个问题,需要由业务方在消费端来保证。

三、消费顺序性 (Consumption Order)

问题描述:
在某些业务场景下,需要保证消息被处理的顺序,例如一个订单的“创建”、“付款”、“完成”三个消息,必须按顺序处理。

默认情况下的问题:
默认情况下,RabbitMQ 只能保证消息在单个队列中是先进先出(FIFO)的。但是,如果你为了提高吞吐量而启动了多个消费者实例来消费同一个队列,那么顺序性就无法保证了。

  • 例如: 消息1和消息2先后进入队列。消费者A拿到了消息1,消费者B拿到了消息2。但可能B的处理速度比A快,先完成了业务逻辑,导致消息2先于消息1被处理。

解决方案:

  1. 单消费者消费单队列 (不推荐)

    • 最简单粗暴的方法:只为一个队列启动一个消费者。这样可以严格保证顺序,但吞吐量极低,失去了分布式的优势,且存在单点故障风险。
  2. 根据业务ID路由到特定队列 (推荐)

    • 这是解决顺序性问题的标准做法。核心思想是:将需要保证顺序的一组消息,始终发送到同一个队列中,并由同一个消费者来处理。
    • 实现步骤:

      1. 创建多个队列: 根据预估的并发量,创建多个队列,例如 order_queue_0, order_queue_1, ..., order_queue_N-1
      2. 使用 Direct 或 Topic Exchange: 设置一个交换机。
      3. 生产者计算路由键 (Routing Key): 生产者在发送消息时,根据需要保证顺序的业务标识(如 order_id)计算哈希值,然后对队列数量取模,得到一个固定的队列索引。

        String orderId = "12345";
        int queueIndex = orderId.hashCode() % N; // N是队列总数
        String routingKey = "order_routing_" + queueIndex;
      4. 绑定队列: 将每个队列通过对应的 routingKey 绑定到交换机上。

        • order_queue_0 绑定 order_routing_0
        • order_queue_1 绑定 order_routing_1
        • ...
      5. 为每个队列分配一个单线程消费者: 启动 N 个消费者,每个消费者只负责监听一个特定的队列。
    • 效果: 这样一来,所有 order_id 相同的消息都会被发送到同一个队列,并由同一个消费者按顺序处理。而不同 order_id 的消息可以被分发到不同队列,由不同消费者并行处理,从而在保证了“订单内有序”的同时,也保证了“订单间并行”的高吞吐量。

总结:如何保证消费顺序?

放弃全局有序,追求局部有序。 通过哈希取模等方式,将需要保证顺序的消息路由到同一个队列,并由单个消费者处理该队列。
本站由 又拍云官网提供CDN加速/云储存服务