一、RabbitMQ概述

(1)什么是消息队列

特性对比AMQPJMS
七层网络模型传输层和会话层应用层
支持的编程语言和平台支持多种编程语言和平台,包括Java、C++、Python等针对Java平台,在其他编程语言和平台上支持相对较少
传输协议二进制协议进行消息传递,提供了高效、可靠的消息投递机制使用面向文本的协议(如HTTP/TCP),消息传递效率可能较低
消息模型支持多种消息模型,包括点对点(P2P)和发布订阅(Pub/Sub)主要支持点对点(P2P)和发布订阅(Pub/Sub)模型
可靠性提供了强大的可靠性保证,包括消息持久化、事务性消息和消息确认机制支持消息持久化和事务性消息,但具体实现取决于消息传递系统的提供者
扩展性和兼容性具有很好的扩展性和兼容性,可以在不同的消息代理之间交互操作在Java环境中有较好的扩展性和兼容性,但在非Java环境集成时受到限制

(2)主流MQ产品对比

特性对比RabbitMQKafkaRocketMQActiveMQ
研发团队Rabbit(公司性质)Apache(社区)阿里(公司)Apache(社区)
开发语言ErlangScala & JavaJavaJava
核心机制基于AMQP的消息队列模型,生产者-消费者模式分布式流平台,发布/订阅模型,高吞吐量分布式消息队列,支持topic、tag消息分类过滤基于JMS的消息传递模型,支持点对点和发布/订阅
协议支持XMPP、STOMP、SMTP自定义协议(社区封装HTTP支持)自定义协议XMPP、STOMP、OpenWire、REST
客户端支持语言官方:Erlang、Java、Ruby;社区几乎支持所有语言官方:Java;社区:PHP、Python、Go等Java、C++Java、C、C++、Python、PHP、Perl、.NET等
可用性机制镜像队列、仲裁队列分区、副本主从复制主从复制
单机吞吐量10w/s100w/s10w+/s(双十一案例支撑)10w-/s
消息延迟微妙级毫秒级毫秒级毫秒级
消息确认完整的消息确认机制N/A内置消息表,支持消息持久化到数据库N/A
功能特性并发能力好、性能高、延迟低、社区活跃、管理界面丰富专为大数据场景设计,功能相对集中MQ功能完备、扩展性好、适合企业级复杂场景老牌产品、成熟度高、文档丰富

(3)什么是RabbitMQ

RabbitMQ 是⼀个采⽤AMQP协议的开源消息代理软件,主要⽤于实现分布式系统间的可靠异步 通信、应⽤解耦和流量削峰。 RabbitMQ和Kafka的核⼼区别在于设计⽬标、架构模型和适⽤场景:RabbitMQ专注于可靠、低 延迟的消息传递,适合实时任务和复杂路由;⽽Kafka专为⾼吞吐量、持久化的流数据处理设计, 适⽤于⼤数据⽇志、事件溯源和实时分析场景。 环境准备 操作系统:确保所有节点运⾏相同版本的操作系统(Linux 是最常⻅的选择)。 主机名:为每个节点配置唯⼀的主机名,确保可以通过 DNS 或 /etc/hosts ⽂件解析到 IP 地址。 时间同步:使⽤ ntp 或 chrony 服务同步时间。 Erlang:RabbitMQ 依赖 Erlang。确保所有节点安装相同版本的 Er RabbitMQ 使⽤主机名和 Erlang 的 nodename 区分节点,因此每个节点的主机名必须唯⼀。

二、环境准备及主机名设置

RabbitMQ 使⽤主机名和 Erlang 的 nodename 区分节点,因此每个节点的主机名必须唯⼀。

系统ip主机名
Ubuntu22.0410.0.0.94elk94
Ubuntu22.0410.0.0.95elk95
Ubuntu22.0410.0.0.96elk96

在每台机器的 /etc/hosts 中添加集群中所有节点的主机名与 IP 对应关系 root@elk94:~# cat /etc/hosts 10.0.0.94 elk94 10.0.0.95 elk95 10.0.0.96 elk96

三、安装 Erlang 和 RabbitMQ

版本对应关系https://www.rabbitmq.com/docs/which-erlang

在每台机器上执⾏

先进⾏软件包索引更新,防⽌有依赖问题
apt-get update
安装
sudo apt-get install -y erlang
sudo apt-get install -y rabbitmq-server

启动并查看状态
安装完成后,RabbitMQ 会被⾃动启动并开机⾃启动

root@elk95:~# systemctl status rabbitmq-server
● rabbitmq-server.service - RabbitMQ Messaging Server
 
Loaded: loaded (/lib/systemd/system/rabbitmq-server.service; enable
d; vendor preset: enabled)
 
Active: active (running) since Sun 2025-11-09 21:31:06 CST; 39s ago
 
Main PID: 43768 (beam.smp)
 
Tasks: 24 (limit: 4519)
 
Memory: 94.7M
 
CPU: 3.341s
 
CGroup: /system.slice/rabbitmq-server.service
 
├─43768 /usr/lib/erlang/erts-12.2.1/bin/beam.smp -W w -MBas a
geffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -sbwt none -sbwtdcpu none -sbwtdio none -- -
root /u>
 
├─43780 erl_child_setup 65536
 
├─43831 inet_gethost 4
 
├─43832 inet_gethost 4
 
└─43835 /bin/sh -s rabbit_disk_monitor

四、启⽤管理插件并创建管理员账号

RabbitMQ 集群部署完成后,我们通常需要启⽤管理插件(Management Plugin)来访问可视化 管理界⾯,并创建管理员账号。 

1. 启⽤管理插件 在每台 RabbitMQ 节点上执⾏: rabbitmq-plugins enable rabbitmq_management 启⽤之后,RabbitMQ 通常会在本地 15672 端⼝提供 HTTP 管理界⾯。例如,通过 http://1 92.168.1.101:15672 可以访问 rabbit1 的管理⻚⾯。

2.创建管理员账号 在每台节点上都可以执⾏(若已存在相同名称则会提示): rabbitmqctl add_user admin StrongPassword rabbitmqctl set_user_tags admin administrator # 设置该⽤户在 "/" 虚拟主机下的权限,授予全部读写权限 rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" 注意:请使⽤更强的密码进⾏⽣产环境部署,并按照需分配合适的权限或虚拟主机。 不需要每台机器都创建⽤户,节点接⼊主节点后,⽤户可以同步使⽤

五、集群通信 Cookie 同步

RabbitMQ 集群依赖 Erlang Cookie 来进⾏节点间的验证与通信。在同⼀个集群中的所有节点必须使⽤
相同的 Cookie。该 Cookie 默认保存在 /var/lib/rabbitmq/.erlang.cookie ⽂件中。
选择⼀台节点作为 Cookie 源
在 elk94上查看 Cookie

root@elk94:~# cat /var/lib/rabbitmq/.erlang.cookie
iadka145LJNtxvEnNZrnw+rtTjfJ8SUpiovCUgyq/0DE2JJ246CI9NqM

将内容复制,然后分别在 其他节点上执⾏:
systemctl stop rabbitmq-server
echo "iadka145LJNtxvEnNZrnw+rtTjfJ8SUpiovCUgyq/0DE2JJ246CI9NqM" | tee /var/
lib/rabbitmq/.erlang.cookie
chmod 400 /var/lib/rabbitmq/.erlang.cookie
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
systemctl start rabbitmq-server

确保三台节点的 Cookie 完全⼀致,并且权限和拥有者正确,才能保证后续正常组建集群。

六、节点加⼊集群

以 elk94 作为主节点,先将 elk95 、 elk96 加⼊到 elk94 的集群中
在elk95上执⾏:

root@elk95:~# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@elk95 ...
root@elk95:~# rabbitmqctl join_cluster rabbit@elk94
Clustering node rabbit@elk95 with rabbit@elk94
The node is already a member of this cluster
root@elk95:~# rabbitmqctl start_app
Starting node rabbit@elk95 ...

在elk96上执⾏:
root@elk96:~# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@elk96 ...
root@elk96:~# rabbitmqctl join_cluster rabbit@elk94
Clustering node rabbit@elk96 with rabbit@elk94
The node is already a member of this cluster
root@elk96:~# rabbitmqctl start_app
Starting node rabbit@elk96 ...

查看集群状态
可以在任意节点上运⾏以下命令查看集群状态:
root@elk95:~# rabbitmqctl cluster_status
Cluster status of node rabbit@elk95 ...
Basics
Cluster name: rabbit@elk95
Disk Nodes
rabbit@elk94
rabbit@elk95
rabbit@elk96
Running Nodes
rabbit@elk94
rabbit@elk95
rabbit@elk96
Versions
rabbit@elk94: RabbitMQ 3.9.27 on Erlang 24.2.1
rabbit@elk95: RabbitMQ 3.9.27 on Erlang 24.2.1
rabbit@elk96: RabbitMQ 3.9.27 on Erlang 24.2.1
Maintenance status
Node: rabbit@elk94, status: not under maintenance
Node: rabbit@elk95, status: not under maintenance
Node: rabbit@elk96, status: not under maintenance
Alarms
(none)
Network Partitions
(none)
Listeners
Node: rabbit@elk94, interface: [::], port: 25672, protocol: clustering, pu
rpose: inter-node and CLI tool communication
Node: rabbit@elk94, interface: [::], port: 5672, protocol: amqp, purpose: 
AMQP 0-9-1 and AMQP 1.0

Node: rabbit@elk94, interface: [::], port: 15672, protocol: http, purpos
e: HTTP API
Node: rabbit@elk95, interface: [::], port: 25672, protocol: clustering, pu
rpose: inter-node and CLI tool communication
Node: rabbit@elk95, interface: [::], port: 5672, protocol: amqp, purpose: 
AMQP 0-9-1 and AMQP 1.0
Node: rabbit@elk96, interface: [::], port: 15672, protocol: http, purpos
e: HTTP API
Node: rabbit@elk96, interface: [::], port: 25672, protocol: clustering, pu
rpose: inter-node and CLI tool communication
Node: rabbit@elk96, interface: [::], port: 5672, protocol: amqp, purpose: 
AMQP 0-9-1 and AMQP 1.0
Feature flags
Flag: drop_unroutable_metric, state: disabled
Flag: empty_basic_get_metric, state: disabled
Flag: implicit_default_bindings, state: enabled
Flag: maintenance_mode_status, state: enabled
Flag: quorum_queue, state: enabled
Flag: stream_queue, state: enabled
Flag: user_limits, state: enabled
Flag: virtual_host_metadata, state: enabled

七、设置镜像队列(HA)策略

为保证队列数据在集群节点之间进⾏复制,我们需要设置 镜像队列策略。这⾥示例设置所有队列
⾃动镜像到所有节点上,并⾃动同步。

root@elk94:~# rabbitmqctl set_policy ha-all "^.*" '{"ha-mode":"all","ha-syn
c-mode":"automatic"}' --priority 0 --apply-to queues
Setting policy "ha-all" for pattern "^.*" to "{"ha-mode":"all","ha-sync-mod
e":"automatic"}" with priority "0" for vhost "/" ...

ha-mode: all 表示所有队列将镜像到集群中的所有节点。
ha-sync-mode: automatic 表示当新的节点加⼊时,会⾃动同步队列数据。
^.* 是正则表达式,表示对名称匹配任意队列⽣效;如需针对特定队列或前缀,可⾃⾏修改。
--apply-to queues 表示将策略应⽤到队列。

八、验证集群

确认三个节点都在 running_nodes 列表⾥。

Cluster status of node rabbit@elk94 ...
Basics
Cluster name: rabbit@elk94
Disk Nodes
rabbit@elk94
rabbit@elk95
rabbit@elk96
Running Nodes
rabbit@elk94
rabbit@elk95
rabbit@elk96

访问管理界⾯进⾏验证
在浏览器中分别访问三台机器的 http://<节点IP或主机名>:15672,使⽤前⾯创建的账号(如 admin)进
⾏登录,查看集群信息、节点信息、队列信息等。

创建⽣产者并发送数据

看到创建的消息队列和5条消息数据被挤压未被消费

⽣产者
import pika
import json
import time
class RabbitMQProducer:
 
def __init__(self, host='10.0.0.94', port=5672, username='admin', pass
word='123456'):
 
self.connection_params = pika.ConnectionParameters(
 
host=host,
 
port=port,
 
credentials=pika.PlainCredentials(username, password)
 
)
 
self.connection = None
 
self.channel = None
 
def connect(self):
 
"""建⽴连接"""
 
try:
 
self.connection = pika.BlockingConnection(self.connection_para
ms)
 
self.channel = self.connection.channel()
 
print("成功连接到RabbitMQ")
 
except Exception as e:
 
print(f"连接RabbitMQ失败: {e}")
 
raise
 
def declare_queue(self, queue_name, durable=True):
 
"""声明队列"""
 
self.channel.queue_declare(
 
queue=queue_name,
 
durable=durable, # 持久化队列
 
arguments={
 
'x-message-ttl': 60000 # 消息存活时间60秒
 
}
 
)
 
print(f"队列 '{queue_name}' 已声明")
 
def send_message(self, queue_name, message, exchange=''):
 
"""发送消息"""
 
try:
 
# 如果消息是字典,转换为JSON字符串
 
if isinstance(message, dict):
 
message_body = json.dumps(message)

⽣产者 
Python16
 
else:
 
message_body = str(message)
 
# 发布消息
 
self.channel.basic_publish(
 
exchange=exchange,
 
routing_key=queue_name,
 
body=message_body,
 
properties=pika.BasicProperties(
 
delivery_mode=2, # 持久化消息
 
content_type='application/json'
 
)
 
)
 
print(f"消息已发送到队列 '{queue_name}': {message_body}")
 
return True
 
except Exception as e:
 
print(f"发送消息失败: {e}")
 
return False
 
def close(self):
 
"""关闭连接"""
 
if self.connection and not self.connection.is_closed:
 
self.connection.close()
 
print("连接已关闭")
# 使⽤示例
def producer_example():
 
producer = RabbitMQProducer()
 
try:
 
# 建⽴连接
 
producer.connect()
 
# 声明队列
 
queue_name = 'test_queue6'
 
producer.declare_queue(queue_name)
 
# 发送多条消息
 
for i in range(5):
 
message = {
 
'id': i + 1,
 
'timestamp': time.time(),
 
'content': f'这是第 {i + 1} 条测试消息'
 
}
 
producer.send_message(queue_name, message)
 
time.sleep(1) # 模拟处理时间
 
except Exception as e:
 
print(f"⽣产者运⾏出错: {e}")
 
finally:
 
producer.close()
if __name__ == "__main__":
消费者
import pika
def quick_consumer():
 
"""快速消费者 - 修改这⾥的认证信息"""
 
try:
 
# ⚠️修改为正确的⽤户名和密码
 
credentials = pika.PlainCredentials('admin', '123456')
 
connection = pika.BlockingConnection(
 
pika.ConnectionParameters(
 
host='10.0.0.95',
 
port=5672,
 
credentials=credentials
 
)
 
)
 
channel = connection.channel()
 
print("✅ 连接成功!")
 
def callback(ch, method, properties, body):
 
print(f"收到: {body.decode()}")
 
ch.basic_ack(delivery_tag=method.delivery_tag)
 
channel.basic_consume(
 
queue='test_queue7',
 
on_message_callback=callback,
 
auto_ack=False
 
)
 
print("等待消息...")
 
channel.start_consuming()
 
except Exception as e:
 
print(f"错误: {e}")
if __name__ == "__main__":
 
quick_consumer()
    

测试队列⾼可⽤ 连接到其中⼀个节点,创建⼀个队列并发送消息。 关闭该节点的 RabbitMQ 服务或者⽹络,使⽤另⼀个节点进⾏消费,验证消息是否正常可消费、数据是 否依旧存在。 若⼀切⽆误,则说明 RabbitMQ 集群部署成功并具备了⾼可⽤能⼒。 参考 https://blog.csdn.net/kaka_buka/article/details/146060911

分类: ES集群

0 条评论

发表回复

Avatar placeholder

您的邮箱地址不会被公开。 必填项已用 * 标注