以 Kafka 为例,Rocket MQ 与 Kafka 相似。
早期的消息队列,就是按照“队列”的数据结构来设计的。我们一起看下这个图,Producer 发消息就是入队操作,Consumer 收消息就是出队也就是删除操作,服务端存放消息的容器自然就称为“队列”。
这就是最初的一种消息模型:队列模型。
如果有多个生产者往同一个队列里面发送消息,这个队列中可以消费到的消息,就是这些生产者生产的所有消息的合集。消息的顺序就是这些生产者发送消息的自然顺序。如果有多个消费者接收同一个队列的消息,这些消费者之间实际上是竞争的关系,每个消费者只能收到队列中的一部分消息,也就是说任何一条消息只能被其中的一个消费者收到。
如果需要将一份消息数据分发给多个消费者,要求每个消费者都能收到全量的消息,例如,对于一份订单数据,风控系统、分析系统、支付系统等都需要接收消息。这个时候,单个队列就满足不了需求,一个可行的解决方式是,为每个消费者创建一个单独的队列,让生产者发送多份。
显然这是个比较蠢的做法,同样的一份消息数据被复制到多个队列中会浪费资源,更重要的是,生产者必须知道有多少个消费者。为每个消费者单独发送一份消息,这实际上违背了消息队列“解耦”这个设计初衷。
为了解决这个问题,演化出了另外一种消息模型:“发布 - 订阅模型(Publish-Subscribe Pattern)”。
几乎所有的消息队列产品都使用一种非常朴素的“请求 - 确认”机制,确保消息不会在传递过程中由于网络或服务器故障丢失。具体的做法也非常简单。在生产端,生产者先将消息发送给服务端,也就是 Broker,服务端在收到消息并将消息写入主题或者队列中后,会给生产者发送确认的响应。如果生产者没有收到服务端的确认或者收到失败的响应,则会重新发送消息;在消费端,消费者在收到消息并完成自己的消费业务逻辑(比如,将数据保存到数据库中)后,也会给服务端发送消费成功的确认,服务端只有收到消费确认后,才认为一条消息被成功消费,否则它会给消费者重新发送这条消息,直到收到对应的消费成功确认。这个确认机制很好地保证了消息传递过程中的可靠性,但是,引入这个机制在消费端带来了一个不小的问题。什么问题呢?为了确保消息的有序性,在某一条消息被成功消费之前,下一条消息是不能被消费的,否则就会出现消息空洞,违背了有序性这个原则。也就是说,每个主题在任意时刻,至多只能有一个消费者实例在进行消费,那就没法通过水平扩展消费者的数量来提升消费端总体的消费性能。为了解决这个问题,Kafka 在 Topic 下增加了 Partition 的概念。每个 Topic 包含多个 Partition,通过多个 Partition 来实现多实例并行生产和消费。
同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。
partion中segment file组成和物理结构。
index file 和 data file 的对应关系
message 物理结构
关键字 | 解释说明 |
---|---|
8 byte offset | 在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message |
4 byte message size | message大小 |
4 byte CRC32 | 用crc32校验message |
1 byte “magic” | 表示本次发布Kafka服务程序协议版本号 |
1 byte “attributes” | 表示为独立版本、或标识压缩类型、或编码类型。 |
4 byte key length | 表示key的长度,当key为-1时,K byte key字段不填 |
K byte key | 可选 |
value bytes payload | 表示实际消息数据。 |
例如读取offset=368776的message,需要通过下面2个步骤查找。
这样做的优点,segment index file 采取稀疏索引存储方式,它减少索引文件大小,通过 mmap(内存映射文件)可以直接内存操作,稀疏索引(即不为每一条数据建索引)为数据文件的每个对应 message 设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。
构建批消息和解开批消息分别在发送端和消费端的客户端完成,不仅减轻了 Broker 的压力,最重要的是减少了 Broker 处理请求的次数,提升了总体的处理能力。
相比于网络传输和内存,磁盘 IO 的速度是比较慢的。对于消息队列的服务端来说,性能的瓶颈主要在磁盘 IO 这一块。
对于磁盘来说,它有一个特性,就是顺序读写的性能要远远好于随机读写。在 SSD(固态硬盘)上,顺序读写的性能要比随机读写快几倍,如果是机械硬盘,这个差距会达到几十倍。为什么呢?
操作系统每次从磁盘读写数据的时候,需要先寻址,也就是先要找到数据在磁盘上的物理位置,然后再进行数据读写。如果是机械硬盘,这个寻址需要比较长的时间,因为它要移动磁头,这是个机械运动,机械硬盘工作的时候会发出咔咔的声音,就是移动磁头发出的声音。
顺序读写相比随机读写省去了大部分的寻址时间,它只要寻址一次,就可以连续地读写下去,所以说,性能要比随机读写要好很多。
Kafka 就是充分利用了磁盘的这个特性。它的存储设计非常简单,对于每个分区,它把从 Producer 收到的消息,顺序地写入对应的 log 文件中,一个文件写满了,就开启一个新的文件这样顺序写下去。消费的时候,也是从某个全局的位置开始,也就是某一个 log 文件中的某个位置开始,顺序地把消息读出来。
这样一个简单的设计,充分利用了顺序读写这个特性,极大提升了 Kafka 在使用磁盘时的 IO 性能。
PageCache 是现代操作系统都具有的一项基本特性。通俗地说,PageCache 就是操作系统在内存中给磁盘上的文件建立的缓存。无论我们使用什么语言编写的程序,在调用系统的 API 读写文件的时候,并不会直接去读写磁盘上的文件,应用程序实际操作的都是 PageCache,也就是文件在内存中缓存的副本。
应用程序在写入文件的时候,操作系统会先把数据写入到内存中的 PageCache,然后再一批一批地写到磁盘上。读取文件的时候,也是从 PageCache 中来读取数据,这时候会出现两种可能情况。
一种是 PageCache 中有数据,那就直接读取,这样就节省了从磁盘上读取数据的时间;另一种情况是,PageCache 中没有数据,这时候操作系统会引发一个缺页中断,应用程序的读取线程会被阻塞,操作系统把数据从文件中复制到 PageCache 中,然后应用程序再从 PageCache 中继续把数据读出来,这时会真正读一次磁盘上的文件,这个读的过程就会比较慢。
用户的应用程序在使用完某块 PageCache 后,操作系统并不会立刻就清除这个 PageCache,而是尽可能地利用空闲的物理内存保存这些 PageCache,除非系统内存不够用,操作系统才会清理掉一部分 PageCache。清理的策略一般是 LRU 或它的变种算法,这个算法我们不展开讲,它保留 PageCache 的逻辑是:优先保留最近一段时间最常使用的那些 PageCache。
Kafka 在读写消息文件的时候,充分利用了 PageCache 的特性。一般来说,消息刚刚写入到服务端就会被消费,按照 LRU 的“优先清除最近最少使用的页”这种策略,读取的时候,对于这种刚刚写入的 PageCache,命中的几率会非常高。
也就是说,大部分情况下,消费读消息都会命中 PageCache,带来的好处有两个:一个是读取的速度会非常快,另外一个是,给写入消息让出磁盘的 IO 资源,间接也提升了写入的性能。
我们知道,在服务端,处理消费的大致逻辑是这样的:
这个过程中,数据实际上做了 2 次或者 3 次复制:
Kafka 使用零拷贝技术可以把这个复制次数减少一次,上面的 2、3 步骤两次复制合并成一次复制。直接从 PageCache 中把数据复制到 Socket 缓冲区中,这样不仅减少一次数据复制,更重要的是,由于不用把数据复制到用户内存空间,DMA 控制器可以直接完成数据复制,不需要 CPU 参与,速度更快。
什么情况适合数据压缩?
压缩和解压的操作都是计算密集型的操作,非常耗费 CPU 资源。如果你的应用处理业务逻辑就需要耗费大量的 CPU 资源,就不太适合再进行压缩和解压。
又比如说,如果你的系统的瓶颈是磁盘的 IO 性能,CPU 资源又很闲,这种情况就非常适合在把数据写入磁盘前先进行压缩。
Kafka 是否开启压缩,这是可以配置,它也支持配置使用哪一种压缩算法。原因我们在上面说过,不同的业务场景是否需要开启压缩,选择哪种压缩算法是不能一概而论的。所以,Kafka 的设计者把这个选择权交给使用者。
在开启压缩时,Kafka 选择一批消息一起压缩,每一个批消息就是一个压缩分段。使用者也可以通过参数来控制每批消息的大小。在 Kafka 中,生产者生成一个批消息发给服务端,在服务端中是不会拆分批消息的。那按照批来压缩,意味着,在服务端也不用对这批消息进行解压,可以整批直接存储,然后整批发送给消费者。最后,批消息由消费者进行解压。
在服务端不用解压,就不会耗费服务端宝贵的 CPU 资源,同时还能获得压缩后,占用传输带宽小,占用存储空间小的这些好处,这是一个非常聪明的设计。
给消息服务器发送一个“半消息”,这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。如果在第四步提交事务消息时失败了怎么办?对于这个问题,Kafka 和 RocketMQ 给出了 2 种不同的解决方案。
如果是 IT 基础设施比较完善的公司,一般都有分布式链路追踪系统,使用类似的追踪系统可以很方便地追踪每一条消息。如果没有这样的追踪系统,这里我提供一个比较简单的方法,来检查是否有消息丢失的情况。
我们可以利用消息队列的有序性来验证是否有消息丢失。原理非常简单,在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。
如果没有消息丢失,Consumer 收到消息的序号必然是连续递增的,或者说收到的消息,其中的序号必然是上一条消息的序号 +1。如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号来确定丢失的是哪条消息,方便进一步排查原因。
大多数消息队列的客户端都支持拦截器机制,你可以利用这个拦截器机制,在 Producer 发送消息之前的拦截器中将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连续性,这样实现的好处是消息检测的代码不会侵入到你的业务代码中,待你的系统稳定后,也方便将这部分检测的逻辑关闭或者删除。
如果是在一个分布式系统中实现这个检测方法,有几个问题需要你注意。
首先,像 Kafka 和 RocketMQ 这样的消息队列,它是不保证在 Topic 上的严格顺序的,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。
如果你的系统中 Producer 是多实例的,由于并不好协调多个 Producer 之间的发送顺序,所以也需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。
Consumer 实例的数量最好和分区数量一致,做到 Consumer 和分区一一对应,这样会比较方便地在 Consumer 内检测消息序号的连续性。
生产阶段:在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。
当你的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。
存储阶段:在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息。对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。如果是 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会发生消息丢失。
消费阶段: 在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。
消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。
在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:
现在常用的绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是这样。也就是说,消息队列很难保证消息不重复。
一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。所以,对于幂等的方法,不用担心重复执行会对系统造成任何改变。
原因:
预防:
注:在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区数量,确保 Consumer 的实例数和分区数量是相等的。因为对于消费者来说,在每个分区上实际上只能支持单线程消费。
处理:
死信队列(DLQ - Dead-Letter-Queue)是一种特殊类型的消息队列,用于临时存储软件系统由于错误而无法处理的消息。消息队列是支持分布式系统中的异步通信的软件组件。借助它们,您可以在软件服务之间发送任何数量的消息,且不需要消息接收器始终处于可用。死信队列特别用于存储没有目的地或无法由预期接收器处理的错误的消息。
由于消息队列只能保证 Partition 中的顺序,有以下方法来保证消息的顺序。
本文作者:42tr
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!