
已老实!再学消息队列、死信队列
JMS、AMQP、消息队列、kafka、rocket、rabbitmq
往期推荐
ArrayList、LinkedList、HashMap、HashTable、HashSet、TreeSet-CSDN博客
Kafka系列第三篇!10 分钟学会如何在 Spring Boot 程序中使用 Kafka 作为消息队列?
目录
消息队列作用
老生常谈,削峰限流、异步解耦、分布式事务、顺序保证、延时定时、数据流处理比如大数据、即时通讯(物联网)
消息队列(基础篇)-4 如何利用事务消息实现分布式事务?_事务消息可以实现分布式事务-CSDN博客
当然引入组件带来的副作用往往是数据一致性、系统复杂性、系统可用性稳定性,毕竟越复杂的东西越容易出问题。
JMS和AMQP协议
JMS
JAVA Message Service,一个java消息服务的规范,类似jdbc,有点对点、发布订阅两种模型:
五种消息类型
StreamMessage:Java
原始值的数据流MapMessage
:一套名称-值对TextMessage
:一个字符串对象ObjectMessage
:一个序列化的 Java 对象BytesMessage
:一个字节的数据流
两种消息模型
- 点对点
一个消息只有一个消费者,未被消费的消息在queue中保留直到被消费或超时。消费者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息- 发布订阅
生产者把消息广播到一个topic,该topic可以有多个消费者消费。
在kafka中,一个topic可以有多个分区partition,单个分区的消息是有序的,而全局的topic的多个分区的消息是无序的。这就是为什么kafka一条消息只能被同一个消费者组里面的一个消费者消费,这样就某种程度上保证了消息的不重复消费和乱序消费。
AMQP
Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准 高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS。RabbitMQ 就是基于 AMQP 协议实现的。
消息类型
二进制字节数组
五种消息模型
①direct exchange;
②fanout exchange;
③topic change;
④headers exchange;
⑤system exchange;
本质来讲,后四种和 JMS 的 pub/sub 模型没有太大差别,仅是在路由机制上做了更详细的划分;
几种消息队列
消息中间件:Kafka、RabbitMQ、RocketMQ、ActiveMQ 四个分布式消息队列 综合对比_kafuka tcp 与 mq 队列是否类似-CSDN博客
kafka
具体看这个:Kafka入门到入土——万字详解,图文并茂_图解kafka-CSDN博客
kafka的优势
- 极致的性能:
基于 Scala 和 Java 语言开发,设计中大量使用了批量处理和异步的思想,最高可以每秒处理千万级别的消息。- 生态系统兼容性:
Kafka 与 周边生态系统的兼容性是最好的没有之一,尤其在大数据(数据吞吐量大)和流计算领域。
kafka 为什么性能比 RocketMQ 好
这里性能主要指吞吐量,kafka使用了sendfile零拷贝,RocketMQ 使用的是 mmap 零拷贝技术,具体可以看这个
用户态和内核态、进程、协程及线程几种状态、DMA、零拷贝_进程和线程 用户态和内核态-CSDN博客
为什么RocketMQ不使用sendfile呢?ssize_t sendfile(int out_fd, int in_fd, off_t* offset, size_t count); // num = sendfile(xxx); void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset); // buf = mmap(xxx)
sendfile返回的是发送成功了几个字节数,具体发了什么内容,应用层根本不知道。
mmap
返回的是数据的具体内容,应用层能获取到消息内容并进行一些逻辑处理。而 RocketMQ 的一些功能,却需要了解具体这个消息内容,方便二次投递等,比如将消费失败的消息重新投递到死信队列中,如果 RocketMQ 使用 sendfile,那根本没机会获取到消息内容长什么样子,也就没办法实现一些好用的功能了。一句话总结就是:和 Kafka 相比,RocketMQ 在架构上做了减法,在功能上做了加法"
kafka如何保证消息有序
kafka的消息存储在topic的Partition中,整体来看topic的消息是无序的,但是单个Partition的消息是有序的,每次添加消息到 Partition的时候都会采用尾加法,并为其分配一个特定的offset。因此为保证 Kafka 中消息消费的顺序,有了下面两种方法:
- 1 个 Topic 只对应一个 Partition。
- (推荐)发送消息的时候指定 key/Partition。
kafka如何保证消息不丢失
生产者丢失消息
生产者(Producer) 调用
send
方法发送消息之后,消息可能因为网络问题并没有发送过去。所以,我们不能默认在调用
send
方法发送消息之后消息发送成功了。为了确定消息是发送成功,我们要判断消息发送的结果。但是要注意的是 Kafka 生产者(Producer) 使用send
方法发送消息实际上是异步的操作,我们可以通过get()
方法获取调用结果,但是这样也让它变为了同步操作,示例代码如下:SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get(); if (sendResult.getRecordMetadata() != null) { logger.info("生产者成功发送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe sult.getProducerRecord().value().toString()); }
但是一般不推荐这么做!可以采用为其添加回调函数的形式,示例代码如下:
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o); future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()), ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));
另外可以为producer设置失败的重试次数和重试的时间间隔。如果多次重试失败,可以把消息加入死信队列
消费者丢失消息
消息加入分区后会有一个offset,如果消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset,此时如果消费者挂掉,那么消息并没有被消费,offset却自动提交了。一种解决方法是关闭自动提交,开启手动提交,但是如果消费完消息还没手动提交offset,消费者挂掉,那么该消息会被消费第二次甚至更多次,直到offset提交。
kafka丢消息
kafka的分区有一个多副本机制,副本之间有一个leader副本,其他副本是follower,如果leader副本突然挂掉,有些数据还未来得及同步到follower中,会消息丢失。
解决方法有以下几种:
- 设置acks=all,表示只有所有 ISR 列表(所有的可用副本)的副本全部收到消息时,生产者才会接收到来自服务器的响应。acks 的默认值即为 1,代表我们的消息被 leader 副本接收之后就算被成功发送。
- 设置min.insync.replicas > 1,代表消息至少要被写入到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际生产中应尽量避免默认值 1。
- 设置 unclean.leader.election.enable = false,各个follower的同步情况不一样,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。
kafka如何保证消息不重复消费
消息重复消费的根本原因是已消费的数据没有成功提交offset,在上面消费者丢失消息中已讲到。
RocketMQ
rocketMQ是阿里开源的消息队列,由java开发,和kafka的topic中是partition,rocketMQ的topic中是queue,queue可以分布在不同的borker中来容灾。和kafka相同,对于一个topic的一个queue,同个消费者组有一个消费者消费
RocketMQ架构
NameServer
、Broker
、Producer
、Consumer
,NameServer作用和kafka的zk相同,用来维护rocketmq的元信息,注册发现borker和路由信息管理,但是相对zk来说更为轻量
RabbitMQ
基于AMQP实现,由erlang编写,在 RabbitMQ 中,消息并不是直接被投递到queue中,中间还必须经过 Exchange(交换器) 这一层,Exchange会把消息分配到对应的queue。
生产者将消息发送给交换器时,需要一个 RoutingKey,当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应的队列中。
exchange类型
RabbitMQ 的 Exchange 有 4 种类型,不同的类型对应着不同的路由策略:direct(默认),fanout, topic, 和 headers。
- fanout把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中,不需要做任何判断操作,所以 fanout 类型是所有的交换机类型里面速度最快的。fanout 类型常用来广播消息。
- direct把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中,常用在处理有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。
- topic把可以把一条消息发送到匹配的多个queue中,BindingKey和RoutingKey 为一个点号“.”分隔的字符串,如a.b.c,BindingKey 中还可以存在“*”和“#”做模糊匹配,“*”用于匹配1个单词,“#”匹配0或多个单词。
- headers 类型的交换器不依赖路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行完全匹配。
死信队列
当消息在一个队列中变成死信之后,它能被重新发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。
导致的死信的几种原因:
- 消息被拒(
Basic.Reject /Basic.Nack
) 且requeue = false
。- 消息 TTL 过期。
- 队列满了,无法再添加。
更多推荐
所有评论(0)