一、RabbitMQ概述
(1)什么是消息队列
| 特性对比 | AMQP | JMS |
|---|---|---|
| 七层网络模型 | 传输层和会话层 | 应用层 |
| 支持的编程语言和平台 | 支持多种编程语言和平台,包括Java、C++、Python等 | 针对Java平台,在其他编程语言和平台上支持相对较少 |
| 传输协议 | 二进制协议进行消息传递,提供了高效、可靠的消息投递机制 | 使用面向文本的协议(如HTTP/TCP),消息传递效率可能较低 |
| 消息模型 | 支持多种消息模型,包括点对点(P2P)和发布订阅(Pub/Sub) | 主要支持点对点(P2P)和发布订阅(Pub/Sub)模型 |
| 可靠性 | 提供了强大的可靠性保证,包括消息持久化、事务性消息和消息确认机制 | 支持消息持久化和事务性消息,但具体实现取决于消息传递系统的提供者 |
| 扩展性和兼容性 | 具有很好的扩展性和兼容性,可以在不同的消息代理之间交互操作 | 在Java环境中有较好的扩展性和兼容性,但在非Java环境集成时受到限制 |
(2)主流MQ产品对比
| 特性对比 | RabbitMQ | Kafka | RocketMQ | ActiveMQ |
|---|---|---|---|---|
| 研发团队 | Rabbit(公司性质) | Apache(社区) | 阿里(公司) | Apache(社区) |
| 开发语言 | Erlang | Scala & Java | Java | Java |
| 核心机制 | 基于AMQP的消息队列模型,生产者-消费者模式 | 分布式流平台,发布/订阅模型,高吞吐量 | 分布式消息队列,支持topic、tag消息分类过滤 | 基于JMS的消息传递模型,支持点对点和发布/订阅 |
| 协议支持 | XMPP、STOMP、SMTP | 自定义协议(社区封装HTTP支持) | 自定义协议 | XMPP、STOMP、OpenWire、REST |
| 客户端支持语言 | 官方:Erlang、Java、Ruby;社区几乎支持所有语言 | 官方:Java;社区:PHP、Python、Go等 | Java、C++ | Java、C、C++、Python、PHP、Perl、.NET等 |
| 可用性机制 | 镜像队列、仲裁队列 | 分区、副本 | 主从复制 | 主从复制 |
| 单机吞吐量 | 10w/s | 100w/s | 10w+/s(双十一案例支撑) | 10w-/s |
| 消息延迟 | 微妙级 | 毫秒级 | 毫秒级 | 毫秒级 |
| 消息确认 | 完整的消息确认机制 | N/A | 内置消息表,支持消息持久化到数据库 | N/A |
| 功能特性 | 并发能力好、性能高、延迟低、社区活跃、管理界面丰富 | 专为大数据场景设计,功能相对集中 | MQ功能完备、扩展性好、适合企业级复杂场景 | 老牌产品、成熟度高、文档丰富 |
(3)什么是RabbitMQ
RabbitMQ 是⼀个采⽤AMQP协议的开源消息代理软件,主要⽤于实现分布式系统间的可靠异步 通信、应⽤解耦和流量削峰。 RabbitMQ和Kafka的核⼼区别在于设计⽬标、架构模型和适⽤场景:RabbitMQ专注于可靠、低 延迟的消息传递,适合实时任务和复杂路由;⽽Kafka专为⾼吞吐量、持久化的流数据处理设计, 适⽤于⼤数据⽇志、事件溯源和实时分析场景。 环境准备 操作系统:确保所有节点运⾏相同版本的操作系统(Linux 是最常⻅的选择)。 主机名:为每个节点配置唯⼀的主机名,确保可以通过 DNS 或 /etc/hosts ⽂件解析到 IP 地址。 时间同步:使⽤ ntp 或 chrony 服务同步时间。 Erlang:RabbitMQ 依赖 Erlang。确保所有节点安装相同版本的 Er RabbitMQ 使⽤主机名和 Erlang 的 nodename 区分节点,因此每个节点的主机名必须唯⼀。
二、环境准备及主机名设置
RabbitMQ 使⽤主机名和 Erlang 的 nodename 区分节点,因此每个节点的主机名必须唯⼀。
| 系统 | ip | 主机名 |
| Ubuntu22.04 | 10.0.0.94 | elk94 |
| Ubuntu22.04 | 10.0.0.95 | elk95 |
| Ubuntu22.04 | 10.0.0.96 | elk96 |
在每台机器的 /etc/hosts 中添加集群中所有节点的主机名与 IP 对应关系 root@elk94:~# cat /etc/hosts 10.0.0.94 elk94 10.0.0.95 elk95 10.0.0.96 elk96
三、安装 Erlang 和 RabbitMQ
版本对应关系https://www.rabbitmq.com/docs/which-erlang
在每台机器上执⾏
先进⾏软件包索引更新,防⽌有依赖问题
apt-get update
安装
sudo apt-get install -y erlang
sudo apt-get install -y rabbitmq-server
启动并查看状态
安装完成后,RabbitMQ 会被⾃动启动并开机⾃启动
root@elk95:~# systemctl status rabbitmq-server
● rabbitmq-server.service - RabbitMQ Messaging Server
Loaded: loaded (/lib/systemd/system/rabbitmq-server.service; enable
d; vendor preset: enabled)
Active: active (running) since Sun 2025-11-09 21:31:06 CST; 39s ago
Main PID: 43768 (beam.smp)
Tasks: 24 (limit: 4519)
Memory: 94.7M
CPU: 3.341s
CGroup: /system.slice/rabbitmq-server.service
├─43768 /usr/lib/erlang/erts-12.2.1/bin/beam.smp -W w -MBas a
geffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -sbwt none -sbwtdcpu none -sbwtdio none -- -
root /u>
├─43780 erl_child_setup 65536
├─43831 inet_gethost 4
├─43832 inet_gethost 4
└─43835 /bin/sh -s rabbit_disk_monitor
四、启⽤管理插件并创建管理员账号
RabbitMQ 集群部署完成后,我们通常需要启⽤管理插件(Management Plugin)来访问可视化 管理界⾯,并创建管理员账号。
1. 启⽤管理插件 在每台 RabbitMQ 节点上执⾏: rabbitmq-plugins enable rabbitmq_management 启⽤之后,RabbitMQ 通常会在本地 15672 端⼝提供 HTTP 管理界⾯。例如,通过 http://1 92.168.1.101:15672 可以访问 rabbit1 的管理⻚⾯。
2.创建管理员账号 在每台节点上都可以执⾏(若已存在相同名称则会提示): rabbitmqctl add_user admin StrongPassword rabbitmqctl set_user_tags admin administrator # 设置该⽤户在 "/" 虚拟主机下的权限,授予全部读写权限 rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" 注意:请使⽤更强的密码进⾏⽣产环境部署,并按照需分配合适的权限或虚拟主机。 不需要每台机器都创建⽤户,节点接⼊主节点后,⽤户可以同步使⽤

五、集群通信 Cookie 同步
RabbitMQ 集群依赖 Erlang Cookie 来进⾏节点间的验证与通信。在同⼀个集群中的所有节点必须使⽤
相同的 Cookie。该 Cookie 默认保存在 /var/lib/rabbitmq/.erlang.cookie ⽂件中。
选择⼀台节点作为 Cookie 源
在 elk94上查看 Cookie
root@elk94:~# cat /var/lib/rabbitmq/.erlang.cookie
iadka145LJNtxvEnNZrnw+rtTjfJ8SUpiovCUgyq/0DE2JJ246CI9NqM
将内容复制,然后分别在 其他节点上执⾏:
systemctl stop rabbitmq-server
echo "iadka145LJNtxvEnNZrnw+rtTjfJ8SUpiovCUgyq/0DE2JJ246CI9NqM" | tee /var/
lib/rabbitmq/.erlang.cookie
chmod 400 /var/lib/rabbitmq/.erlang.cookie
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
systemctl start rabbitmq-server
确保三台节点的 Cookie 完全⼀致,并且权限和拥有者正确,才能保证后续正常组建集群。
六、节点加⼊集群
以 elk94 作为主节点,先将 elk95 、 elk96 加⼊到 elk94 的集群中
在elk95上执⾏:
root@elk95:~# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@elk95 ...
root@elk95:~# rabbitmqctl join_cluster rabbit@elk94
Clustering node rabbit@elk95 with rabbit@elk94
The node is already a member of this cluster
root@elk95:~# rabbitmqctl start_app
Starting node rabbit@elk95 ...
在elk96上执⾏:
root@elk96:~# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@elk96 ...
root@elk96:~# rabbitmqctl join_cluster rabbit@elk94
Clustering node rabbit@elk96 with rabbit@elk94
The node is already a member of this cluster
root@elk96:~# rabbitmqctl start_app
Starting node rabbit@elk96 ...
查看集群状态
可以在任意节点上运⾏以下命令查看集群状态:
root@elk95:~# rabbitmqctl cluster_status
Cluster status of node rabbit@elk95 ...
Basics
Cluster name: rabbit@elk95
Disk Nodes
rabbit@elk94
rabbit@elk95
rabbit@elk96
Running Nodes
rabbit@elk94
rabbit@elk95
rabbit@elk96
Versions
rabbit@elk94: RabbitMQ 3.9.27 on Erlang 24.2.1
rabbit@elk95: RabbitMQ 3.9.27 on Erlang 24.2.1
rabbit@elk96: RabbitMQ 3.9.27 on Erlang 24.2.1
Maintenance status
Node: rabbit@elk94, status: not under maintenance
Node: rabbit@elk95, status: not under maintenance
Node: rabbit@elk96, status: not under maintenance
Alarms
(none)
Network Partitions
(none)
Listeners
Node: rabbit@elk94, interface: [::], port: 25672, protocol: clustering, pu
rpose: inter-node and CLI tool communication
Node: rabbit@elk94, interface: [::], port: 5672, protocol: amqp, purpose:
AMQP 0-9-1 and AMQP 1.0
Node: rabbit@elk94, interface: [::], port: 15672, protocol: http, purpos
e: HTTP API
Node: rabbit@elk95, interface: [::], port: 25672, protocol: clustering, pu
rpose: inter-node and CLI tool communication
Node: rabbit@elk95, interface: [::], port: 5672, protocol: amqp, purpose:
AMQP 0-9-1 and AMQP 1.0
Node: rabbit@elk96, interface: [::], port: 15672, protocol: http, purpos
e: HTTP API
Node: rabbit@elk96, interface: [::], port: 25672, protocol: clustering, pu
rpose: inter-node and CLI tool communication
Node: rabbit@elk96, interface: [::], port: 5672, protocol: amqp, purpose:
AMQP 0-9-1 and AMQP 1.0
Feature flags
Flag: drop_unroutable_metric, state: disabled
Flag: empty_basic_get_metric, state: disabled
Flag: implicit_default_bindings, state: enabled
Flag: maintenance_mode_status, state: enabled
Flag: quorum_queue, state: enabled
Flag: stream_queue, state: enabled
Flag: user_limits, state: enabled
Flag: virtual_host_metadata, state: enabled
七、设置镜像队列(HA)策略
为保证队列数据在集群节点之间进⾏复制,我们需要设置 镜像队列策略。这⾥示例设置所有队列
⾃动镜像到所有节点上,并⾃动同步。
root@elk94:~# rabbitmqctl set_policy ha-all "^.*" '{"ha-mode":"all","ha-syn
c-mode":"automatic"}' --priority 0 --apply-to queues
Setting policy "ha-all" for pattern "^.*" to "{"ha-mode":"all","ha-sync-mod
e":"automatic"}" with priority "0" for vhost "/" ...
ha-mode: all 表示所有队列将镜像到集群中的所有节点。
ha-sync-mode: automatic 表示当新的节点加⼊时,会⾃动同步队列数据。
^.* 是正则表达式,表示对名称匹配任意队列⽣效;如需针对特定队列或前缀,可⾃⾏修改。
--apply-to queues 表示将策略应⽤到队列。
八、验证集群
确认三个节点都在 running_nodes 列表⾥。
Cluster status of node rabbit@elk94 ...
Basics
Cluster name: rabbit@elk94
Disk Nodes
rabbit@elk94
rabbit@elk95
rabbit@elk96
Running Nodes
rabbit@elk94
rabbit@elk95
rabbit@elk96
访问管理界⾯进⾏验证
在浏览器中分别访问三台机器的 http://<节点IP或主机名>:15672,使⽤前⾯创建的账号(如 admin)进
⾏登录,查看集群信息、节点信息、队列信息等。

创建⽣产者并发送数据

看到创建的消息队列和5条消息数据被挤压未被消费



⽣产者
import pika
import json
import time
class RabbitMQProducer:
def __init__(self, host='10.0.0.94', port=5672, username='admin', pass
word='123456'):
self.connection_params = pika.ConnectionParameters(
host=host,
port=port,
credentials=pika.PlainCredentials(username, password)
)
self.connection = None
self.channel = None
def connect(self):
"""建⽴连接"""
try:
self.connection = pika.BlockingConnection(self.connection_para
ms)
self.channel = self.connection.channel()
print("成功连接到RabbitMQ")
except Exception as e:
print(f"连接RabbitMQ失败: {e}")
raise
def declare_queue(self, queue_name, durable=True):
"""声明队列"""
self.channel.queue_declare(
queue=queue_name,
durable=durable, # 持久化队列
arguments={
'x-message-ttl': 60000 # 消息存活时间60秒
}
)
print(f"队列 '{queue_name}' 已声明")
def send_message(self, queue_name, message, exchange=''):
"""发送消息"""
try:
# 如果消息是字典,转换为JSON字符串
if isinstance(message, dict):
message_body = json.dumps(message)
⽣产者
Python16
else:
message_body = str(message)
# 发布消息
self.channel.basic_publish(
exchange=exchange,
routing_key=queue_name,
body=message_body,
properties=pika.BasicProperties(
delivery_mode=2, # 持久化消息
content_type='application/json'
)
)
print(f"消息已发送到队列 '{queue_name}': {message_body}")
return True
except Exception as e:
print(f"发送消息失败: {e}")
return False
def close(self):
"""关闭连接"""
if self.connection and not self.connection.is_closed:
self.connection.close()
print("连接已关闭")
# 使⽤示例
def producer_example():
producer = RabbitMQProducer()
try:
# 建⽴连接
producer.connect()
# 声明队列
queue_name = 'test_queue6'
producer.declare_queue(queue_name)
# 发送多条消息
for i in range(5):
message = {
'id': i + 1,
'timestamp': time.time(),
'content': f'这是第 {i + 1} 条测试消息'
}
producer.send_message(queue_name, message)
time.sleep(1) # 模拟处理时间
except Exception as e:
print(f"⽣产者运⾏出错: {e}")
finally:
producer.close()
if __name__ == "__main__":
消费者
import pika
def quick_consumer():
"""快速消费者 - 修改这⾥的认证信息"""
try:
# ⚠️修改为正确的⽤户名和密码
credentials = pika.PlainCredentials('admin', '123456')
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='10.0.0.95',
port=5672,
credentials=credentials
)
)
channel = connection.channel()
print("✅ 连接成功!")
def callback(ch, method, properties, body):
print(f"收到: {body.decode()}")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(
queue='test_queue7',
on_message_callback=callback,
auto_ack=False
)
print("等待消息...")
channel.start_consuming()
except Exception as e:
print(f"错误: {e}")
if __name__ == "__main__":
quick_consumer()
测试队列⾼可⽤ 连接到其中⼀个节点,创建⼀个队列并发送消息。 关闭该节点的 RabbitMQ 服务或者⽹络,使⽤另⼀个节点进⾏消费,验证消息是否正常可消费、数据是 否依旧存在。 若⼀切⽆误,则说明 RabbitMQ 集群部署成功并具备了⾼可⽤能⼒。 参考 https://blog.csdn.net/kaka_buka/article/details/146060911
0 条评论