# RocketMQ
# 参考文档
# 安装
# Server
上面下载下来后 NameServer 和 Broker 是在一起的。下载解压后,放在 /opt
包下。
# 启动 NameServer
启动 NameServer 之前先配置下内存:
# 修改 namesrv 的 JVM 参数配置,修改 `bin/runserver.sh` 文件:
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
2
启动:
cd /opt/rocketmq-all-5.1.0-bin-release/bin
# 启动 NameServer
# mqnamesrv 实际执行的是 runserver.sh
./mqnamesrv
# 后台启动
nohup ./mqnamesrv > nameserver.log &
### 验证 namesrv 是否启动成功
tail -f ~/logs/rocketmqlogs/namesrv.log
# 输出:The Name Server boot success...
2
3
4
5
6
7
8
9
10
# 启动 Broker 和 Proxy
启动 Broker 之前先配置下内存:
# 修改 broker 的 JVM 参数配置,将默认 8G 内存修改为 512M。修改 `bin/runbroker.sh` 文件:
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
#在 conf/broker.conf 文件中加入如下配置,开启自动创建 Topic 功能
autoCreateTopicEnable = true
2
3
4
启动 Broker 和 Proxy:
cd /opt/rocketmq-all-5.1.0-bin-release/bin
# 启动 Broker 和 Proxy
# mqbroker 实际执行的是 runbroker.sh
./mqbroker -n localhost:9876 --enable-proxy
# 后台启动
nohup ./mqbroker -n localhost:9876 --enable-proxy > broker.log &
### 验证 broker 是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a
$ tail -f ~/logs/rocketmqlogs/proxy.log
# 输出:The broker[broker-a,192.169.1.2:10911] boot success...
2
3
4
5
6
7
8
9
10
# 工具测试消息收发
在进行工具测试消息收发之前,我们需要告诉客户端 NameServer 的地址,RocketMQ 有多种方式在客户端中设置 NameServer 地址,这里利用环境变量 NAMESRV_ADDR
:
cd /opt/rocketmq-all-5.1.0-bin-release/bin
export NAMESRV_ADDR=localhost:9876
# 使用 tools.sh 工具验证消息的发送,默认会发 1000 条消息
sh ./tools.sh org.apache.rocketmq.example.quickstart.Producer
# 使用 tools.sh 工具验证消息的接收
sh ./tools.sh org.apache.rocketmq.example.quickstart.Consumer
2
3
4
5
6
7
# 关闭服务器
cd /opt/rocketmq-all-5.1.0-bin-release/bin
# 关闭 Broker
./mqshutdown broker
# 关闭 NameServer
./mqshutdown namesrv
2
3
4
5
# RocketMQ Dashboard
RocketMQ Dashboard (opens new window) 是 RocketMQ 的管控利器,为用户提供客户端和应用程序的各种事件、性能的统计信息,支持以 可视化工具 代替 Topic 配置、Broker 管理等命令行操作。
# docker 方式安装
# 安装docker,拉取 rocketmq-dashboard 镜像
docker pull apacherocketmq/rocketmq-dashboard
# docker 容器中运行 rocketmq-dashboard
docker run -d --name rocketmq-dashboard -e "JAVA_OPTS=-Drocketmq.namesrv.addr=127.0.0.1:9876" -p 8080:8080 -t apacherocketmq/rocketmq-dashboard
2
3
4
安装成功后,在浏览器页面访问 http://<namesrv.addr:8080>
# Dashboard 简介
// TODO
# RocketMQ 架构
Apache RocketMQ 架构上主要分为四部分:
- Producer,生产者
- Consumer,消费者
- NameServer,名字服务器
- Broker,代理服务器
# RocketMQ 集群工作流程
启动 NameServer
NameServer 启动后监听端口,等待 Broker、Producer、Consumer 连接,相当于一个路由控制中心。
启动 Broker
启动 Broker。与所有 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
创建 Topic
创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
生产者发送消息
启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在于哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。
消费者接受消息
跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,然后开始消费消息。
# Java 示例
引入依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.0.0</version>
</dependency>
2
3
4
5
Producer 代码:
public static void main(String[] args) throws Exception {
// 实例化一个生产者组
DefaultMQProducer producer = new DefaultMQProducer("producerGroup1");
// 指定名字服务器地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动实例
producer.start();
for (int i = 0; i < 100; i++) {
Message message = new Message("MyTopic", "MyTag", ("This is my message - " + i).getBytes(StandardCharsets.UTF_8));
SendResult sendResult = producer.send(message);
System.out.println("第 " + i + " 个发送结果 - " + sendResult);
}
// 关闭实例
producer.shutdown();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Consumer 代码:
public static void main(String[] args) throws Exception {
// 实例化一个消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyConsumerGroup1");
// 指定名字服务器地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅一个或多个 Topic 来消费
consumer.subscribe("MyTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println(new String(messageExts.get(0).getBody(), StandardCharsets.UTF_8));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
// 关闭实例
// consumer.shutdown();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# Spring Boot 整合
# 讨论区
由于评论过多会影响页面最下方的导航,故将评论区做默认折叠处理。