# MQ面试问题
# RabbitMQ的作用
- 异步处理 - 相比于传统的串行、并行方式,提高了系统吞吐量。
- 应用解耦 - 系统间通过消息通信,不用关心其他系统的处理。
- 流量削锋 - 可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。
- 日志处理 - 解决大量日志传输。
- 消息通讯 - 消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。
# 解耦、异步、削峰是什么?
解耦:A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。 如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据, 直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据, 不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。就是一个系统或者一个模块,调用了多个系统或者模块, 互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦。
异步:A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms, BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s, 用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求。如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中, 假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms。
削峰:减少高峰时期对服务器压力。
# 延迟队列、优先级队列
延迟队列指的是存储对应的延迟消息,消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
RabbitMQ 本身是没有延迟队列的,要实现延迟消息,一般有两种方式:
- 通过 RabbitMQ 本身队列的特性来实现,需要使用 RabbitMQ 的死信交换机(Exchange)和消息的存活时间 TTL(Time To Live)。
- 在 RabbitMQ 3.5.7 及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时,插件依赖 Erlang/OPT 18.0 及以上。
也就是说,AMQP 协议以及 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过 TTL 和 DLX 模拟出延迟队列的功能。
RabbitMQ 自 V3.5.0 有优先级队列实现,优先级高的队列会先被消费。可以通过x-max-priority参数来实现优先级队列。不过,当消费速度大于生产速度且 Broker 没有堆积的情况下,优先级显得没有意义。
# 死信队列原理
DLX,全称为 Dead-Letter-Exchange,死信交换器,死信邮箱。当消息在一个队列中变成死信 (dead message) 之后, 它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。
导致的死信的几种原因:
- 消息被拒(Basic.Reject /Basic.Nack) 且 requeue = false。
- 消息 TTL 过期。
- 队列满了,无法再添加
# 消息队列有什么缺点
- 系统可用性降低:本来系统运行好好的,现在你非要加入个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性会降低;
- 系统复杂度提高:加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂性增大。
- 一致性问题:A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。
所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现, 妈呀,系统复杂度提升了一个数量级,也许是复杂了 10 倍。但是关键时刻,用,还是得用的。
# 生产环境用消息中间件?
这个首先你可以说下你们公司选用的是什么消息中间件,比如用的是RabbitMQ,然后可以初步给一些你对不同MQ中间件技术的选型分析。 举个例子:比如说ActiveMQ是老牌的消息中间件,国内很多公司过去运用的还是非常广泛的,功能很强大。
但是问题在于没法确认ActiveMQ可以支撑互联网公司的高并发、高负载以及高吞吐的复杂场景,在国内互联网公司落地较少。而且使用较多的是一些传统企业,用ActiveMQ做异步调用和系统解耦。 然后你可以说说RabbitMQ,他的好处在于可以支撑高并发、高吞吐、性能很高,同时有非常完善便捷的后台管理界面可以使用。他还支持集群化、高可用部署架构、消息高可靠支持,功能较为完善。
而且经过调研,国内各大互联网公司落地大规模RabbitMQ集群支撑自身业务的case较多,国内各种中小型互联网公司使用RabbitMQ的实践也比较多。 除此之外,RabbitMQ的开源社区很活跃,较高频率的迭代版本,来修复发现的bug以及进行各种优化,因此综合考虑过后,公司采取了RabbitMQ。
但是RabbitMQ也有一点缺陷,就是他自身是基于erlang语言开发的,所以导致较为难以分析里面的源码,也较难进行深层次的源码定制和改造,毕竟需要较为扎实的erlang语言功底才可以。 然后可以聊聊RocketMQ,是阿里开源的,经过阿里的生产环境的超高并发、高吞吐的考验,性能卓越,同时还支持分布式事务等特殊场景。
而且RocketMQ是基于Java语言开发的,适合深入阅读源码,有需要可以站在源码层面解决线上生产问题,包括源码的二次开发和改造。
另外就是Kafka。Kafka提供的消息中间件的功能明显较少一些,相对上述几款MQ中间件要少很多。 但是Kafka的优势在于专为超高吞吐量的实时日志采集、实时数据同步、实时数据计算等场景来设计。 因此Kafka在大数据领域中配合实时计算技术(比如Spark Streaming、Storm、Flink)使用的较多。但是在传统的MQ中间件使用场景中较少采用。
# Kafka、ActiveMQ、RabbitMQ、RocketMQ有什么优缺点?
综上,各种对比之后,有如下建议:
一般的业务系统要引入 MQ,最早大家都用ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;
后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态, 但是确实人家是开源的,比较稳定的支持,活跃度也高;
不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险 (目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的, 推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。
所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。
如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
# MQ出现顺序错乱的场景
为什么要保证顺序: 消息队列中的若干消息如果是对同一个数据进行操作,这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常。 举例:比如通过mysql binlog进行两个数据库的数据同步,由于对数据库的数据操作是具有顺序性的,如果操作顺序搞反,就会造成不可估量的错误。 比如数据库对一条数据依次进行了 插入->更新->删除操作,这个顺序必须是这样,如果在同步过程中,消息的顺序变成了 删除->插入->更新, 那么原本应该被删除的数据,就没有被删除,造成数据的不一致问题。
出现顺序错乱的场景
rabbitmq ①一个queue,有多个consumer去消费,这样就会造成顺序的错误,consumer从MQ里面读取数据是有序的,但是每个consumer的执行时间是不固定的, 无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。
一个queue对应一个consumer,但是consumer里面进行了多线程消费,这样也会造成消息消费顺序错误。
kafka ①kafka一个topic,一个partition,一个consumer,但是consumer内部进行多线程消费,这样数据也会出现顺序错乱问题。
②具有顺序的数据写入到了不同的partition里面,不同的消费者去消费,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。
# 保证消息的消费顺序
rabbitmq ①拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;这样也会造成吞吐量下降,可以在消费者内部采用多线程的方式取消费。
②或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理
kafka ①确保同一个消息发送到同一个partition,一个topic,一个partition,一个consumer,内部单线程消费。
写N个内存queue,然后N个线程分别消费一个内存queue即可
# MQ消息的重复问题(幂等性问题)
- 造成消息重复的根本原因是:网络不可达。
- 所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?
- 消费端处理消息的业务逻辑保持幂等性。只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。利用一张日志表来记录已经处理成功的消息的 ID, 如果新到的消息 ID 已经在日志表中,那么就不再处理这条消息。
# RabbitMQ基本概念
- Broker: 简单来说就是消息队列服务器实体
- Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列
- Queue: 消息队列载体,每个消息都会被投入到一个或多个队列
- Binding: 绑定,它的作用就是把exchange和queue按照路由规则绑定起来
- Routing Key: 路由关键字,exchange根据这个关键字进行消息投递
- VHost: vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。
- Producer: 消息生产者,就是投递消息的程序
- Consumer: 消息消费者,就是接受消息的程序
- Channel: 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
由Exchange、Queue、RoutingKey三个才能决定一个从Exchange到Queue的唯一的线路。
# 消息怎么路由?
消息提供方->路由->一至多个队列消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列路由键, 可以把队列绑定到交换器上。消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则);
常用的交换器主要分为一下三种:
- fanout:如果交换器收到消息,将会广播到所有绑定的队列上
- direct:如果路由键完全匹配,消息就被投递到相应的队列
- topic:可以使来自不同源头的消息能够到达同一个队列。 使用 topic 交换器时,可以使用通配符
# 消息基于什么传输
由于 TCP 连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ 使用信道的方式来传输数据。 信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接上的信道数量没有限制。
# RabbitMQ的工作模式
simple模式(即最简单的收发模式)
消息产生消息,将消息放入队列
- 消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,
- 造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。
work工作模式(资源的竞争)
- 消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1 C2共同争抢当前的消息队列内容, 谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。
publish/subscribe发布订阅(共享资源)
- 每个消费者监听自己的队列;
- 生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
routing路由模式
- 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
- 根据业务功能定义路由字符串
- 从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。
- 业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;
topic 主题模式(路由模式的一种)
- 星号井号代表通配符
- 星号代表多个单词,井号代表一个单词
- 路由功能添加模糊匹配
- 消息产生者产生消息,把消息交给交换机
- 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费
(在我的理解看来就是routing查询的一种模糊匹配,就类似sql的模糊查询方式)
# RabbitMQ消息的可靠传输?
消息到 MQ 的过程中搞丢,MQ 自己搞丢,MQ 到消费过程中搞丢。
- 生产者到 RabbitMQ:事务机制和 Confirm 机制,注意:事务机制和 Confirm 机制是互斥的,两者不能共存,会导致 RabbitMQ 报错。
- RabbitMQ 自身:持久化、集群、普通模式、镜像模式。
- RabbitMQ 到消费者:basicAck 机制、死信队列、消息补偿机制。
发送方确认模式
- 将信道设置成 confirm 模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的 ID。
- 一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)。
- 如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack(notacknowledged,未确认)消息。
- 发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。
接收方确认机制
- 消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ 才能安全地把消息从队列中删除。
- 这里并没有用到超时机制,RabbitMQ 仅通过 Consumer 的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ 给了 Consumer 足够长的时间来处理消息。保证数据的最终一致性;
下面罗列几种特殊情况
如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重) 如果消费者接收到消息却没有确认消息,连接也未断开,则 RabbitMQ 认为该消费者繁忙,将不会给该消费者分发更多的消息。
生产者丢失消息:confirm模式
从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息;
- transaction机制就是说:发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;
- confirm模式用的居多:一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;
- rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;
- 如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。
消息队列丢数据:消息持久化
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。 这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样, 如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
那么如何持久化呢?这里顺便说一下吧,其实也很容易,就下面两步
- 将queue的持久化标识durable设置为true,则代表是一个持久的队列
- 发送消息的时候将deliveryMode=2 这样设置以后,即使rabbitMQ挂了,重启后也能恢复数据。
消费者丢失消息:手动确认消息:
- 消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!
- 消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;
- 如果这时处理消息失败,就会丢失该消息;
解决方案:处理消息成功后,手动回复确认消息。
# message 都使用持久化机制?
首先,必然导致性能的下降,因为写磁盘比写 RAM 慢的多,message 的吞吐量可能有 10 倍的差距。
其次,message 的持久化机制用在 RabbitMQ 的内置 cluster 方案时会出现“坑爹”问题。矛盾点在于, 若 message 设置了 persistent 属性,但 queue 未设置 durable 属性,那么当该 queue 的 owner node 出现异常后, 在未重建该 queue 前,发往该 queue 的 message 将被 blackholed ;若 message 设置了 persistent 属性, 同时 queue 也设置了 durable 属性,那么当 queue 的 owner node 异常且无法重启的情况下,则该 queue 无法在其他 node 上重建, 只能等待其 owner node 重启后,才能恢复该 queue 的使用,而在这段时间内发送给该 queue 的 message 将被 blackholed 。
所以,是否要对 message 进行持久化,需要综合考虑性能需要,以及可能遇到的问题。若想达到 100,000 条/秒以上的消息吞吐量(单 RabbitMQ 服务器), 则要么使用其他的方式来确保 message 的可靠 delivery ,要么使用非常快速的存储系统以支持全持久化(例如使用 SSD)。 另外一种处理原则是:仅对关键消息作持久化处理(根据业务重要程度),且应该保证关键消息的量不会导致性能瓶颈。
# MQ消息持续积压
消息积压处理办法:临时紧急扩容:
先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
MQ中消息失效:假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉, 这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。 我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了, 然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据, 写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面 ,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。
mq消息队列块满了:如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有, 谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。
# 设计MQ思路
比如说这个消息队列系统,我们从以下几个角度来考虑一下:
首先这个 mq 得支持可伸缩性吧,就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎么搞?设计个分布式的系统呗, 参照一下 kafka 的设计理念,broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。 如果现在资源不够了,简单啊,给 topic 增加 partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了?
其次你得考虑一下这个 mq 的数据要不要落地磁盘吧?那肯定要了,落磁盘才能保证别进程挂了数据就丢了。 那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是 kafka 的思路。
其次你考虑一下你的 mq 的可用性啊?这个事儿,具体参考之前可用性那个环节讲解的 kafka 的高可用保障机制。 多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。
能不能支持数据 0 丢失啊?可以呀,有点复杂的。
# MQ的技术选型
Kafka
Kafka 是 LinkedIn 开源的一个分布式流式处理平台,已经成为 Apache 顶级项目,早期被用来用于处理海量的日志, 后面才慢慢发展成了一款功能全面的高性能消息队列。Kafka 是一个分布式系统,由通过高性能 TCP 网络协议进行通信的服务器和客户端组成, 可以部署在在本地和云环境中的裸机硬件、虚拟机和容器上。
- 消息队列:发布和订阅消息流,这个功能类似于消息队列,这也是 Kafka 也被归类为消息队列的原因。
- 容错的持久方式存储记录消息流:Kafka 会把消息持久化到磁盘,有效避免了消息丢失的风险。
- 流式处理平台:在消息发布的时候进行处理,Kafka 提供了一个完整的流式处理类库。
RocketMQ
RocketMQ 是阿里开源的一款云原生“消息、事件、流”实时数据处理平台,借鉴了 Kafka,已经成为 Apache 顶级项目。 RocketMQ 的核心特性(摘自 RocketMQ 官网):
- 云原生:生与云,长与云,无限弹性扩缩,K8s 友好
- 高吞吐:万亿级吞吐保证,同时满足微服务与大数据场景。
- 流处理:提供轻量、高扩展、高性能和丰富功能的流计算引擎。
- 金融级:金融级的稳定性,广泛用于交易核心链路。
- 架构极简:零外部依赖,Shared-nothing 架构。
- 生态友好:无缝对接微服务、实时计算、数据湖等周边生态。
RabbitMQ
RabbitMQ 是采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件, 它最初起源于金融系统,用于在分布式系统中存储转发消息。RabbitMQ 发展到今天,被越来越多的人认可, 这和它在易用性、扩展性、可靠性和高可用性等方面的卓著表现是分不开的。RabbitMQ 的具体特点可以概括为以下几点:
- 可靠性: RabbitMQ 使用一些机制来保证消息的可靠性,如持久化、传输确认及发布确认等。
- 灵活的路由: 在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ 己经提供了一些内置的交换器来实现。 针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。这个后面会在我们讲 RabbitMQ 核心概念的时候详细介绍到。
- 扩展性: 多个 RabbitMQ 节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
- 高可用性: 队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。
- 支持多种协议: RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP、MQTT 等多种消息中间件协议。
- 多语言客户端: RabbitMQ 几乎支持所有常用语言,比如 Java、Python、Ruby、PHP、C#、JavaScript 等。
- 易用的管理界面: RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。在安装 RabbitMQ 的时候会介绍到,安装好 RabbitMQ 就自带管理界面。
- 插件机制: RabbitMQ 提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。感觉这个有点类似 Dubbo 的 SPI 机制
Pulsar
Pulsar 是下一代云原生分布式消息流平台,最初由 Yahoo 开发 ,已经成为 Apache 顶级项目。Pulsar 集消息、存储、轻量化函数式计算为一体, 采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性, 被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。Pulsar 的关键特性如下(摘自官网):
- 下一代云原生分布式消息流平台。
- Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。
- 极低的发布延迟和端到端延迟。
- 可无缝扩展到超过一百万个 topic。简单的客户端 API,支持 Java、Go、Python 和 C++。
- 主题的多种订阅模式(独占、共享和故障转移)。
- 通过 Apache BookKeeper 提供的持久化消息存储机制保证消息传递 。
- 由轻量级的 serverless 计算框架 Pulsar Functions 实现流原生的数据处理。
- 基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得数据更易移入、移出 Apache Pulsar。
- 分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储(如 S3、GCS)中。
对比方向 | 概要 |
---|---|
吞吐量 | 万级的 ActiveMQ 和 RabbitMQ 的吞吐量(ActiveMQ 的性能最差)要比十万级甚至是百万级的 RocketMQ 和 Kafka 低一个数量级。 |
可用性 | 都可以实现高可用。ActiveMQ 和 RabbitMQ 都是基于主从架构实现高可用性。RocketMQ 基于分布式架构。 Kafka 也是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
时效性 | RabbitMQ 基于 Erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级,其他几个都是 ms 级。 |
功能支持 | Pulsar 的功能更全面,支持多租户、多种消费模式和持久性模式等功能,是下一代云原生分布式消息流平台。 |
消息丢失 | ActiveMQ 和 RabbitMQ 丢失的可能性非常低, Kafka、RocketMQ 和 Pulsar 理论上可以做到 0 丢失。 |
# RabbitMQ 高可用的
RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。 RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。
单机模式: Demo/test 级别的,一般就是你本地启动了玩玩儿的?,没人生产用单机模式。
普通集群模式:意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上, 但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。 你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。这方案主要是提高吞吐量的, 就是说让集群中多个节点来服务某个 queue 的读写操作。
镜像集群模式: 这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下, 你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像, 包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。 RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的, 也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。
这样的好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。 坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!RabbitMQ 一个 queue 的数据都是放在一个节点里的, 镜像集群下,也是每个节点都放这个 queue 的完整数据。
# Kafka的特点与使用场景
Kafka的特点
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个主题可以分多个分区, 消费组对分区进行消费操作;
- 可扩展性:kafka集群支持热扩展;
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);
- 高并发:支持数千个客户端同时读写
使用场景
- 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如Hadoop、Hbase、Solr等;
- 消息系统:解耦和生产者和消费者、缓存消息等;
- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘;
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;
- 流式处理:比如spark streaming和storm;
# kafa的核心概念
Producer: 生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。 生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
Consumer: 消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
Topic: 在Kafka中,使用一个类别属性来划分数据的所属类,划分数据的这个类称为topic。如果把Kafka看做为一个数据库,topic可以理解为数据库中的一张表,topic的名字即为表名。
Partition:topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。 partition中的数据是有序的,partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
Partition offset
每条消息都有一个当前Partition下唯一的64字节的offset,它指明了这条消息的起始位置。
Replicas of partition
副本是一个分区的备份。副本不会被消费者消费,副本只用于防止数据丢失,即消费者不从为follower的partition中消费数据, 而是从为leader的partition中读取数据。副本之间是一主多从的关系。
Broker
Kafka 集群包含一个或多个服务器,服务器节点称为broker。broker存储topic的数据。如果某topic有N个partition, 集群有N个broker,那么每个broker存储该topic的一个partition。如果某topic有N个partition,集群有(N+M)个broker, 那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。 如果某topic有N个partition,集群中broker数目少于N个,那么 一个broker存储该topic的一个或多个partition。 在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
Leader
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition
Follower
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。 如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢, leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。
Zookeeper
Zookeeper负责维护和协调broker。当Kafka系统中新增了broker或者某个broker发生故障失效时,由ZooKeeper通知生产者和消费者。 生产者和消费者依据Zookeeper的broker状态信息与broker协调数据的发布和订阅任务。
AR(Assigned Replicas)
分区中所有的副本统称为AR。
ISR(In-Sync Replicas)
所有与Leader部分保持一定程度的副(包括Leader副本在内)本组成ISR。
OSR(Out-of-Sync-Replicas)
与Leader副本同步滞后过多的副本。
HW(High Watermark)
高水位,标识了一个特定的offset,消费者只能拉取到这个offset之前的消息。
LEO(Log End Offset)
即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下一条消息!也就是说, 如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0,9]。
# RabbitMQ如何实现高可用?
在分布式架构下,高可用是最基础的设计。也就是说,一旦依赖的某个服务出现故障,不能影响业务的正常执行。RabbitMQ提供了两种集群模式:普通集群模式、镜像集群模式
普通集群
这种集群模式下,各个节点只同步元数据,不同步队列中的消息。其中元数据包含队列的名称、交换机名称及属性、交换机与队列的绑定关系等。 当我们发送消息和消费消息的时候,不管请求发送到RabbitMQ集群的哪个节点。最终都会通过元数据定位到队列所在的节点去存储以及拉取数据。 很显然,这种集群方式并不能保证Queue的高可用,因为一旦Queue所在的节点挂了,那么这个Queue的消息就没办法访问了。它的好处是通过多个节点分担了流量的压力,提升了消息的吞吐能力。
镜像集群
它和普通集群的区别在于,镜像集群中Queue的数据会在RabbitMQ集群的每个节点存储一份。一旦任意一个节点发生故障,其他节点仍然可以继续提供服务。所以这种集群模式实现了真正意义上的高可用。
最后,在镜像集群的模式下,我们可以通过Keepalived+HAProxy来实现RabbitMQ集群的负载均衡。其中:
HAProxy是一个能支持四层和七层的负载均衡器,可以实现对RabbitMQ集群的负载均衡 同时为了避免HAProxy的单点故障,可以再增加Keepalived实现HAProxy的主备,如果HAProxy主节点出现故障那么备份节点就会接管主节点提供服务。
RabbitMQ高可用实现方式有两种
第一种是普通集群模式,在这种模式下,一个Queue的消息只会存在集群的一个节点上,集群里面的其他节点会同步Queue所在节点的元数据, 消息在生产和消费的时候,不管请求发送到集群的哪个节点,最终都会路由到Queue所在节点上去存储和拉取消息。 这种方式并不能保证Queue的高可用性,但是它可以提升RabbitMQ的消息吞吐能力
第二种是镜像集群,也就是集群里面的每个节点都会存储Queue的数据副本。意味着每次生产消息的时候,都需要把消息内容同步给集群中的其他节点。 这种方式能够保证Queue的高可用性,但是集群副本之间的同步会带来性能的损耗。另外,由于每个节点都保存了副本,所以我们还可以通过HAProxy实现负载均衡。
# RabbitMQ的消息如何实现路由?
RabbitMQ是一个基于AMQP协议实现的分布式消息中间件。AMQP的具体工作机制是,生产者把消息发送到RabbitMQ Broker上的Exchange交换机上。 Exchange交换机把收到的消息根据路由规则发给绑定的队列(Queue)。最后再把消息投递给订阅了这个队列的消费者,从而完成消息的异步通讯。
其中,Exchange是一个消息交换机,它里面定义了消息路由的规则,也就是这个消息路由到那个队列。然后Queue表示消息的载体,每个消息可以根据路由规则路由到一个或者多个队列里面。
而关于消息的路由机制,核心的组件是Exchange。它负责接收生产者的消息然后把消息路由到消息队列,而消息的路由规则由ExchangeType和Binding决定。
Binding表示建立Queue和Exchange之间的绑定关系,每一个绑定关系会存在一个BindingKey。通过这种方式相当于在Exchange中建立了一个路由关系表。
生产者发送消息的时候,需要声明一个routingKey(路由键),Exchange拿到routingKey之后,根据RoutingKey和路由表里面的BindingKey进行匹配,而匹配的规则是通过ExchangeType来决定的。
在RabbitMQ中,有三种类型的Exchange:direct ,fanout和topic
。
direct
: 完整匹配方式,也就是Routing key和Binding Key完全一致,相当于点对点的发送。fanout
: 广播机制,这种方式不会基于Routing key来匹配,而是把消息广播给绑定到当前Exchange上的所有队列上。topic
: 正则表达式匹配,根据Routing Key使用正则表达式进行匹配,符合匹配规则的Queue都会收到这个消息
# Kafka如何保证消息消费的顺序性?
kafka为什么会存在无序消费
首先,在kafka的架构里面,用到了Partition分区机制来实现消息的物理存储,在同一个topic下面,可以维护多个partition来实现消息的分片。
生产者在发送消息的时候,会根据消息的key进行取模,来决定把当前消息存储到哪个partition里面。并且消息是按照先后顺序有序存储到partition里面的。
在这种情况下,假设有一个topic存在三个partition,而消息正好被路由到三个独立的partition里面。 然后消费端有三个消费者通过balance机制分别指派了对应消费分区。因为消费者是完全独立的网络节点, 所有可能会出现,消息的消费顺序不是按照发送顺序来实现的,从而导致乱序的问题。
kafka如何保证有序消费
针对这个问题,一般的解决办法就是自定义消息分区路由的算法,然后把指定的key都发送到同一个Partition里面。 接着指定一个消费者专门来消费某个分区的数据,这样就能保证消息的顺序消费了。
另外,有些设计方案里面,在消费端会采用异步线程的方式来消费数据来提高消息的处理效率,那这种情况下, 因为每个线程的消息处理效率是不同的,所以即便是采用单个分区的存储和消费也可能会出现无序问题, 针对这个问题的解决办法就是在消费者这边使用一个阻塞队列,把获取到的消息先保存到阻塞队列里面,然后异步线程从阻塞队列里面去获取消息来消费。
# 什么是 ISR,为什么需要引入ISR
首先,发送到Kafka Broker上的消息,最终是以Partition的物理形态来存储到磁盘上的。 而Kafka为了保证Parititon的可靠性,提供了Paritition的副本机制,然后在这些Partition副本集里面。存在Leader Partition和Flollower Partition。
生产者发送过来的消息,会先存到Leader Partition里面,然后再把消息复制到Follower Partition, 这样设计的好处就是一旦Leader Partition所在的节点挂了,可以重新从剩余的Partition副本里面选举出新的Leader。 然后消费者可以继续从新的Leader Partition里面获取未消费的数据。
在Partition多副本设计的方案里面,有两个很关键的需求。
- 副本数据的同步
- 新Leader的选举
这两个需求都需要涉及到网络通信,Kafka为了避免网络通信延迟带来的性能问题,以及尽可能的保证新选举出来的Leader Partition里面的数据是最新的,所以设计了ISR这样一个方案。 ISR全称是 in-sync replica,它是一个集合列表,里面保存的是和Leader Parition节点数据最接近的Follower Partition,如果某个Follower Partition里面的数据落后Leader太多,就会被剔除ISR列表。 简单来说,ISR列表里面的节点,同步的数据一定是最新的,所以后续的Leader选举,只需要从ISR列表里面筛选就行了。
所以,我认为引入ISR这个方案的原因有两个
- 尽可能的保证数据同步的效率,因为同步效率不高的节点都会被踢出ISR列表。
- 避免数据的丢失,因为ISR里面的节点数据是和Leader副本最接近的。
# Kafka怎么避免重复消费
首先Kafka Broker上存储的消息,都有一个Offset标记。然后kafka的消费者是通过offSet标记来维护当前已经消费的数据, 每消费一批数据,Kafka Broker就会更新OffSet的值,避免重复消费。
默认情况下,消息消费完以后,会自动提交Offset的值,避免重复消费。Kafka消费端的自动提交逻辑有一个默认的5秒间隔,也就是说在5秒之后的下一次向Broker拉取消息的时候提交。 所以在Consumer消费的过程中,应用程序被强制kill掉或者宕机,可能会导致Offset没提交,从而产生重复提交的问题。
除此之外,还有另外一种情况也会出现重复消费。在Kafka里面有一个Partition Balance机制,就是把多个Partition均衡的分配给多个消费者。 Consumer端会从分配的Partition里面去消费消息,如果Consumer在默认的5分钟内没办法处理完这一批消息。就会触发Kafka的Rebalance机制,从而导致Offset自动提交失败。 而在重新Rebalance之后,Consumer还是会从之前没提交的Offset位置开始消费,也会导致消息重复消费的问题。
基于这样的背景下,我认为解决重复消费消息问题的方法有几个。
- 提高消费端的处理性能避免触发Balance,比如可以用异步的方式来处理消息,缩短单个消息消费的市场。或者还可以调整消息处理的超时时间。还可以减少一次性从Broker上拉取数据的条数。
- 可以针对消息生成md5然后保存到mysql或者redis里面,在处理消息之前先去mysql或者redis里面判断是否已经消费过。这个方案其实就是利用幂等性的思想。
# Kafka的零拷贝原理?
在实际应用中,如果我们需要把磁盘中的某个文件内容发送到远程服务器上,如图
那么它必须要经过几个拷贝的过程:
- 从磁盘中读取目标文件内容拷贝到内核缓冲区
- CPU控制器再把内核缓冲区的数据赋值到用户空间的缓冲区中
- 接着在应用程序中,调用write()方法,把用户空间缓冲区中的数据拷贝到内核下的Socket Buffer中。
- 最后,把在内核模式下的SocketBuffer中的数据赋值到网卡缓冲区(NIC Buffer)
- 网卡缓冲区再把数据传输到目标服务器上。
在这个过程中我们可以发现,数据从磁盘到最终发送出去,要经历4次拷贝,而在这四次拷贝过程中,有两次拷贝是浪费的,分别是:
- 从内核空间赋值到用户空间
- 从用户空间再次复制到内核空间
除此之外,由于用户空间和内核空间的切换会带来CPU的上线文切换,对于CPU性能也会造成性能影响。
而零拷贝,就是把这两次多于的拷贝省略掉,应用程序可以直接把磁盘中的数据从内核中直接传输给Socket,而不需要再经过应用程序所在的用户空间,如下图所示。
零拷贝通过DMA(Direct Memory Access)技术把文件内容复制到内核空间中的Read Buffer,接着把包含数据位置和长度信息的文件描述符加载到Socket Buffer中,DMA引擎直接可以把数据从内核空间中传递给网卡设备。 在这个流程中,数据只经历了两次拷贝就发送到了网卡中,并且减少了2次cpu的上下文切换,对于效率有非常大的提高。 所以,所谓零拷贝,并不是完全没有数据赋值,只是相对于用户空间来说,不再需要进行数据拷贝。对于前面说的整个流程来说,零拷贝只是减少了不必要的拷贝次数而已。
在程序中如何实现零拷贝呢?
- 在Linux中,零拷贝技术依赖于底层的sendfile()方法实现
- 在Java中,FileChannal.transferTo() 方法的底层实现就是 sendfile() 方法。
- 除此之外,还有一个 mmap 的文件映射机制,它的原理是:将磁盘文件映射到内存, 用户通过修改内存就能修改磁盘文件。使用这种方式可以获取很大的I/O提升,省去了用户空间到内核空间复制的开销。
# 百万消息堆积,如何快速解决?
解决消息堆积的原因
- 新上线的消费者功能有BUG,消息无法被消费
- 消费者实例宕机或因网络问题暂时无法同Broker建立连接。
- 生产者短时间内推送大量消息至Broker,消费者消费能力不足
解决消息堆积的方案
事前处理
在我们系统上线之前应该我们对大致的流量应该有一个预估,并提前进行压测.
事中处理:
- 临时扩容: 同时增加消费者实例数量、增加消息队列数量、
- 增加临时消费者:如果我们消息队列内存不允许的情况下,我们可以增加一个临时消费者实例(只做插入操作,将消息保存到数据库中,保证消息不丢失)
- 业务降级,最低限度让系统还能正常运转,服务重要业务和核心业务:a.减少发送方发送的数据量 b.减轻消费者业务逻辑,提高消费速度。
事后处理
- 优化我们消费者代码,提高消费者消费速度
- 并行处理,将消息分配到多个处理线程中同时处理,从而提供系统的处理速度和性能.
为了防止我们消息堆积问题的产生
- 我们要做好上线前的对流量的预估和提前的压测
- 对于新增活动或者其他会导致我们短时间流量飙升的,需要提前增加我们的消费者实例数量和消息队列数量
- 通过服务降级来保证重要业务和核心业务。
- 提高我们的消费者能力(策略:丢弃(消费者有API获取的),加强消费速度或者增加并行);
← Mybatis面试问题 nginx面试问题 →