本文梳理 RocketMQ 源码学习笔记,涵盖源码结构、NameServer 路由中心、消息发送机制等核心内容。
一、RocketMQ 源码及启动
1.1 下载及配置启动
RocketMQ 下载地址 | DashBoard 部署指南
本例为 4.9.x 版本的 Source。
启动步骤:
- 将 Source 导入到 IDEA,使用 JDK 1.8,加载 Maven。
- 启动 Namesrv:配置 IDEA 的环境变量
ROCKETMQ_HOME=''(可配置为项目根目录)。 - 启动 Broker。
- 启动 Producer。
- 启动 Consumer。
1.2 源码核心目录
| 目录 | 说明 |
|---|---|
acl | 权限控制模块 |
broker | Broker 模块(Broker 启动进程) |
client | 消息客户端,包含消息生产者和消息消费者相关类 |
common | 公共包 |
dev | 开发者信息(非源码) |
distribution | 打包分发目录(非源码) |
example | RocketMQ 示例代码 |
filter | 消息过滤相关基础类 |
logappender | 日志实现相关类 |
logging | 自主实现日志相关类 |
namesrv | NameServer 实现相关类(NameServer 启动进程) |
openmessaging | 消息开放标准,已发布 |
remoting | 远程通信模块,基于 Netty |
srvutil | 服务器工具类 |
store | 消息存储实现相关类 |
style | checkstyle 相关实现 |
test | 测试相关类 |
tools | 工具类,监控命令相关实现类 |
1.3 设计理念
NameServer 设计
NameServer 的设计极其简单,摒弃了业界常用的将 ZooKeeper 作为”注册中心”的方案,而是自研 NameServer 实现元数据管理(topic 路由信息等)。从实际需求出发,topic 路由信息无须在集群之间保持强一致,而是追求最终一致性,并且能容忍分钟级的不一致。
正是基于这种特性,RocketMQ 的 NameServer 集群之间互不通信,这样极大降低了 NameServer 实现的复杂度,对网络的要求也降低了不少,性能相比 ZooKeeper 还有了极大提升。
I/O 存储机制
RocketMQ 的消息存储文件被设计成文件组的概念,组内单个文件大小固定,方便引入内存映射机制,所有主题的消息存储按顺序编写,极大地提升了消息的写性能。同时为了兼顾消息消费与消息查找,引入了消息消费队列文件与索引文件。
RocketMQ 只保证消息被消费者消费,在设计上允许消息被重复消费,即保证至少消费一次,消息重复问题由消费者在消息消费时实现幂等来解决。
二、RocketMQ 路由中心 NameServer
分布式系统中的服务注册中心主要提供服务调用的解析服务,服务调用者可以通过注册中心找到对应的服务提供者从而进行调用。RocketMQ 里也有这样一个注册中心,称之为 NameServer,本节介绍其路由管理、服务注册及服务发现机制。
2.1 NameServer 架构设计
RocketMQ 整体架构

- Producer:消息发布的角色,支持分布式集群方式部署。通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递过程支持快速失败并且低延迟。
- Consumer:消息消费的角色,支持分布式集群方式部署,支持以 push 推、pull 拉两种模式消费消息,同时也支持集群方式和广播方式的消费,提供实时消息订阅机制。
- NameServer:一个非常简单的 Topic 路由注册中心,支持 Broker 的动态注册与发现,主要包括两个功能:
- Broker 管理:接受 Broker 集群的注册信息并保存,作为路由信息的基本数据;同时提供心跳检测机制——NameServer 与每台 Broker 保持长连接,每隔 10s 检测一次 Broker 是否存活,若发现宕机则从路由注册表中移除。
- 路由信息管理:每个 NameServer 保存关于 Broker 集群的整个路由信息和供客户端查询的队列信息,Producer 和 Consumer 通过 NameServer 即可知道整个 Broker 集群的路由信息从而进行消息投递和消费。NameServer 通常以集群方式部署,各实例间相互不通讯;Broker 向每一台 NameServer 注册自己的路由信息,所以每个 NameServer 实例上都保存一份完整的路由信息——当某个 NameServer 下线,Broker 仍可向其它 NameServer 同步路由信息,Producer/Consumer 仍能动态感知 Broker 的路由变化。
- BrokerServer:主要负责消息的存储、投递和查询以及服务高可用保证。
NameServer 本身的高可用性可通过部署多台服务器实现,但彼此之间互不通信。虽然某一时刻各 NameServer 数据并不完全相同,但对消息发送不会造成重大影响,无非是短暂造成消息发送不均衡——这正是 RocketMQ NameServer 设计的一个亮点。
NameServer 工作机制
RocketMQ 的工作机制基于 Topic 的发布订阅:Producer 将某一 Topic 的消息发送给 Broker 存储,Broker 根据订阅信息将消息推送给 Consumer,或 Consumer 主动向 Broker 拉取某个 Topic 下的消息。NameServer 主要就是为了感知 Broker 是否活跃。

工作原理:
- Broker 会向 NameServer 集群的每一台机器发送心跳包,包括 Topic 路由信息。
- 生产者、消费者每隔 30s 从 NameServer 获取一次 topic 信息。
- NameServer 收到 Broker 的心跳包时会记录时间戳。
- NameServer 每 10s 扫描一次
brokerLiveTable,将其最后更新的时间戳与当前扫描的时间戳对比,若超过 120s,则认为该 Broker 已无心跳,更新 topic 路由信息,将失效的 Broker 信息移除。
相关核心数据结构(RouteInfoManager):
1 | public class RouteInfoManager { |
| 字段 | 说明 |
|---|---|
topicQueueTable | 每个 topic 对应的队列路由信息,记录当前 Broker 负责管理的所有主题及对应队列信息(队列 ID、读写权限、Broker 地址等)。主题配置变化或初次注册时,Broker 会更新对应主题的队列数据 |
brokerAddrTable | Broker 基本信息,包括名称、所属集群、Broker ID、地址等 |
clusterAddrTable | 集群信息,每个 Broker 集群下用 Set 存储多个 Broker 的名称 |
brokerLiveTable | Broker 心跳信息,记录每个 Broker 地址对应的存活信息:最后更新时间戳、注册到 NameServer 时的 Channel 通道、Broker 的 HA 地址 |
filterServerTable | 用于过滤 Server |
RocketMQ 的一个 topic 拥有多个队列,一个 Broker 默认为每个 topic 创建 4 个读队列和 4 个写队列。多个 Broker 组成一个集群,BrokerName 相同的 Broker 组成主从架构:brokerId = 0 表示主节点,brokerId > 0 表示从节点。
2.2 NameServer 启动流程
NameServer 的启动类是 org.apache.rocketmq.namesrv.NamesrvStartup:
- 解析配置文件或命令参数,给
NamesrvConfig(业务参数)、NettyServerConfig(网络参数)这两个核心对象赋值。 - 根据这两个配置创建
NamesrvController对象,调用initialize方法初始化:- 定时任务每 10 秒扫描一次不活跃的 Broker。
- 每 10 分钟打印一次 KV 配置。
- 注册 JVM 钩子,在 JVM 进程关闭之前先关闭线程池、释放资源。
- 启动 Netty 网络服务器。
initialize 方法核心逻辑:
1 | public boolean initialize() { |
注册 JVM 钩子:
1 | Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { |
2.3 路由注册、故障剔除与发现
路由注册
RocketMQ 路由注册通过 Broker 与 NameServer 的心跳机制实现。Broker 启动时遍历所有 NameServer 并发送心跳语句完成注册,此后每隔 30s 向集群中所有 NameServer 发送心跳包。NameServer 收到心跳包时先更新 brokerLiveTable 缓存中 BrokerLiveInfo 的 lastUpdateTimestamp,再每隔 10s 扫描一次该缓存:若连续 120s 没收到心跳包,则移除该 Broker 的路由信息并关闭 Socket 连接。
Broker 发送心跳包:遍历每一个 NameServer 依次发送心跳包,底层基于 Netty 框架实现。
NameServer 处理心跳包:
路由注册:加写锁防止并发修改
RouteInfoManager中的路由表(允许多个消息发送者并发读操作):1
2
3
4
5
6
7
8
9this.lock.writeLock().lockInterruptibly();
// 判断Broker所属集群是否存在
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
// 不存在则创建集群, 将broker名称加入集群Broker集合
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);维护 BrokerData 信息:先从
brokerAddrTable中根据brokerName查信息,不存在则新建,存在则替换原信息。更新 topic 路由元数据:如果 Broker 为主节点,并且 topic 配置信息发生变化或是初次注册,则创建或更新 topic 路由元数据并填充
topicQueueTable(即为默认主题自动注册路由信息,包含MixAll.DEFAULT_TOPIC的路由信息)。当消息生产者发送主题时,如果该主题未创建,并且BrokerConfig的autoCreateTopicEnable为true(broker.properties中默认开启),则返回MixAll.DEFAULT_TOPIC的路由信息。更新 BrokerLiveInfo,存储状态正常的 Broker 信息表。
更新过滤器 Server 地址列表。
如果 Broker 是从节点,则从主节点拿到最新的
brokerLiveInfo并更新到对应的masterAddr属性。
RocketMQ 网络传输基于 Netty。
路由删除
当 Broker 挂了,无法继续向 NameServer 发送心跳包,NameServer 需要把挂掉的 Broker 从路由信息表中删除。NameServer 维护一个定时任务,每 10s 扫描一次 brokerLiveTable:若 lastUpdateTimestamp 与当前扫描时间戳的差值超过 120s,则认为该 Broker 已无心跳,将其失效信息移除。
此外,Broker 正常关闭时会主动执行
unregisterBroker指令。
路由发现
RocketMQ 的路由发现是非实时的:当 topic 路由发生变化后,NameServer 并不会主动推送给客户端,而是由客户端定时拉取 topic 的最新路由。
graph TD
A["NameServer"] --> C["每隔 10s 扫描 brokerLiveTable<br>超过 120s 未收到心跳<br>则移除该 Broker 的所有路由信息"]
A --> F["收到 Broker 心跳包后<br>更新 brokerLiveTable"]
D["Broker"] -->|每隔 30s 上报心跳| A
E["Producer"] -->|根据 topic 查询路由| A三、RocketMQ 消息发送
RocketMQ 发送普通消息有 3 种实现方式:可靠同步发送、可靠异步发送和单向发送。
| 方式 | 说明 |
|---|---|
| sync(同步) | 发送者调用发送消息 API 后同步等待,直到消息服务器返回发送结果 |
| async(异步) | 调用 API 时指定发送成功后的回调函数,调用后立即返回,发送线程不阻塞;成功/失败的回调任务在新线程中执行 |
| one way(单向) | 调用 API 时直接返回,不等待结果也不注册回调,不关心消息是否成功存储 |
3.1 消息发送
topic 路由机制
消息发送者初次发送时会根据 topic 名称向 NameServer 集群查询路由信息并存储在本地,此后每隔 30s 遍历缓存中的 topic,向 NameServer 查询最新路由信息并更新。RocketMQ 还提供了自动创建主题机制(autoCreateTopicEnable=true)。
sequenceDiagram
participant P as Producer
participant N as NameServer
participant B as Broker
B->>B: 创建系统 topic(允许自动创建则建默认主题路由)
B->>N: 每 30s 汇报路由配置
P->>N: 每 30s 拉取 topic 最新信息
P->>P: 发送消息,先查本地缓存路由表
alt 本地未查到
P->>N: 查找路由信息
alt NameServer 未查到
P->>N: 按默认 topic 查找路由
else 查到了
P->>B: 按条件改变队列数,选一个队列发送
end
end
N->>N: 更新路由信息
P->>B: 发送消息
B-->>P: 结束消息发送RocketMQ 中的路由消息是持久化在 Broker 中的,NameServer 中的路由信息来自 Broker 的心跳包并存储在内存中。
消息发送高可用设计
发送端在自动发现主题路由信息后,RocketMQ 默认使用轮询算法进行路由的负载均衡,也支持自定义队列负载算法。
使用自定义的路由负载算法后,RocketMQ 的重试机制将失效。
RocketMQ 为实现消息发送高可用,提供了两种机制:
- 消息发送重试机制:默认重试两次。
- 故障规避机制:当消息第一次发送失败时,如果下一次还是发送到刚刚失败的 Broker 上,大概率还会失败,因此重试时会尽量避开刚刚失败的 Broker,选择其他 Broker 上的队列发送,从而提高消息发送的成功率。
graph TD
A["生产者"] --> B["构建消息"]
B --> C["调用 send 方法"]
C --> D{"是否需要事务?"}
D -->|是| E["开启事务"]
D -->|否| F["普通消息发送"]
E --> G["执行本地事务"]
G --> H["提交/回滚事务"]
H --> I["发送半消息到 Broker"]
I --> J["等待 Broker 确认"]
J --> K["发送提交/回滚指令"]
K --> L["Broker 完成事务"]
F --> M["构造 Message 对象"]
M --> N["设置 Topic、Tag、Key 等"]
N --> O["选择 MessageQueue"]
O --> P["计算路由:按 Topic 和队列数选择"]
P --> Q["发送消息到 Broker"]
Q --> R["Broker 接收消息"]
R --> S["写入 CommitLog"]
S --> T["映射到 ConsumeQueue"]
T --> U["更新 Index 文件"]
U --> V["返回发送结果给生产者"]3.2 RocketMQ 消息结构
消息类位于 org.apache.rocketmq.common.message:
1 | private String topic; |
| 字段 | 说明 |
|---|---|
topic | 标识消息所属的”主题”,是消息分类的基础 |
flag | 消息标志位,用于标记一些二进制特性(如是否压缩、是否事务等) |
properties.tags | 消息 tag,用于消息过滤 |
properties.keys | 消息索引键,RocketMQ 可根据这些 key 快速检索消息 |
properties.waitStoreMsgOK | 消息发送时是否等消息存储完成后再返回 |
3.3 生产者启动流程
消息生产者的代码都在 client 模块中——对 RocketMQ 而言,它既是客户端,也是消息的提供者。
检查
producerGroup是否符合要求,将生产者的instanceName改为进程 ID。创建
MQClientInstance实例。整个 JVM 实例中只存在一个MQClientManager实例:1
2
3// 维护一个MQClientInstance实例
// clientId为客户端IP+instance+unitname(可选)
ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>()如果
instance为默认值DEFAULT,RocketMQ 会自动将其设置为进程 ID,避免不同进程相互影响;但同一 JVM 中相同clientId的消费者和生产者在启动时获取的MQClientInstance实例都是同一个。MQClientInstance封装了 RocketMQ 的网络处理 API,是消息生产者、消费者与 NameServer、Broker 打交道的网络通道。向
MQClientInstance注册服务,将当前生产者加入其管理,方便后续调用网络请求、进行心跳检测等。启动
MQClientInstance。
3.4 消息发送基本流程
flowchart LR
subgraph 验证消息
A["消息长度验证"]
end
subgraph 查找路由
A ==>|发送前需先获取主题路由,才知道发往哪个 Broker| B["查找主题路由信息"]
C["Cache"] ==>|本地缓存获取| B
D["NameServer"] ==>|更新本地路由表| C
D -->|首次发送本地无缓存时,查询 NameServer 获取| B
end
subgraph 选择消息队列
B ==> E["选择消息队列"]
end
subgraph 发送
E ==> F["发送消息"]
end默认消息以同步方式发送,默认超时时间为 3s。
① 消息长度验证
规范要求:
- 主题名称、消息体不能为空。
- 消息长度不能等于 0,且默认不能超过允许发送消息的最大长度 4MB。
② 查找主题路由信息
如果生产者本地缓存了 topic 的路由信息且包含消息队列,则直接返回该路由信息;如果没有缓存或不包含消息队列,则向 NameServer 查询;如果最终未找到,则抛出”无法找到主题相关路由信息”异常。
生产环境不建议开启自动创建主题(autoCreateTopicEnable=true),原因是这会导致新创建的主题只存在于集群中的部分节点上:
- 启用自动创建主题时,消息生产者往 Broker 端发送消息时才会触发自动创建。
- Broker 端会在一个心跳包周期内,将新创建的路由信息发送到 NameServer,同时还有一个定时任务把内存中的路由信息持久化到磁盘。
- 消息发送者每隔 30s 向 NameServer 更新路由信息:如果发送端一段时间内未发消息,集群内的第二台 Broker 就不会被路由覆盖,NameServer 中新创建 Topic 的路由信息可能只包含 Broker-a;此后发送端拉取到的最新路由信息会把原本缓存的”2个broker”覆盖为”1个broker”,导致该 Topic 的消息永远不会发送到另一个 Broker。
简单来说:如果开启了自动创建主题,第一次发消息时走默认路由到 Broker-1 自动创建,但隔一段时间后,Producer 将 NameServer 的路由信息更新到本地缓存,此后就走不了默认路由,到不了 Broker-2 了。
③ 选择消息队列
选择消息队列有两种方式,由 sendLatencyFaultEnable 参数控制是否启用 Broker 故障延迟机制(默认 false 不启用):
NameServer 检测 Broker 是否可用是有延迟的,最短为一次心跳检测间隔(10s);其次 NameServer 不是检测到宕机就立即推送给生产者,而是生产者每隔 30s 更新一次路由信息,因此最快感知 Broker 最新状态也需要 30s。这就需要引入一种机制:在 Broker 宕机期间,一次消息发送失败后,将该 Broker 暂时排除在消息队列的选择范围之外。
- 默认机制(
sendLatencyFaultEnable=false) - Broker 故障延迟机制(
sendLatencyFaultEnable=true)
④ 消息发送(详情)
该部分笔记暂未完成,后续可补充:消息发送的具体网络调用、CommitLog 写入流程、ConsumeQueue 与索引文件更新机制等。
待补充内容
本文目前覆盖到”消息发送(详情)”章节起始处,原笔记到此为止。后续可以继续整理的方向包括:
- 消息存储机制(CommitLog / ConsumeQueue / IndexFile)
- 消息消费流程(Push / Pull 模式、负载均衡、消费进度管理)
- 高可用与主从同步机制
- 事务消息实现原理
- 顺序消息与延迟消息



