消息中间件

一、消息中间件

1. 什么是消息中间件?

消息中间件是基于队列消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统。消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,主要解决异步处理、应用耦合、流量消峰等问题,实现高性能、高可用、可伸缩和最终一致性架构,是大型分布式系统不可缺少的中间件。

2. 消息中间件解决的问题

2.1. 异步处理

消息队列提供了异步处理机制,因为很多时候用户并不需要立即响应来处理消息,那么通过这个机制就可以把所有消息放入 MQ 中。例如:某系统发来的数据中包含很多图片信息,如果对其中的信息都进行保存处理,用户一番操作下来可能会很久。采用异步处理之后,系统会将所有数据存放在 MQ 中,用户不需要立即处理,大大缩短了系统的响应时间。

image-20230310184602588

2.2. 应用解耦

消息队列可以对系统间的依赖进行解耦,降低依赖系统变更带来的影响。例如:用户在下单后,订单系统A需要通知系统B系统C等做出响应的处理,此时的系统A是强依赖系统B系统C的,一旦系统B出现故障或者需要重新加入高耦合的系统D时就必须要更改系统A的代码,如果经常出现这种依赖系统迭代的情况,那么系统A就会很难维护,可以通过加入消息队列对依赖系统进行解耦,这样系统A也无需关心其他系统的可用性。

image-20230314162309878

2.3. 流量削峰

流量削峰还有个形象的名字叫做削峰填谷,其实就是指当数据量激增时,能够有效地隔离上下游业务,将上游突增的流量缓存起来,真正地填到谷中,以平滑的方式传到下游系统,避免了流量的不规则冲击。例如:有个活动页面平时也就 50qps,某一特殊时刻访问量突然增多,能达到 1000qps,但是当前系统的处理能力最多为 100qps,这个时候可以通过消息队列来进行削峰填谷,如下图所示。

image-20230314163618464

2.4. 其他

还能解决消息通讯远程调用等问题。消息队列内置了高效的通信机制,可用于消息通讯。如实现点对点消息队列、聊天室等

3. 消息中间件的传输模式

3.1. 点对点模式

系统A发送的消息只能被系统B接收,其他的任何系统都不能获取到系统A发送的消息。

image-20230314171133511

3.2. 发布/订阅模式

与点对点模型的区别在于发布/订阅模型多了一个 topic 的概念,可以存在多个发布者向相同主题发送消息,而订阅者也可以存在多个,接收相同主题的消息。在日常生活中就像不同主题的报纸期刊,同时也有不同群体的读者来订阅

image-20230314171444964

二、主流消息中间件:

1. RabbitMQ

2. Kafka

Kafka架构,更多详情请参考:爱笑的架构师

Kafka的整体架构非常简单,是显式分布式架构,主要由producer、broker(kafka)和consumer组成。

图片Kafka架构(精简版)

Producer(生产者)可以将数据发布到所选择的topic(主题)中。生产者负责将记录分配到topic的哪一个 partition(分区)中。可以使用循环的方式来简单地实现负载均衡,也可以根据某些语义分区函数(如记录中的key)来完成。

Consumer(消费者)使用一个consumer group(消费组)名称来进行标识,发布到topic中的每条记录被分配给订阅消费组中的一个消费者实例。消费者实例可以分布在多个进程中或者多个机器上。


生产者发送消息的一般流程:

  1. 生产者是与leader直接交互,所以先从集群获取topic对应分区的leader元数据;
  2. 获取到leader分区元数据后直接将消息发给过去;
  3. Kafka Broker对应的leader分区收到消息后写入文件持久化;
  4. Follower拉取Leader消息与Leader的数据保持一致;
  5. Follower消息拉取完毕需要给Leader回复ACK确认消息;
  6. Kafka Leader和Follower分区同步完,Leader分区会给生产者回复ACK确认消息。

图片

生产者采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘。消息写入Leader后,Follower是主动与Leader进行同步。

Kafka消息发送有两种方式:同步(sync)异步(async),默认是同步方式,可通过producer.type属性进行配置。通过配置producer.acks属性为all来保证leader和follower不丢,但是如果网络拥塞,没有收到ACK,会有重复发的问题。默认值为1,所以默认的producer级别是at least once,并不能exactly once


Kafka Broker 接收到数据后会将数据进行持久化存储,你以为是下面这样的

图片

实际上:

图片

操作系统本身有一层缓存,叫做Page Cache,当往磁盘文件写入的时候,系统会先将数据流写入缓存中,至于什么时候将缓存的数据写入文件中是由操作系统自行决定。

Kafka提供了一个参数 producer.type 来控制是不是主动flush:

  1. 同步 (sync):Kafka写入到mmap之后就立即 flush 然后再返回 Producer
  2. 异步 (async):写入mmap之后立即返回 Producer 不调用 flush

Kafka通过多分区多副本机制中已经能最大限度保证数据不会丢失,但如果数据已经写入系统 cache 中但是还没来得及刷入磁盘,此时突然机器宕机或者掉电那就丢了,当然这种情况很极端


消费者丢失消息

消费者通过pull模式主动的去 kafka 集群拉取消息,与producer相同的是,消费者在拉取消息的时候也是找leader分区去拉取。

多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id。同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是不会出现多个消费者消费同一分区的数据。

图片

消费者群组消费消息

消费者消费的进度通过offset保存在kafka集群的__consumer_offsets这个topic中。

消费消息的时候主要分为两个阶段:

1、标识消息已被消费,commit offset坐标

2、处理消息。

在生产环境中严格做到exactly once其实是难的,同时也会牺牲效率吞吐量,所以最佳实践是业务侧做好补偿机制,万一出现消息丢失可以兜底,消费端可能还是会丢消息的两种场景:

  1. 先commit再处理消息。如果在处理消息的时候异常了,但是offset 已经提交了,这条消息对于该消费者来说就是丢失了,再也不会消费到了。

  2. 先处理消息再commit。如果在commit之前发生异常,下次还会消费到该消息,重复消费的问题可以通过业务保证消息幂等性来解决。

三、主流消息中间件对比

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,比RocketMQ、Kafka低一个数量级 同ActiveMQ 10万级,支持高吞吐 10万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic数量对吞吐量的影响 topic可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic topic从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka尽量保证topic数量不要过多,如果要支撑大规模的topic,需要增加更多的机器资源
时效性 ms级 微妙级,这是RabbitMQ的一大特点,延迟最低 ms级 延迟在ms级以内
可用性 高、基于主从架构实现的高可用 同ActiveMQ 非常高,分布式架构 非常高,分布式架构。一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到0丢失 同RocketMQ
功能支持 MQ领域的功能极其完备 基于erlang开发,并发能力很强,性能极好,延时很低 MQ功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用
社区活跃度

四、如何保证消息不丢失:

消息传递语义也就是message delivery semantic ,简单说就是消息传递过程中消息传递的保证性。主要分为以下三种:

  • at most once:最多一次。消息可能丢失也可能被处理,但最多只会被处理一次。
  • at least once:至少一次。消息不会丢失,但可能被处理多次。可能重复,不会丢失。
  • exactly once:精确传递一次。消息被处理且只会被处理一次。不丢失不重复就一次。

理想情况下肯定是希望系统的消息传递是严格exactly once,也就是保证不丢失、只会被处理一次,但是很难做到。

一个消息从生产者产生,到被消费者消费,主要经过三个过程:

image-20230404185653996

  1. 生产者产生消息
  2. 消息发送到存储端,保存下来
  3. 消息推送到消费者,消费者消费完,ack应答。

因此如何保证MQ不丢失消息,可以从这三个阶段阐述:

  • 生产者保证不丢消息
  • 存储端不丢消息
  • 消费者不丢消息

1. 生产者保证不丢消息

1.1. RabbitMQ

方案1:RabbitMQ可以选择⽤ RabbitMQ 提供的事务功能,就是⽣产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么⽣产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit,缺点是一旦用上事务机制(同步)基本上吞吐量会下来,因为太耗性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
//2.连接工厂创建连接
Connection connection = factory.newConnection();
//3.创建信道
Channel channel = connection.createChannel();
//4.开启事务
channel.txSelect
try {
// 5.这⾥发送消息
// 6.提交事务
channel.txCommit
} catch (Exception e)
{
//回滚
channel.txRollback
// 这⾥再次重发这条消息
}

方案2:开启 confirm模式,你每次写的消息都会分配⼀个唯⼀的 id,然后如果写⼊了 RabbitMQ 中,RabbitMQ 会给你回传⼀个 ack 消息,告诉你说这个消息 ok 了。 如果RabbitMQ 没能处理这个消息,会回调你的⼀个 handleNack接⼝,告诉你这个消息接收失败,你可以重试。 ⽽且你可以结合这个机制⾃⼰在内存⾥维护每个消息 id 的状态,如果超过⼀定时间还没接收到这个消息的回调,那么你可以重发

confirm模式也分同步和异步,同步缺点:使用同步的方式需要等待所有的消息发送成功以后才会执行后面的代码,只要有一个消息未被确认就会抛出IO异常。
  • confirm
    • 同步confirm
      • 普通confirm模式:每发送一条消息后调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm。
      • 批量confirm模式:每发送一批消息后,调用waitForConfirmsOrDie()方法,等待服务端confirm。
    • 异步confirm

以下列举异步confirm实现

  • 第一步,在channel上开启确认模式:channel.confirmSelect()

  • 第二步,在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//生产者偶模型
public class ConfirmProducer {

public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

//2 获取Connection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();

//4 指定我们的消息投递模式: 消息的确认模式
channel.confirmSelect();

String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";

//5 发送一条消息
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

//6 添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
//消息失败处理
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//deliveryTag;唯一消息标签
//multiple:是否批量
System.err.println("-------no ack!-----------");
}
//消息成功处理
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ack!-----------");
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//消费者模型
public class ConfirmConsumer {

public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

//2 获取Connection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();

String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.#";
String queueName = "test_confirm_queue";

//4 声明交换机和队列,然后进行绑定设置路由Key
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

//5 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);

//6 接收消息
while(true){
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端: " + msg);
}
}
}

1.2. Kafka

方案:Kafka可以通过配置spring.producer.apks属性来确认消息的生产,acks 的默认值为1,所以默认的producer级别是at least once,并不能exactly once(精确传递一次。消息被处理且只会被处理一次。不丢失不重复就一次)。

spring.producer.apks属性有以下三种类型:

  • 0:表示不进行消息接收是否成功的确认;不能保证消息是否发送成功,生成环境基本不会用。(如果发生网络抖动消息丢了,生产者不校验ACK自然就不知道丢了)
  • 1:表示当Leader接收成功时确认;只要Leader存活就可以保证不丢失,保证了吞吐量。(可以保证leader不丢,但是如果leader挂了,恰好选了一个没有ACK的follower,那也丢了)
  • -1或者all:表示Leader和Follower都接收成功时确认;可以最大限度保证消息不丢失,但是吞吐量低。(可以保证leader和follower不丢,但是如果网络拥塞,没有收到ACK,会有重复发的问题。)

2. 存储端不丢消息

2.1. RabbitMQ

方案:可以开启 RabbitMQ 的持久化,消息写⼊之后会持久化到磁盘,哪怕是 RabbitMQ ⾃⼰挂了,恢复之后会⾃动读取之前存储的数据,⼀般数据不会丢。除⾮极其罕⻅的是,RabbitMQ 还没持久化,⾃⼰就挂了,可能导致少量数据丢失,但是这个概率较⼩。

开启持久化必须同时设置的两个步骤:

  1. 创建 queue 的时候将其设置为持久化(这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue ⾥的数据的)
  2. 发送消息的时候将消息的deliveryMode设置为 2,持久化可以跟⽣产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知⽣产者 ack (这样RabbitMQ 就会将消息持久化到磁盘上去,哪怕是RabbitMQ挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue ⾥的数据;并且在极端环境下,哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,⽣产者收不到 ack,你也是可以⾃⼰重发的)

2.2. Kafka

Kafka可以通过多分区多副本机制中已经能最大限度保证数据不会丢失,如果数据已经写入系统 cache 中但是还没来得及刷入磁盘,此时突然机器宕机或者掉电那就丢了,当然这种情况很极端

3. 消费阶段不丢消息

3.1. RabbitMQ

方案:可以关闭 RabbitMQ 的⾃动ack,并且通过⼀个 api 来调⽤就⾏,然后每次在自身代码⾥确保处理完的时候,再在程序⾥ack⼀把。 这样的话,如果你没处理完,就不会有ack 了,那 RabbitMQ 就会认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

3.2. Kafka

方案:唯⼀可能导致消费者弄丢数据的情况,就是说,你消费到了这个消息,然后消费者那边⾃动提交了 offset,让 Kafka 以为你已经消费好了这个消息,但其实你才刚准备处理这个消息,你还没处理,你⾃⼰就挂了,此时这条消息就丢咯;同RabbitMQ一样Kafka 会⾃动提交 offset,那么只要关闭⾃动提交 offffset,在处理完之后⾃⼰⼿动提交offset,就可以保证数据不会丢。但是此时确实还是可能会有重复消费,⽐如你刚处理完,还没提交 offffset,结果⾃⼰挂了,此时肯定会重复消费⼀次,⾃⼰保证幂等性就好了

⼀般是要求起码设置如下 4 个参数:

  1. topi 设置 replication.factor 参数,并且这个值必须⼤于 1,要求每个 partition 必须有⾄少 2 个副本
  2. 在 Kafka 服务端设置 min.insync.replicas参数,并且这个值必须⼤于 1(这个是要求⼀个 leader ⾄少感知到有⾄少⼀个 follower 还跟⾃⼰保持联系,没掉队,这样才能确保 leader 挂了还有⼀个 follower )
  3. 在 Producer 端设置acks=all(这个是要求每条数据,必须是写⼊所有 replica 之后,才能认为是写成功了)
  4. 在 Producer 端设置 retries=MAX( 这个是要求⼀旦写⼊失败,就会⽆限重试)

4. 总结

五、如何保证消息顺序?

1. RabbitMQ

顺序错乱场景::⼀个 queue,多个 consumer。⽐如,⽣产者向 RabbitMQ ⾥发送了三条数据,顺序依次是 Data1、Data2、Data3,压⼊的是 RabbitMQ 的⼀个内存队列。有三个消费者分别从 MQ 中消费这三条数据中的⼀条,结果消费者2先执⾏完操作,把 Data2存⼊数据库,然后是 Data1、data3。

image-20230407172656020

解决方案:

  1. 拆分多个 queue,每个 queue⼀个 consumer,就是多⼀些 queue (比较⿇烦)
  2. ⼀个 queue 但是对应⼀个 consumer,然后这个 consumer 内部⽤内存队列做排队,然后分发给底层不同的 worker 来处理。(推荐)

image-20230407175312031

2. Kafka

顺序错乱场景::⽐如说我们建了⼀个topic,有三个 partition。⽣产者在写的时候,其实可以指定⼀个 key,⽐如说我们指定了某个订单id 作为 key,那么这个订单相关的数据,⼀定会被分发到同⼀个 partition 中去,⽽且这个 partition 中的数据⼀定是有顺序的。消费者从 partition 中取出来数据的时候,也⼀定是有顺序的。到这⾥,顺序还是 ok 的,没有错乱。接着,我们在消费者⾥可能会搞多个线程来并发处理消息多。因为如果消费者是单线程消费处理的话,当处理⽐较耗时,⽐如处理⼀条消息耗时⼏⼗ ms,那么1秒钟只能处理⼏⼗条消息,这吞吐量太低了。⽽多个线程并发跑的话,顺序可能就乱掉了

image-20230407181438109

解决方案:

  1. ⼀个 topic,⼀个 partition,⼀个 consumer,内部单线程消费,单线程吞吐量太低(不推荐)。

  2. 写 N 个内存 queue,具有相同 key 的数据都到同⼀个内存 queue;然后对于 N 个线程,每个线程分别消费⼀个内存 queue 即可,这样就能保证顺序性。

image-20230411151801723

六、如何处理消息积压?

场景:

  1. 消费端出了问题,比如消费者都挂了,没有消费者来消费了,导致消息在队列里面不断积压。
  2. 消费端出了问题,比如消费者消费的速度太慢了,导致消息不断积压。

例子:比如线上正在做订单活动,下单全部走消息队列,如果消息不断积压,订单都没有下单成功,那么将会损失很多交易

解决方案:解铃还须系铃人

  1. 修复代码层面消费者的问题,确保后续消费速度恢复或尽可能加快消费的速度。
  2. 停掉现有的消费者。
  3. 临时建立好原先 5 倍的 Queue 数量。
  4. 临时建立好原先 5 倍数量的 消费者。
  5. 将堆积的消息全部转入临时的 Queue,消费者来消费这些 Queue。

image-20230411155020363

七、如何处理消息过期失效?

例子:RabbitMQ 可以设置过期时间,如果消息超过一定的时间还没有被消费,则会被 RabbitMQ 给清理掉。消息就丢失了。

解决方案:

  • 准备好批量重导的程序
  • 手动将消息闲时批量重导