RocketMQ 技术内幕
Published in:2026-01-15 | category: 中间件

本文梳理 RocketMQ 源码学习笔记,涵盖源码结构、NameServer 路由中心、消息发送机制等核心内容。

一、RocketMQ 源码及启动

1.1 下载及配置启动

RocketMQ 下载地址 | DashBoard 部署指南

本例为 4.9.x 版本的 Source。

启动步骤:

  1. 将 Source 导入到 IDEA,使用 JDK 1.8,加载 Maven。
  2. 启动 Namesrv:配置 IDEA 的环境变量 ROCKETMQ_HOME=''(可配置为项目根目录)。
  3. 启动 Broker。
  4. 启动 Producer。
  5. 启动 Consumer。

1.2 源码核心目录

目录说明
acl权限控制模块
brokerBroker 模块(Broker 启动进程)
client消息客户端,包含消息生产者和消息消费者相关类
common公共包
dev开发者信息(非源码)
distribution打包分发目录(非源码)
exampleRocketMQ 示例代码
filter消息过滤相关基础类
logappender日志实现相关类
logging自主实现日志相关类
namesrvNameServer 实现相关类(NameServer 启动进程)
openmessaging消息开放标准,已发布
remoting远程通信模块,基于 Netty
srvutil服务器工具类
store消息存储实现相关类
stylecheckstyle 相关实现
test测试相关类
tools工具类,监控命令相关实现类

1.3 设计理念

NameServer 设计

NameServer 的设计极其简单,摒弃了业界常用的将 ZooKeeper 作为”注册中心”的方案,而是自研 NameServer 实现元数据管理(topic 路由信息等)。从实际需求出发,topic 路由信息无须在集群之间保持强一致,而是追求最终一致性,并且能容忍分钟级的不一致。

正是基于这种特性,RocketMQ 的 NameServer 集群之间互不通信,这样极大降低了 NameServer 实现的复杂度,对网络的要求也降低了不少,性能相比 ZooKeeper 还有了极大提升。

I/O 存储机制

RocketMQ 的消息存储文件被设计成文件组的概念,组内单个文件大小固定,方便引入内存映射机制,所有主题的消息存储按顺序编写,极大地提升了消息的写性能。同时为了兼顾消息消费与消息查找,引入了消息消费队列文件与索引文件

RocketMQ 只保证消息被消费者消费,在设计上允许消息被重复消费,即保证至少消费一次,消息重复问题由消费者在消息消费时实现幂等来解决。


二、RocketMQ 路由中心 NameServer

分布式系统中的服务注册中心主要提供服务调用的解析服务,服务调用者可以通过注册中心找到对应的服务提供者从而进行调用。RocketMQ 里也有这样一个注册中心,称之为 NameServer,本节介绍其路由管理、服务注册及服务发现机制。

2.1 NameServer 架构设计

RocketMQ 整体架构

RocketMQ整体架构图

  • Producer:消息发布的角色,支持分布式集群方式部署。通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递过程支持快速失败并且低延迟。
  • Consumer:消息消费的角色,支持分布式集群方式部署,支持以 push 推、pull 拉两种模式消费消息,同时也支持集群方式和广播方式的消费,提供实时消息订阅机制。
  • NameServer:一个非常简单的 Topic 路由注册中心,支持 Broker 的动态注册与发现,主要包括两个功能:
    • Broker 管理:接受 Broker 集群的注册信息并保存,作为路由信息的基本数据;同时提供心跳检测机制——NameServer 与每台 Broker 保持长连接,每隔 10s 检测一次 Broker 是否存活,若发现宕机则从路由注册表中移除。
    • 路由信息管理:每个 NameServer 保存关于 Broker 集群的整个路由信息和供客户端查询的队列信息,Producer 和 Consumer 通过 NameServer 即可知道整个 Broker 集群的路由信息从而进行消息投递和消费。NameServer 通常以集群方式部署,各实例间相互不通讯;Broker 向每一台 NameServer 注册自己的路由信息,所以每个 NameServer 实例上都保存一份完整的路由信息——当某个 NameServer 下线,Broker 仍可向其它 NameServer 同步路由信息,Producer/Consumer 仍能动态感知 Broker 的路由变化。
  • BrokerServer:主要负责消息的存储、投递和查询以及服务高可用保证。

NameServer 本身的高可用性可通过部署多台服务器实现,但彼此之间互不通信。虽然某一时刻各 NameServer 数据并不完全相同,但对消息发送不会造成重大影响,无非是短暂造成消息发送不均衡——这正是 RocketMQ NameServer 设计的一个亮点。

NameServer 工作机制

RocketMQ 的工作机制基于 Topic 的发布订阅:Producer 将某一 Topic 的消息发送给 Broker 存储,Broker 根据订阅信息将消息推送给 Consumer,或 Consumer 主动向 Broker 拉取某个 Topic 下的消息。NameServer 主要就是为了感知 Broker 是否活跃。

NameServer工作原理图

工作原理:

  • Broker 会向 NameServer 集群的每一台机器发送心跳包,包括 Topic 路由信息。
  • 生产者、消费者每隔 30s 从 NameServer 获取一次 topic 信息。
  • NameServer 收到 Broker 的心跳包时会记录时间戳。
  • NameServer 每 10s 扫描一次 brokerLiveTable,将其最后更新的时间戳与当前扫描的时间戳对比,若超过 120s,则认为该 Broker 已无心跳,更新 topic 路由信息,将失效的 Broker 信息移除。

相关核心数据结构(RouteInfoManager):

1
2
3
4
5
6
7
public class RouteInfoManager {
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
字段说明
topicQueueTable每个 topic 对应的队列路由信息,记录当前 Broker 负责管理的所有主题及对应队列信息(队列 ID、读写权限、Broker 地址等)。主题配置变化或初次注册时,Broker 会更新对应主题的队列数据
brokerAddrTableBroker 基本信息,包括名称、所属集群、Broker ID、地址等
clusterAddrTable集群信息,每个 Broker 集群下用 Set 存储多个 Broker 的名称
brokerLiveTableBroker 心跳信息,记录每个 Broker 地址对应的存活信息:最后更新时间戳、注册到 NameServer 时的 Channel 通道、Broker 的 HA 地址
filterServerTable用于过滤 Server

RocketMQ 的一个 topic 拥有多个队列,一个 Broker 默认为每个 topic 创建 4 个读队列和 4 个写队列。多个 Broker 组成一个集群,BrokerName 相同的 Broker 组成主从架构:brokerId = 0 表示主节点,brokerId > 0 表示从节点。

2.2 NameServer 启动流程

NameServer 的启动类是 org.apache.rocketmq.namesrv.NamesrvStartup

  1. 解析配置文件或命令参数,给 NamesrvConfig(业务参数)、NettyServerConfig(网络参数)这两个核心对象赋值。
  2. 根据这两个配置创建 NamesrvController 对象,调用 initialize 方法初始化:
    1. 定时任务每 10 秒扫描一次不活跃的 Broker。
    2. 每 10 分钟打印一次 KV 配置。
  3. 注册 JVM 钩子,在 JVM 进程关闭之前先关闭线程池、释放资源。
  4. 启动 Netty 网络服务器。

initialize 方法核心逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public boolean initialize() {
// 加载 KV 配置管理器中的配置(如集群配置、Topic 配置等键值对数据)
this.kvConfigManager.load();
// 创建 Netty 网络通信服务端,用于处理 Broker 和 Client 的连接请求
// 参数:Netty 服务端配置 + Broker 连接保活服务(用于清理不活跃 Broker)
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 创建固定线程池,用于处理网络请求的业务逻辑(如路由注册、查询等)
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

this.registerProcessor();
// 启动定时任务:每 10 秒扫描一次不活跃的 Broker 节点并清理
// 初始延迟 5 秒,之后每 10 秒执行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 每 10 分钟打印一次当前所有 KV 配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// 如果 TLS/SSL 加密通信未被禁用,则启动证书文件监听服务
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// ...
}
return true;
}

注册 JVM 钩子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));

// NamesrvController 的 shutdown 方法
public void shutdown() {
this.remotingServer.shutdown();
this.remotingExecutor.shutdown();
this.scheduledExecutorService.shutdown();

if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
}

2.3 路由注册、故障剔除与发现

路由注册

RocketMQ 路由注册通过 Broker 与 NameServer 的心跳机制实现。Broker 启动时遍历所有 NameServer 并发送心跳语句完成注册,此后每隔 30s 向集群中所有 NameServer 发送心跳包。NameServer 收到心跳包时先更新 brokerLiveTable 缓存中 BrokerLiveInfolastUpdateTimestamp,再每隔 10s 扫描一次该缓存:若连续 120s 没收到心跳包,则移除该 Broker 的路由信息并关闭 Socket 连接。

  1. Broker 发送心跳包:遍历每一个 NameServer 依次发送心跳包,底层基于 Netty 框架实现。

  2. NameServer 处理心跳包

    1. 路由注册:加写锁防止并发修改 RouteInfoManager 中的路由表(允许多个消息发送者并发读操作):

      1
      2
      3
      4
      5
      6
      7
      8
      9
      this.lock.writeLock().lockInterruptibly();
      // 判断Broker所属集群是否存在
      Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
      if (null == brokerNames) {
      // 不存在则创建集群, 将broker名称加入集群Broker集合
      brokerNames = new HashSet<String>();
      this.clusterAddrTable.put(clusterName, brokerNames);
      }
      brokerNames.add(brokerName);
    2. 维护 BrokerData 信息:先从 brokerAddrTable 中根据 brokerName 查信息,不存在则新建,存在则替换原信息。

    3. 更新 topic 路由元数据:如果 Broker 为主节点,并且 topic 配置信息发生变化或是初次注册,则创建或更新 topic 路由元数据并填充 topicQueueTable(即为默认主题自动注册路由信息,包含 MixAll.DEFAULT_TOPIC 的路由信息)。当消息生产者发送主题时,如果该主题未创建,并且 BrokerConfigautoCreateTopicEnabletruebroker.properties 中默认开启),则返回 MixAll.DEFAULT_TOPIC 的路由信息。

    4. 更新 BrokerLiveInfo,存储状态正常的 Broker 信息表。

    5. 更新过滤器 Server 地址列表

    6. 如果 Broker 是从节点,则从主节点拿到最新的 brokerLiveInfo 并更新到对应的 masterAddr 属性。

RocketMQ 网络传输基于 Netty。

路由删除

当 Broker 挂了,无法继续向 NameServer 发送心跳包,NameServer 需要把挂掉的 Broker 从路由信息表中删除。NameServer 维护一个定时任务,每 10s 扫描一次 brokerLiveTable:若 lastUpdateTimestamp 与当前扫描时间戳的差值超过 120s,则认为该 Broker 已无心跳,将其失效信息移除。

此外,Broker 正常关闭时会主动执行 unregisterBroker 指令。

路由发现

RocketMQ 的路由发现是非实时的:当 topic 路由发生变化后,NameServer 并不会主动推送给客户端,而是由客户端定时拉取 topic 的最新路由。

graph TD
    A["NameServer"] --> C["每隔 10s 扫描 brokerLiveTable<br>超过 120s 未收到心跳<br>则移除该 Broker 的所有路由信息"]
    A --> F["收到 Broker 心跳包后<br>更新 brokerLiveTable"]
    D["Broker"] -->|每隔 30s 上报心跳| A
    E["Producer"] -->|根据 topic 查询路由| A

三、RocketMQ 消息发送

RocketMQ 发送普通消息有 3 种实现方式:可靠同步发送、可靠异步发送和单向发送。

方式说明
sync(同步)发送者调用发送消息 API 后同步等待,直到消息服务器返回发送结果
async(异步)调用 API 时指定发送成功后的回调函数,调用后立即返回,发送线程不阻塞;成功/失败的回调任务在新线程中执行
one way(单向)调用 API 时直接返回,不等待结果也不注册回调,不关心消息是否成功存储

3.1 消息发送

topic 路由机制

消息发送者初次发送时会根据 topic 名称向 NameServer 集群查询路由信息并存储在本地,此后每隔 30s 遍历缓存中的 topic,向 NameServer 查询最新路由信息并更新。RocketMQ 还提供了自动创建主题机制(autoCreateTopicEnable=true)。

sequenceDiagram
    participant P as Producer
    participant N as NameServer
    participant B as Broker
    B->>B: 创建系统 topic(允许自动创建则建默认主题路由)
    B->>N: 每 30s 汇报路由配置
    P->>N: 每 30s 拉取 topic 最新信息
    P->>P: 发送消息,先查本地缓存路由表
    alt 本地未查到
        P->>N: 查找路由信息
        alt NameServer 未查到
            P->>N: 按默认 topic 查找路由
        else 查到了
            P->>B: 按条件改变队列数,选一个队列发送
        end
    end
    N->>N: 更新路由信息
    P->>B: 发送消息
    B-->>P: 结束消息发送

RocketMQ 中的路由消息是持久化在 Broker 中的,NameServer 中的路由信息来自 Broker 的心跳包并存储在内存中。

消息发送高可用设计

发送端在自动发现主题路由信息后,RocketMQ 默认使用轮询算法进行路由的负载均衡,也支持自定义队列负载算法。

使用自定义的路由负载算法后,RocketMQ 的重试机制将失效

RocketMQ 为实现消息发送高可用,提供了两种机制:

  1. 消息发送重试机制:默认重试两次。
  2. 故障规避机制:当消息第一次发送失败时,如果下一次还是发送到刚刚失败的 Broker 上,大概率还会失败,因此重试时会尽量避开刚刚失败的 Broker,选择其他 Broker 上的队列发送,从而提高消息发送的成功率。
graph TD
    A["生产者"] --> B["构建消息"]
    B --> C["调用 send 方法"]
    C --> D{"是否需要事务?"}
    D -->|是| E["开启事务"]
    D -->|否| F["普通消息发送"]

    E --> G["执行本地事务"]
    G --> H["提交/回滚事务"]
    H --> I["发送半消息到 Broker"]
    I --> J["等待 Broker 确认"]
    J --> K["发送提交/回滚指令"]
    K --> L["Broker 完成事务"]

    F --> M["构造 Message 对象"]
    M --> N["设置 Topic、Tag、Key 等"]
    N --> O["选择 MessageQueue"]
    O --> P["计算路由:按 Topic 和队列数选择"]
    P --> Q["发送消息到 Broker"]
    Q --> R["Broker 接收消息"]
    R --> S["写入 CommitLog"]
    S --> T["映射到 ConsumeQueue"]
    T --> U["更新 Index 文件"]
    U --> V["返回发送结果给生产者"]

3.2 RocketMQ 消息结构

消息类位于 org.apache.rocketmq.common.message

1
2
3
4
5
private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;
private String transactionId;
字段说明
topic标识消息所属的”主题”,是消息分类的基础
flag消息标志位,用于标记一些二进制特性(如是否压缩、是否事务等)
properties.tags消息 tag,用于消息过滤
properties.keys消息索引键,RocketMQ 可根据这些 key 快速检索消息
properties.waitStoreMsgOK消息发送时是否等消息存储完成后再返回

3.3 生产者启动流程

消息生产者的代码都在 client 模块中——对 RocketMQ 而言,它既是客户端,也是消息的提供者。

  1. 检查 producerGroup 是否符合要求,将生产者的 instanceName 改为进程 ID。

  2. 创建 MQClientInstance 实例。整个 JVM 实例中只存在一个 MQClientManager 实例:

    1
    2
    3
    // 维护一个MQClientInstance实例
    // clientId为客户端IP+instance+unitname(可选)
    ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>()

    如果 instance 为默认值 DEFAULT,RocketMQ 会自动将其设置为进程 ID,避免不同进程相互影响;但同一 JVM 中相同 clientId 的消费者和生产者在启动时获取的 MQClientInstance 实例都是同一个。MQClientInstance 封装了 RocketMQ 的网络处理 API,是消息生产者、消费者与 NameServer、Broker 打交道的网络通道。

  3. MQClientInstance 注册服务,将当前生产者加入其管理,方便后续调用网络请求、进行心跳检测等。

  4. 启动 MQClientInstance

3.4 消息发送基本流程

flowchart LR
    subgraph 验证消息
        A["消息长度验证"]
    end

    subgraph 查找路由
        A ==>|发送前需先获取主题路由,才知道发往哪个 Broker| B["查找主题路由信息"]
        C["Cache"] ==>|本地缓存获取| B
        D["NameServer"] ==>|更新本地路由表| C
        D -->|首次发送本地无缓存时,查询 NameServer 获取| B
    end

    subgraph 选择消息队列
        B ==> E["选择消息队列"]
    end

    subgraph 发送
        E ==> F["发送消息"]
    end

默认消息以同步方式发送,默认超时时间为 3s

① 消息长度验证

规范要求:

  • 主题名称、消息体不能为空。
  • 消息长度不能等于 0,且默认不能超过允许发送消息的最大长度 4MB

② 查找主题路由信息

如果生产者本地缓存了 topic 的路由信息且包含消息队列,则直接返回该路由信息;如果没有缓存或不包含消息队列,则向 NameServer 查询;如果最终未找到,则抛出”无法找到主题相关路由信息”异常。

生产环境不建议开启自动创建主题autoCreateTopicEnable=true),原因是这会导致新创建的主题只存在于集群中的部分节点上:

  1. 启用自动创建主题时,消息生产者往 Broker 端发送消息时才会触发自动创建。
  2. Broker 端会在一个心跳包周期内,将新创建的路由信息发送到 NameServer,同时还有一个定时任务把内存中的路由信息持久化到磁盘。
  3. 消息发送者每隔 30s 向 NameServer 更新路由信息:如果发送端一段时间内未发消息,集群内的第二台 Broker 就不会被路由覆盖,NameServer 中新创建 Topic 的路由信息可能只包含 Broker-a;此后发送端拉取到的最新路由信息会把原本缓存的”2个broker”覆盖为”1个broker”,导致该 Topic 的消息永远不会发送到另一个 Broker。

简单来说:如果开启了自动创建主题,第一次发消息时走默认路由到 Broker-1 自动创建,但隔一段时间后,Producer 将 NameServer 的路由信息更新到本地缓存,此后就走不了默认路由,到不了 Broker-2 了。

③ 选择消息队列

选择消息队列有两种方式,由 sendLatencyFaultEnable 参数控制是否启用 Broker 故障延迟机制(默认 false 不启用):

NameServer 检测 Broker 是否可用是有延迟的,最短为一次心跳检测间隔(10s);其次 NameServer 不是检测到宕机就立即推送给生产者,而是生产者每隔 30s 更新一次路由信息,因此最快感知 Broker 最新状态也需要 30s。这就需要引入一种机制:在 Broker 宕机期间,一次消息发送失败后,将该 Broker 暂时排除在消息队列的选择范围之外。

  1. 默认机制sendLatencyFaultEnable=false
  2. Broker 故障延迟机制sendLatencyFaultEnable=true

④ 消息发送(详情)

该部分笔记暂未完成,后续可补充:消息发送的具体网络调用、CommitLog 写入流程、ConsumeQueue 与索引文件更新机制等。


待补充内容

本文目前覆盖到”消息发送(详情)”章节起始处,原笔记到此为止。后续可以继续整理的方向包括:

  • 消息存储机制(CommitLog / ConsumeQueue / IndexFile)
  • 消息消费流程(Push / Pull 模式、负载均衡、消费进度管理)
  • 高可用与主从同步机制
  • 事务消息实现原理
  • 顺序消息与延迟消息
Prev:
华为管理法学习笔记(下):干部管理、文化与激励
Next:
《格鲁夫给经理人的第一课》读书笔记(二):会议与决策