2025-01-17
消息队列
00
请注意,本文编写于 89 天前,最后修改于 86 天前,其中某些信息可能已经过时。

目录

基础概念
消息模型
存储机制
在partition中如何通过offset查找message
Kafka 高性能 IO 实现
使用批量消息提升服务端处理能力
使用顺序读写提升磁盘 IO 性能
利用 PageCache 加速消息读写
ZeroCopy:零拷贝技术
数据压缩
实现分布式事务
消息丢失
检测消息丢失
确保不丢消息
消息重复
用幂等性解决重复消息问题
消息积压
常见问题
如何保证消息的严格顺序
参考

以 Kafka 为例,Rocket MQ 与 Kafka 相似。

基础概念

  • Broker 节点:一个 Broker 就是一个 Kafka 服务端进程。在生产环境下,通常会运行多个 Broker 组成一个 Kafka 集群,并且将每一个 Broker 都部署在不同的机器上,以提高系统的可用性。
  • Partition 分区:为了提高性能,在 Kafka 中,可以将一个 Topic 划分成多个 Partition,类似于分区、分片的概念。每一个 Partition 是一组有序的消息,同一个 Topic 的多个 Partition 可以存储在不同的 Broker 上。Kafka 中的分区编号从 0 开始,生产者生产的每一个消息只能被发送到一个分区中。
  • Topic 主题:在 Kafka 中,发送消息的系统 A 和获取消息的系统 B 在发送/获取消息的时候,都需要指定一个 Topic。Topic 是逻辑上的一个容器,我们可以在 Kafka 中创建若干个 Topic,然后根据业务和消息类型,将消息发送到不同的 Topic 当中。
  • Message 消息:消息系统最重要的主题无疑是消息本身,消息是 Kafka 主要处理的对象。最简单的消息系统处理流程就是:系统 A 将消息发送给 Kafka,系统 B 再从 Kafka 获取消息。
  • Producer 生产者:生产消息
  • Consumer 消费者:消费消息
  • Replication 副本:Replica 是 Kafka 实现高可用另一个特性。每一个 Partition 可以有多个 Replica 副本,提供存储冗余。将每个 Partition 的多个副本保存在不同的 Broker 中,其中一个是 Leader Replica,其余为 Follower Replica。当一个 Broker 宕机后,如果造成了 Leader Replica 不可用,那么会从 Follower 中选举出一个新的 Leader。生产者和消费者与 Partition 交互的时候,都只会与 Leader 交互,Follower 只充当数据副本的角色。当生产者发送一个消息后,只有当所有的 Replica 都收到消息并告知 Leader 之后,这个消息才能被消费者读取到。
  • Offset 消息位移:在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要为每个消费组在每个队列上维护一个消费位置,这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。
  • Consumer Group 消费者组:为了实现高吞吐量,在 Kafka 中,可以让若干个消费者组成一个消费者组,共同消费同一个 Topic 下不同 Partition 的消息,以实现更高的吞吐量,不同 Consumer Group 实现不同的功能,Group A 用于实时分析日志,Group B 用于将数据存储到数据库。
  • Rebalance 重平衡:当 Kafka 的某个主题的消费者组中,有一个消费者不可用后,其他消费者会自动重新分配订阅的主题分区,这个过程叫做 Rebalance,是 Kafka 实现消费者端高可用的重要手段。
  • Coordinator 协调员:Consumer 维护与 Coordinator 之间的心跳,这样 Coordinator 就能感知到 Consumer 的状态,在 Consumer 故障的时候及时触发 Rebalance。

消息模型

早期的消息队列,就是按照“队列”的数据结构来设计的。我们一起看下这个图,Producer 发消息就是入队操作,Consumer 收消息就是出队也就是删除操作,服务端存放消息的容器自然就称为“队列”。 这就是最初的一种消息模型:队列模型。 image.png 如果有多个生产者往同一个队列里面发送消息,这个队列中可以消费到的消息,就是这些生产者生产的所有消息的合集。消息的顺序就是这些生产者发送消息的自然顺序。如果有多个消费者接收同一个队列的消息,这些消费者之间实际上是竞争的关系,每个消费者只能收到队列中的一部分消息,也就是说任何一条消息只能被其中的一个消费者收到。

如果需要将一份消息数据分发给多个消费者,要求每个消费者都能收到全量的消息,例如,对于一份订单数据,风控系统、分析系统、支付系统等都需要接收消息。这个时候,单个队列就满足不了需求,一个可行的解决方式是,为每个消费者创建一个单独的队列,让生产者发送多份。

显然这是个比较蠢的做法,同样的一份消息数据被复制到多个队列中会浪费资源,更重要的是,生产者必须知道有多少个消费者。为每个消费者单独发送一份消息,这实际上违背了消息队列“解耦”这个设计初衷。

为了解决这个问题,演化出了另外一种消息模型:“发布 - 订阅模型(Publish-Subscribe Pattern)”

几乎所有的消息队列产品都使用一种非常朴素的“请求 - 确认”机制,确保消息不会在传递过程中由于网络或服务器故障丢失。具体的做法也非常简单。在生产端,生产者先将消息发送给服务端,也就是 Broker,服务端在收到消息并将消息写入主题或者队列中后,会给生产者发送确认的响应。如果生产者没有收到服务端的确认或者收到失败的响应,则会重新发送消息;在消费端,消费者在收到消息并完成自己的消费业务逻辑(比如,将数据保存到数据库中)后,也会给服务端发送消费成功的确认,服务端只有收到消费确认后,才认为一条消息被成功消费,否则它会给消费者重新发送这条消息,直到收到对应的消费成功确认。这个确认机制很好地保证了消息传递过程中的可靠性,但是,引入这个机制在消费端带来了一个不小的问题。什么问题呢?为了确保消息的有序性,在某一条消息被成功消费之前,下一条消息是不能被消费的,否则就会出现消息空洞,违背了有序性这个原则。也就是说,每个主题在任意时刻,至多只能有一个消费者实例在进行消费,那就没法通过水平扩展消费者的数量来提升消费端总体的消费性能。为了解决这个问题,Kafka 在 Topic 下增加了 Partition 的概念。每个 Topic 包含多个 Partition,通过多个 Partition 来实现多实例并行生产和消费。 image.png

存储机制

同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。

  • 每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。
  • 每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。

image.png

partion中segment file组成和物理结构。

  • segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件.
  • segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。

image.png

index file 和 data file 的对应关系

image.png

message 物理结构

image.png

关键字解释说明
8 byte offset在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
4 byte message sizemessage大小
4 byte CRC32用crc32校验message
1 byte “magic”表示本次发布Kafka服务程序协议版本号
1 byte “attributes”表示为独立版本、或标识压缩类型、或编码类型。
4 byte key length表示key的长度,当key为-1时,K byte key字段不填
K byte key可选
value bytes payload表示实际消息数据。

在partition中如何通过offset查找message

例如读取offset=368776的message,需要通过下面2个步骤查找。

  • 第一步查找segment file 上述图2为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。 当offset=368776时定位到00000000000000368769.index|log
  • 第二步通过segment file查找message 通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。

这样做的优点,segment index file 采取稀疏索引存储方式,它减少索引文件大小,通过 mmap(内存映射文件)可以直接内存操作,稀疏索引(即不为每一条数据建索引)为数据文件的每个对应 message 设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。

Kafka 高性能 IO 实现

使用批量消息提升服务端处理能力

  • Kafka 的 Producer 只提供了单条发送的 send() 方法,并没有提供任何批量发送的接口。但实际上,Kafka 的客户端 SDK 在实现消息发送逻辑的时候,采用了异步批量发送的机制。当你调用 send() 方法发送一条消息之后,无论你是同步发送还是异步发送,Kafka 都不会立即就把这条消息发送出去。它会先把这条消息,存放在内存中缓存起来,然后选择合适的时机把缓存中的所有消息组成一批,一次性发给 Broker。
  • 在服务端,Kafka 不会把一批消息再还原成多条消息,再一条一条地处理,这样太慢了。在 Broker 整个处理流程中,无论是写入磁盘、从磁盘读出来、还是复制到其他副本这些流程中,批消息都不会被解开,一直是作为一条“批消息”来进行处理的。
  • 在消费时,消息同样是以批为单位进行传递的,Consumer 从 Broker 拉到一批消息后,在客户端把批消息解开,再一条一条交给用户代码处理。

构建批消息和解开批消息分别在发送端和消费端的客户端完成,不仅减轻了 Broker 的压力,最重要的是减少了 Broker 处理请求的次数,提升了总体的处理能力。

使用顺序读写提升磁盘 IO 性能

相比于网络传输和内存,磁盘 IO 的速度是比较慢的。对于消息队列的服务端来说,性能的瓶颈主要在磁盘 IO 这一块。

对于磁盘来说,它有一个特性,就是顺序读写的性能要远远好于随机读写。在 SSD(固态硬盘)上,顺序读写的性能要比随机读写快几倍,如果是机械硬盘,这个差距会达到几十倍。为什么呢?

操作系统每次从磁盘读写数据的时候,需要先寻址,也就是先要找到数据在磁盘上的物理位置,然后再进行数据读写。如果是机械硬盘,这个寻址需要比较长的时间,因为它要移动磁头,这是个机械运动,机械硬盘工作的时候会发出咔咔的声音,就是移动磁头发出的声音。

顺序读写相比随机读写省去了大部分的寻址时间,它只要寻址一次,就可以连续地读写下去,所以说,性能要比随机读写要好很多。

Kafka 就是充分利用了磁盘的这个特性。它的存储设计非常简单,对于每个分区,它把从 Producer 收到的消息,顺序地写入对应的 log 文件中,一个文件写满了,就开启一个新的文件这样顺序写下去。消费的时候,也是从某个全局的位置开始,也就是某一个 log 文件中的某个位置开始,顺序地把消息读出来。

这样一个简单的设计,充分利用了顺序读写这个特性,极大提升了 Kafka 在使用磁盘时的 IO 性能。

利用 PageCache 加速消息读写

PageCache 是现代操作系统都具有的一项基本特性。通俗地说,PageCache 就是操作系统在内存中给磁盘上的文件建立的缓存。无论我们使用什么语言编写的程序,在调用系统的 API 读写文件的时候,并不会直接去读写磁盘上的文件,应用程序实际操作的都是 PageCache,也就是文件在内存中缓存的副本。

应用程序在写入文件的时候,操作系统会先把数据写入到内存中的 PageCache,然后再一批一批地写到磁盘上。读取文件的时候,也是从 PageCache 中来读取数据,这时候会出现两种可能情况。

一种是 PageCache 中有数据,那就直接读取,这样就节省了从磁盘上读取数据的时间;另一种情况是,PageCache 中没有数据,这时候操作系统会引发一个缺页中断,应用程序的读取线程会被阻塞,操作系统把数据从文件中复制到 PageCache 中,然后应用程序再从 PageCache 中继续把数据读出来,这时会真正读一次磁盘上的文件,这个读的过程就会比较慢。

用户的应用程序在使用完某块 PageCache 后,操作系统并不会立刻就清除这个 PageCache,而是尽可能地利用空闲的物理内存保存这些 PageCache,除非系统内存不够用,操作系统才会清理掉一部分 PageCache。清理的策略一般是 LRU 或它的变种算法,这个算法我们不展开讲,它保留 PageCache 的逻辑是:优先保留最近一段时间最常使用的那些 PageCache。

Kafka 在读写消息文件的时候,充分利用了 PageCache 的特性。一般来说,消息刚刚写入到服务端就会被消费,按照 LRU 的“优先清除最近最少使用的页”这种策略,读取的时候,对于这种刚刚写入的 PageCache,命中的几率会非常高。

也就是说,大部分情况下,消费读消息都会命中 PageCache,带来的好处有两个:一个是读取的速度会非常快,另外一个是,给写入消息让出磁盘的 IO 资源,间接也提升了写入的性能。

ZeroCopy:零拷贝技术

我们知道,在服务端,处理消费的大致逻辑是这样的:

  • 首先,从文件中找到消息数据,读到内存中;
  • 然后,把消息通过网络发给客户端。

这个过程中,数据实际上做了 2 次或者 3 次复制:

  • 从文件复制数据到 PageCache 中,如果命中 PageCache,这一步可以省掉;
  • 从 PageCache 复制到应用程序的内存空间中,也就是我们可以操作的对象所在的内存;
  • 从应用程序的内存空间复制到 Socket 的缓冲区,这个过程就是我们调用网络应用框架的 API 发送数据的过程。

Kafka 使用零拷贝技术可以把这个复制次数减少一次,上面的 2、3 步骤两次复制合并成一次复制。直接从 PageCache 中把数据复制到 Socket 缓冲区中,这样不仅减少一次数据复制,更重要的是,由于不用把数据复制到用户内存空间,DMA 控制器可以直接完成数据复制,不需要 CPU 参与,速度更快。

数据压缩

什么情况适合数据压缩?

  • 不压缩直接传输需要的时间是: 传输未压缩数据的耗时。
  • 使用数据压缩需要的时间是: 压缩耗时 + 传输压缩数据耗时 + 解压耗时。

压缩和解压的操作都是计算密集型的操作,非常耗费 CPU 资源。如果你的应用处理业务逻辑就需要耗费大量的 CPU 资源,就不太适合再进行压缩和解压。

又比如说,如果你的系统的瓶颈是磁盘的 IO 性能,CPU 资源又很闲,这种情况就非常适合在把数据写入磁盘前先进行压缩。

Kafka 是否开启压缩,这是可以配置,它也支持配置使用哪一种压缩算法。原因我们在上面说过,不同的业务场景是否需要开启压缩,选择哪种压缩算法是不能一概而论的。所以,Kafka 的设计者把这个选择权交给使用者。

在开启压缩时,Kafka 选择一批消息一起压缩,每一个批消息就是一个压缩分段。使用者也可以通过参数来控制每批消息的大小。在 Kafka 中,生产者生成一个批消息发给服务端,在服务端中是不会拆分批消息的。那按照批来压缩,意味着,在服务端也不用对这批消息进行解压,可以整批直接存储,然后整批发送给消费者。最后,批消息由消费者进行解压。

在服务端不用解压,就不会耗费服务端宝贵的 CPU 资源,同时还能获得压缩后,占用传输带宽小,占用存储空间小的这些好处,这是一个非常聪明的设计。

实现分布式事务

image.png 给消息服务器发送一个“半消息”,这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。如果在第四步提交事务消息时失败了怎么办?对于这个问题,Kafka 和 RocketMQ 给出了 2 种不同的解决方案。

  • Kafka 的解决方案比较简单粗暴,直接抛出异常,让用户自行处理。我们可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿。
  • 在 RocketMQ 中的事务实现中,增加了事务反查的机制来解决事务消息提交失败的问题。如果在提交或者回滚事务消息时发生网络异常,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。为了支撑这个事务反查机制,我们的业务代码需要实现一个反查本地事务状态的接口,告知 RocketMQ 本地事务是成功还是失败。 image.png

消息丢失

检测消息丢失

如果是 IT 基础设施比较完善的公司,一般都有分布式链路追踪系统,使用类似的追踪系统可以很方便地追踪每一条消息。如果没有这样的追踪系统,这里我提供一个比较简单的方法,来检查是否有消息丢失的情况。

我们可以利用消息队列的有序性来验证是否有消息丢失。原理非常简单,在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。

如果没有消息丢失,Consumer 收到消息的序号必然是连续递增的,或者说收到的消息,其中的序号必然是上一条消息的序号 +1。如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号来确定丢失的是哪条消息,方便进一步排查原因。

大多数消息队列的客户端都支持拦截器机制,你可以利用这个拦截器机制,在 Producer 发送消息之前的拦截器中将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连续性,这样实现的好处是消息检测的代码不会侵入到你的业务代码中,待你的系统稳定后,也方便将这部分检测的逻辑关闭或者删除。

如果是在一个分布式系统中实现这个检测方法,有几个问题需要你注意。

首先,像 Kafka 和 RocketMQ 这样的消息队列,它是不保证在 Topic 上的严格顺序的,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。

如果你的系统中 Producer 是多实例的,由于并不好协调多个 Producer 之间的发送顺序,所以也需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。

Consumer 实例的数量最好和分区数量一致,做到 Consumer 和分区一一对应,这样会比较方便地在 Consumer 内检测消息序号的连续性。

确保不丢消息

image.png

  • 生产阶段:在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。

    当你的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。

  • 存储阶段:在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。

    如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息。对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。如果是 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会发生消息丢失。

  • 消费阶段: 在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。

    消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。

消息重复

在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

  • At most once: 至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
  • At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
  • Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。

现在常用的绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是这样。也就是说,消息队列很难保证消息不重复。

用幂等性解决重复消息问题

一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。所以,对于幂等的方法,不用担心重复执行会对系统造成任何改变。

消息积压

原因:

  • 生产变快了/消费变慢了
  • 系统中的某个部分出现了性能问题

预防:

  • 优化消费端处理逻辑,增加处理速度
  • 水平扩容,增加消费端的并发数

    注:在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区数量,确保 Consumer 的实例数和分区数量是相等的。因为对于消费者来说,在每个分区上实际上只能支持单线程消费。

处理:

  • 查看日志是否有消费失败后重复消费,问题消息加入死信队列

    死信队列(DLQ - Dead-Letter-Queue)是一种特殊类型的消息队列,用于临时存储软件系统由于错误而无法处理的消息。消息队列是支持分布式系统中的异步通信的软件组件。借助它们,您可以在软件服务之间发送任何数量的消息,且不需要消息接收器始终处于可用。死信队列特别用于存储没有目的地或无法由预期接收器处理的错误的消息。

  • 没有错误的话就打印堆栈看是否卡住了(死锁/资源等待)
  • 水平扩容

常见问题

如何保证消息的严格顺序

由于消息队列只能保证 Partition 中的顺序,有以下方法来保证消息的顺序。

  • 方法一:Partition 配置成 1,生产者和消费者也只能是一个实例,这样才能保证全局严格顺序。
  • 方法二:如果需要保证局部严格顺序,可以这样来实现。在发送端,我们使用账户 ID 作为 Key,采用一致性哈希算法计算出队列编号,指定队列来发送消息。一致性哈希算法可以保证,相同 Key 的消息总是发送到同一个队列上,这样可以保证相同 Key 的消息是严格有序的。如果不考虑队列扩容,也可以用队列数量取模的简单方法来计算队列编号。

参考

如果对你有用的话,可以打赏哦
打赏
ali pay
wechat pay

本文作者:42tr

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!