设计一个电商「订单事件总线」:用户下单后,订单服务产生一条 order.created 事件,要被 5 类下游 各自独立消费——库存扣减、积分发放、推荐更新、数据仓库入仓、风控审计。这些下游处理速度天差地别:库存扣减 2ms,数仓批量入仓一条要 200ms。
硬约束:
created → paid → shipped 不能乱序,否则状态机错乱。今天讲:为什么这个场景必须用消息队列而非同步 RPC、Kafka / RabbitMQ / SQS 怎么选、投递语义的本质、backpressure 与消费积压、以及 DLQ 怎么兜底毒消息。
核心思路:一份事件、多个独立 Consumer Group,各自维护 offset,互不影响——这是 pub/sub fan-out。Producer 用 ack=all + 幂等保证不丢;按 order_id 分区保证单订单有序;慢消费者积压只影响自己;重试耗尽的消息进 DLQ 不阻塞主流。
核心 trade-off:留存可重放的日志 vs 灵活路由的智能 broker vs 零运维的托管队列。
原理:三者是三种不同的抽象。Kafka 是分布式 append-only log:消息写入 partition 后不因被消费而删除,consumer 只是移动自己的 offset,因此天然支持多订阅者 fan-out、回放历史、高吞吐顺序 IO。RabbitMQ 是智能 broker:消息经 exchange 按 routing key 投到 queue,消费 ack 后即删除,强在灵活路由(topic/fanout/header exchange)和单条消息粒度控制。SQS 是 AWS 全托管队列:无需运维,弹性近无限,但功能最简(标准队列无序、FIFO 队列有序但限流)。
| 维度 | Kafka | RabbitMQ | SQS |
|---|---|---|---|
| 抽象 | 持久化 log | broker + queue | 托管 queue |
| 消费后 | 保留(按 TTL) | ack 即删 | ack 即删 |
| 重放历史 | ✅ 重置 offset | ❌ | ❌ |
| 吞吐 | 极高(百万/s) | 中(万/s) | 高(自动弹) |
| 路由灵活性 | 弱(topic+key) | ✅ 强 | 弱 |
| 运维 | 重(自管集群) | 中 | 零 |
核心 trade-off:不丢 vs 不重 不可兼得(无幂等时);工程上的「恰好一次」= 至少一次 + 幂等。
原理:投递语义取决于 ack 与处理的先后。At-most-once:先 ack(提交 offset)再处理——崩溃则消息丢失,绝不重复,适合可丢的指标采样。At-least-once:先处理再 ack——崩溃在 ack 前则重投,绝不丢失但可能重复,是绝大多数业务的默认选择。Exactly-once:网络层面不可达(两将军问题),但可以通过至少一次投递 + 消费端幂等/去重在「效果上」实现每条恰好生效一次。
def consume(msg):
if dedup.exists(msg.event_id): # 去重表 / 唯一约束
commit_offset(msg); return # 重复投递, 跳过
process(msg) # 业务: 扣库存 / 加积分
dedup.insert(msg.event_id)
commit_offset(msg) # 先处理后提交 = at-least-once
# 崩在 process 后 / commit 前 → 重投, 靠 dedup 兜住
核心 trade-off:buffer(吸收突发) vs drop / 限流(保护系统)——队列把同步背压变成了可观测的 lag。
原理:生产速率 > 消费速率时,消息堆积。同步 RPC 里这表现为调用方阻塞、线程耗尽、级联雪崩;消息队列把它转化为 consumer lag(offset 落后量),是一个可监控、可缓冲的指标。应对手段分两类:扩容消费(加 consumer,但受 partition 数上限——一个 partition 同组内只能被一个 consumer 消费);限制生产(producer 限流 / 拒绝,把压力推回源头)。Kafka 的 log 模型让积压「躺在磁盘上」相对安全;RabbitMQ 队列积压在内存/磁盘则可能触发 flow control 阻塞 producer。
while True:
batch = consumer.poll(max_records=500, timeout=100ms)
lag = end_offset(partition) - current_offset
if lag > HIGH_WATERMARK:
scale_out_signal() # 触发 autoscaler 加 consumer
process_batch(batch) # 批处理摊薄固定开销
consumer.commit() # 批量提交一次 offset
consumer lag(如 Burrow / Cruise Control 监控)作为扩缩容信号,是流处理运维的核心 SLI。核心 trade-off:无限重试(阻塞队列) vs 丢弃(丢数据) vs DLQ(隔离 + 留证据)。
原理:某条消息因数据畸形、下游 bug、依赖永久不可用而反复处理失败,称为毒消息(poison message)。若无限重试,它会卡住整个 partition / queue(尤其有序队列,后面的消息全被堵);若直接丢,则丢数据无追溯。DLQ 的做法:重试 N 次仍失败,把消息连同失败上下文移到一个专门的死信队列,主流继续前进;之后人工或自动分析 DLQ,修复后 redrive 回主队列重放。
def handle(msg):
try:
process(msg)
except NonRetryable as e: # 畸形数据, 重试无意义
to_dlq(msg, reason=e); return
except Retryable as e:
if msg.attempts >= MAX_RETRY:
to_dlq(msg, reason=e, attempts=msg.attempts)
else:
requeue(msg, delay=backoff(msg.attempts))
maxReceiveCount 自动移入 DLQ;FIFO 队列里毒消息会阻塞整个 message group 直到它被处理或移入 DLQ。order_id 分区——既有序又能横向扩展。这是「分区键即并行单元」的核心权衡。面试可能追问:
ack=0/1/all 分别什么语义?什么时候会丢消息?结果:50 个 consumer 完全空闲。因为 Kafka 的并行单元是 partition——同一个 consumer group 内,一个 partition 在任一时刻只能被一个 consumer 消费(保证组内不重复)。100 个 partition 最多被 100 个 consumer 瓜分,多出的 50 个抢不到任何 partition,纯属浪费。
推论:消费并行度的上限 = partition 数。想扩到 150 并行,必须先把 partition 加到 ≥150。但加 partition 有代价:(1) 基于 key 的 hash 路由会变(hash(key) % N 中 N 变了),同一 key 的历史消息和新消息可能落到不同 partition,破坏有序性;(2) partition 越多,broker 元数据、文件句柄、rebalance 时间都上升。
所以建 topic 时就要按未来峰值规划 partition 数,这是少数「事前决策远比事后补救便宜」的参数。这也解释了为什么 Uber 要做 uForwarder——用 push proxy 解耦「消费并行度」与「partition 数」。
先算量级:1 亿条 / 3600s ≈ 要 2.8 万/s 的净追赶速率(还得高于实时进来的速率)。单 consumer 200ms/条只有 5 条/s,差了 4 个数量级,必须并行 + 批处理。
更深一层:能 1 小时追上的前提是 partition 数和下游写入能力早就预留了余量。如果 partition 只有 10 个,任你怎么加 consumer 也只有 10 并行——积压恢复能力是设计期决定的,不是故障期能临时变出来的。这正是「log 模型让积压安全躺在磁盘上」的价值:它给了你恢复的时间窗口,但恢复速度仍受架构上限约束。
消费端幂等解决的是「同一条消息重复处理」。但有一类问题它解决不了:「读-处理-写」的原子性——consumer 从 topic A 读、处理后写到 topic B,然后提交 A 的 offset。这三步若非原子,崩溃点不同会出问题:写了 B 但没提交 A 的 offset → 重启后重新处理、B 里出现重复;提交了 offset 但没写 B → B 里丢数据。
Kafka 事务把「写 B + 提交 A 的 offset」包在一个原子事务里,配合幂等 producer(按序列号去重),实现流处理的 EOS:要么都成功,要么都回滚。这是消费端幂等做不到的,因为 offset 提交和外部写本就是两个系统的动作。
但关键边界:这只在「Kafka → Kafka」闭环成立。一旦你写的是外部 Postgres 或调 Stripe,Kafka 事务管不到那边——又回到 at-least-once + 业务幂等。所以 EOS 不是银弹,它精确地解决「Kafka 内流处理」这一类问题,理解它的适用边界比记住它存在更重要。
这是个反直觉点:加一个组件不是增加了故障面吗?答案在于把什么样的故障换成什么样的故障。
同步 RPC 的问题:订单服务要同步调用 5 个下游。(1) 时间耦合:任一下游慢/挂,下单就慢/失败——5 个下游的可用性相乘,整体可用性暴跌;(2) 级联雪崩:数仓慢 → 订单服务线程阻塞堆积 → 订单服务自己挂 → 拖垮上游;(3) 扇出耦合:加第 6 个下游要改订单服务代码。
消息队列的转换:订单服务只需把事件可靠写入队列(一次本地 + 一次队列写,配 Outbox 可做到原子),就立即返回。下游各自异步消费——下游挂了不影响下单,恢复后从 offset 续上;加下游只需新增 consumer group,订单服务零改动;慢消费者只积压自己。
代价是:(1) 最终一致——下游状态有延迟,UI 要兜底(「处理中」);(2) at-least-once 的重复——要幂等;(3) 多了个要运维的中间件。本质是用「最终一致 + 幂等复杂度」换「时间解耦 + 抗级联 + 易扩展」。对订单这种「下单必须快且稳、下游可异步」的场景,这笔交易划算;对「必须同步拿到结果」的场景(如实时风控拒绝),则该留同步调用。