来源:推好下载站Gamer发布时间: 2024-11-22 10:14:10
上午一好哥们微信我
哥们:哥们在干嘛,晚上出来吃饭
我:就我俩吗
哥们:对啊
我:那多没意思,我叫俩女的出来
哥们:好啊,哈哈哈
晚上吃完饭到家后,我给哥们发消息
我:今天吃的真开心,下次继续
哥们:开心尼玛呢!下次再叫你老婆和你女儿来,我特么踢死你
正文开始之前了,我们先来正确审视下文章标题:
不依赖 Spring,你会如何自实现 RabbitMQ 消息的消费
,主要从两点来进行审视
不依赖 Spring
作为一个
Javaer
,关于
Spring
的重要性,我相信你们都非常清楚;回头看看你们开发的项目,是不是都是基于 Spring 的?如果不依赖 Spring,你们还能继续开发吗?不过话说回来,既然 Spring 能带来诸多便利,该用还得用,不要头铁,不要造低效轮子!
如果能造出比 Spring 优秀的轮子,那你应该造!
你们可能会说:不依赖 Spring 就不依赖嘛,我可以依赖
Spring Boot
噻;你们要是这么聊天,那就没法聊了
Spring Boot 是不是基于 Spring 的?没有 Spring,Spring Boot 也是跑不起来的;
不依赖 Spring
的言外之意就是不依赖 Spring 生态,当然也包括 Spring Boot
关于
不依赖 Spring
,我就当你们审视清楚了哦
依赖 RabbitMQ Java Client
与 RabbitMQ 服务端的交互,咱们就不要逞强去自实现了,老实点用官方提供的 Java Client 就好
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
注意 client 版本要与 RabbitMQ 版本兼容
所以文章标题就可以转换成
只依赖 RabbitMQ Java Client,不依赖 Spring,如何自实现 RabbitMQ 消息的消费
另外,我再带你们回顾下 RabbitMQ 的 Connection 和 Channel
Connection
Connection
是客户端与 RabbitMQ 服务器之间的一个 TCP 连接,它是进行通信的基础,允许客户端发送命令到 RabbitMQ 服务器并接收响应;Connection 是比较重的资源,不能随意创建与关闭,一般会以
池
的方式进行管理。每个 Connection 可以包含多个 Channel
Channel
Channel
是
多路复用
连接(Connection)中的一条独立的双向数据流通道。客户端与 RabbitMQ 服务端之间的大多数操作都是在 Channel 上进行的,而不是在 Connection 上直接进行。Channel 比 Connection 更轻量级,可以在同一连接中创建多个 Channel 以实现并发处理
Channel 与 Consumer 之间的关系是一对多的,具体来说,一个 Channel 可以绑定多个 Consumer,但每个 Consumer 只能绑定到一个
我们采取主干到枝叶的实现方式,逐步实现并完善 RabbitMQ 消息的消费
依赖 RabbitMQ Java Client 来消费 RabbitMQ 的消息,代码实现非常简单,网上一搜一大把
/**
* @author: 青石路
*/
public class RabbitTest1 {
private static final String QUEUE_NAME = "qsl.queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = initConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicConsume(QUEUE_NAME, false, new QslConsumer(channel));
System.out.println(Thread.currentThread().getName() + " 线程执行完毕,消费者:" + consumerTag + "已经就绪");
}
public static ConnectionFactory initConnectionFactory() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.5.108.226");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
factory.setConnectionTimeout(30000);
return factory;
}
static class QslConsumer extends DefaultConsumer {
QslConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
System.out.println(Thread.currentThread().getName()+ " 收到消息:" + message);
this.getChannel().basicAck(envelope.getDeliveryTag(), false);
}
}
}
是不是很简单?
这里我得补充下,
exchange
、
queue
没有在代码中声明,绑定关系也没有声明,是为了简化代码,因为文章标题是
消费
;实际
exchange
、
queue
、
binding
这些已经存在,如下图所示
上述代码,我相信你们都能看懂,主要强调下 2 点
消息是否自动 Ack
对应代码
channel.basicConsume(QUEUE_NAME, false, new QslConsumer(channel));
basicConsume
的第二个参数,其注释如下
autoAck
为
true
表示消息在送达到 Consumer 后被 RabbitMQ 服务端确认,消息就会从队列中剔除了;
autoAck
为
false
表示 Consumer 需要显式的向 RabbitMQ 服务端进行消息确认
因为我们将
autoAck
设置成了
true
,所以 main 线程存活的时间内,5 个消息被送达到 main 线程后就被 RabbitMQ 服务端确认了,也就从队列中删除了
手动确认
如果 Consumer 的
autoAck
设置的是
false
,那么需要显示的进行消息确认
this.getChannel().basicAck(envelope.getDeliveryTag(), false);
否则 RabbitMQ 服务端会将消息一直保留在队列中,反复投递
执行 main 方法,控制台输出如下
我们去 RabbitMQ 控制台看下队列
qsl.queue
的消费者
Consumer tag
值是:
amq.ctag-PxjqYiujeCvyYlgtvMz9EQ
,与控制台的输出一致;我们手动往队列中发送一条消息
控制台输出如下
自此,主流程就通了,此时已经实现 RabbitMQ 消息的消费
单消费者肯定存在性能瓶颈,所以我们需要支持多消费者,并且是同个队列的多消费者;实现方式也很简单,只需要调整下 main 方法即可
public static void main(String[] args) throws Exception {
ConnectionFactory factory = initConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
QslConsumer qslConsumer = new QslConsumer(channel);
String consumerTag1 = channel.basicConsume(QUEUE_NAME, false, qslConsumer);
String consumerTag2 = channel.basicConsume(QUEUE_NAME, false, qslConsumer);
String consumerTag3 = channel.basicConsume(QUEUE_NAME, false, qslConsumer);
System.out.println(Thread.currentThread().getName() + " 线程执行完毕,消费者["
+ Arrays.asList(consumerTag1, consumerTag2, consumerTag3) + "]已经就绪");
}
执行main 方法后 Channel 与 Consumer 关系如下
此时是同个 Channel 绑定了 3 个不同的 Consumer;当然也可以一对一绑定,main 方法调整如下
public static void main(String[] args) throws Exception {
ConnectionFactory factory = initConnectionFactory();
Connection connection = factory.newConnection();
Channel channel1 = connection.createChannel();
Channel channel2 = connection.createChannel();
Channel channel3 = connection.createChannel();
QslConsumer qslConsumer1 = new QslConsumer(channel1);
QslConsumer qslConsumer2 = new QslConsumer(channel2);
QslConsumer qslConsumer3 = new QslConsumer(channel3);
String consumerTag1 = channel1.basicConsume(QUEUE_NAME, false, qslConsumer1);
String consumerTag2 = channel2.basicConsume(QUEUE_NAME, false, qslConsumer2);
String consumerTag3 = channel3.basicConsume(QUEUE_NAME, false, qslConsumer3);
System.out.println(Thread.currentThread().getName() + " 线程执行完毕,消费者["
+ Arrays.asList(consumerTag1, consumerTag2, consumerTag3) + "]已经就绪");
}
执行 main 方法后 Channel 与 Consumer 关系如下
既然两种方式都可以实现多消费者,哪那种方式更好呢
Channel 与 Consumer 一对一绑定更好!
Channel 之间是线程安全的,同个 Channel 内非线程安全,所以同个 Channel 上同时处理多个消费者存在并发问题;另外 RabbitMQ 的消息确认机制是基于Channel 的,如果一个 Channel 上绑定多个消费者,那么消息确认会变得复杂,非常容易导致消息重复消费或丢失
也许你们会觉得
一对一
的绑定相较于
一对多
的绑定,存在资源浪费问题;确实是有这个问题,但我们要知道,Channel 是 Connection 中的一条独立的双向数据流通道,非常轻量级,相较于并发带来的一系列问题而言,这点小小的资源浪费可以忽略不计了
消费者数量能不能配置化呢,当然可以,调整非常简单
private static final int concurrency = 3;
public static void main(String[] args) throws Exception {
ConnectionFactory factory = initConnectionFactory();
Connection connection = factory.newConnection();
for (int i = 0; i < concurrency; i++) {
Channel channel = connection.createChannel();
QslConsumer qslConsumer = new QslConsumer(channel);
String consumerTag = channel.basicConsume(QUEUE_NAME, false, qslConsumer);
System.out.println("消费者:" + consumerTag + " 已经就绪");
}
}
concurrency
的值是从数据库读取,还是从配置文件中获取,就可以发挥你们的想象呢;如果依赖 Spring 的话,往往会用配置文件的方式注入进来
队列
qsl.queue
没有消费者的情况下,我们往队列中添加 5 条消息:我是消息1 ~ 我是消息5,然后调整下
handleDelivery
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
System.out.println(consumerTag + " 收到消息:" + message);
this.getChannel().basicAck(envelope.getDeliveryTag(), false);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
最后执行 main,控制台输出如下
大家注意看框住的那部分,5 条消息被同一个消费者给消费了!5 条消息为什么不是负载均衡到 3 个消费者呢?这是因为消费者的
prefetch count
(即
预取数
)没有设置
prefetch count 是消费者在接收消息时,告诉 RabbitMQ 一次最多可以发送多少条消息给该消费者。默认情况下,这个值是 0,这意味着 RabbitMQ 会尽可能快地将消息分发给消费者,而不考虑消费者当前的处理能力
再回过头去看控制台的输出,是不是就能理解了?一旦某个消费者就绪,队列中的 5 条消息全部推给它了,后面就绪的 2 个消费者就没有消息可消费了;所以我们需要配置
prefetch count
以实现负载均衡,调整很简单
private static final String QUEUE_NAME = "qsl.queue";
private static final int concurrency = 3;
private static final int prefetchCount = 1;
public static void main(String[] args) throws Exception {
ConnectionFactory factory = initConnectionFactory();
Connection connection = factory.newConnection();
for (int i = 0; i < concurrency; i++) {
Channel channel = connection.createChannel();
channel.basicQos(prefetchCount);
QslConsumer qslConsumer = new QslConsumer(channel);
String consumerTag = channel.basicConsume(QUEUE_NAME, false, qslConsumer);
System.out.println("消费者:" + consumerTag + " 已经就绪");
}
}
然后重复如上的测试,控制台输出如下
是不是实现了我们想要的
负载均衡
?
prefetch count 的设置需要根据实际的业务需求和消费者的处理能力进行调整;如果设置得太高,可能会导致内存占用过多;如果设置得太低,则可能无法充分利用消费者的处理能力
限于篇幅,我就只列举几个还待完善的点
- 目前只支持单个队列,需要支持多个队列
- 目录消费逻辑单一固定,需要支持动态指定逻辑,不同的队列对应不同的消费逻辑
- 消费者支持停止和重启
- ...
关于这些点,我们下篇不见不散
Connection、Channel、Consumer 之间的关系需要理清楚
Connection 是 TCP 连接;Channel 是 Connection 中的双向数据流通道;Channel 可以绑定多个 Consumer,但推荐一个 Channel 只绑定一个 Consumer
IO 多路复用
是网络编程中常用的技术,建议大家掌握
基于 RabbitMQ Java Client 提供的 API,实现了消息消费、多消费者以及负载均衡
没有 Spring,我们照样可以很优雅的消费 RabbitMQ 的消息
相关文章
《名酱三国H5》巡逻任务攻略技巧指南,打败以武将登场!(以“巡逻任务”为主,让你迅速提升等级!)
《梦幻新诛仙手游》阵法推荐攻略指南,战斗更轻松(最佳阵法搭配指南,让你成为阵法大师)
《魔兽世界》9.0版本影怖护臂获得攻略技巧指南(全面攻略,轻松获得影怖护臂)
《王者荣耀蔷薇恋人》优化方案解析(优化你的游戏体验,畅玩王者荣耀!)
《金铲铲之战》霜卫首领出装攻略技巧指南(打造强力霜卫首领,抗衡敌军的必备武器)
《机动战姬聚变》最新更新大揭秘!
摩尔庄园手游大头菇采集攻略最新指南(轻松掌握采集技巧,快速提升收益)
摩尔庄园手游载具获取方法全盘点技巧秘籍(学会这些方法,让你的庄园生活更便捷!)
《以金铲铲之战》集福令攻略指南
《双生视界》游戏经营咖啡店小技巧攻略指南(15个让你轻松拥有人气咖啡厅的秘诀)
热门推荐