..

Kafka: Kafka Internals

1. 集群成员关系

Kafka 使用 Zookeeper 维护集群成员信息

  • Broker 启动时,通过创建临时节点将自己的唯一 ID 注册到 Zookeeper

    每个 Broker 都有唯一标识符号,可以在配置文件中指定,也可以自动生成。 注册路径: /brokers/ids

  • 不同的 Kafka 组件通过订阅注册路径,可以在 Broker 加入或者退出集群时获得通知
  • 在 Broker 宕机或者网络分区时,与 Zookeeper 断开连接,那么启动时创建的临时节点就会被自动删除。同时,监听 Broker 列表的 Kafka 组件就会收到 Broker 被移除的通知。

    虽然 Broker 注册的临时节点会被清除,但是其 ID 会继续保存在其他数据结构中(如主题的副本列表中)。在 Broker 关闭之后,如果使用相同的 ID 启动一个新的 Broker,它会立即加入集群,并拥有与旧 Broker 相同的分区与主题

2. 控制器

控制器也是一个 Broker,除了具有一般 Broker 的功能,还负责分区 Leader 的选举。

Kafka 使用 Zookeeper 的临时节点来选举控制器

  1. 集群初始化选举
    • Broker 集群中第一个启动的 Broker 通过在 Zookeeper 上创建临时节点(/controller)使自己成为控制器
    • 其他 Broker 在启动时,也尝试创建这个临时节点,但是会收到“节点已存在”异常,从而意识到集群控制器节点已经存在(集群已经存在控制器)
    • 其他 Broker 在控制器节点上创建 watch 对象(用于接收节点变更通知)
  2. 控制器异常再次触发选举
    • 如果控制器被关闭或者与 Zookeeper 断开连接,Zookeeper 临时节点(/controller)消失;其他 Broker 通过 watch 对象收到节点消失的通知,因此尝试让自己成为新的控制器
    • 同样,第一个在 Zookeeper 上成功创建控制器节点的 Broker 成为新的控制器,其他 Broker 在新的控制器节点上再次创建 watch 对象
    • 每个新选出的控制器通过 Zookeeper 的递增操作获得一个全新的,数值更大的 Controller Epoch. 其他 Broker 在知道当前的 Controller Epoch 之后,会忽略旧控制器发出的包含旧 Epoch 的消息

      控制器通过 epoch 来避免“脑裂”(集群中同时存在两个控制器)

控制器负责分区 Leader 管理:

  1. Broker 离开集群
    • 控制器通过 watch Broker 在 Zookeeper 上注册的路径,可以在 Broker 离开集群时收到通知;需要对失去 Leader 的分区进行 Leader 选举
    • 控制器遍历分区,选择分区副本列表中的下一个副本作为新的分区 Leader
    • 控制器向包含新 Leader 或者现有 Follower 的 Broker 发送消息:包含分区 Leader 与 Follower 信息
    • 新 Leader 开始处理生产者与消费者的请求,Follower 从 Leader 处开始复制消息
  2. Broker 加入集群
    • 当新的 Broker 加入集群时,控制器会监测其 Broker ID 是否包含现有的分区副本
    • 如果包含,则将变更通知发送给新加入的 Broker 及其他 Broker,之后新加入的 Broker 上的分区副本就从 Leader 副本开始复制消息

3. 复制

多副本机制可以使得 Kafka 在个别节点失效时,仍能保证可用性与持久性

Kafka 使用主题来组织数据,每个主题包含多个分区,每个分区包含多个副本。副本被分配在 Broker 上,每个 Broker 可以保存成百上千个副本。

有两种类型的副本:

  1. Leader 副本

    每个分区都有一个 Leader 副本。

    为了保证数据一致性,所有生产者与消费者的请求都会由 Leader 副本处理。

  2. Follower 副本

    Leader 副本外的其他副本都是 Follower 副本。

    Follower 副本不会处理来自客户端的请求,唯一的任务就是从 Leader 副本处复制消息,与 Leader 副本保持一致,以便在 Leader 副本不可用时从中选举出新的 Leader。

Leader 副本还需要判断哪些 Follower 副本的状态与自己是保持一致的。

  • Follower 副本为了与 Leader 保持一致,向 Leader 发送获取数据的请求(与消费者获取数据请求方式一样);Leader 返回消息及对应偏移量
  • Leader 副本通过检查每个 Follower 请求的最新偏移量,可以判断 Follower 的复制进度
    • 如果 Follower 在 10s 内没有请求任何消息,或者虽然在请求消息,但是 10s 内没有请求最新数据,那么该 Follower 会被认为是不同步的(out of sync)
    • 如果 Follower 持续请求最新消息,则被认为是同步的(in-sync)

在 Leader 副本失效后,只有同步的副本才可能被选为新的 Leader

Follower 副本正常不活跃的时间或者在被认为不同步的时间通过参数 replica.lag.time.max.ms 控制

首选 Leader:创建主题时选定的 Leader 就是分区的首选 Leader。

分区副本列表中第一个副本一般就是首选 Leader

4. 处理请求

Broker 的大部分工作就是处理客户端,分区副本和控制器发送给分区 Leader 的请求。

Broker 按照请求到达的顺序进行处理

  • 每个监听端口上运行一个 Acceptor 线程,该线程会为请求创建一个连接,并交给 Processor 线程处理
  • Processor 线程从客户端获取请求消息,把它们放到请求队列,然后从响应队列获取响应消息,发送给客户端
  • 请求消息被放入请求队列后,会由 IO 线程负责处理

所有请求都有如下标准头:

  • Request type
  • Request version:Broker 可以处理不同版本客户端的请求
  • Correlation ID:请求消息的唯一标识(响应中也会携带)
  • Client ID:客户端标识

4.1 元数据请求

元数据请求包含了客户端感兴趣的主题列表,可以请求任意 Broker。Broker 的响应中包含了请求主题包含的分区,每个分区的副本信息,以及副本 Leader 信息。

客户端一般会把这些元数据信息缓存本地,并定时刷新,之后直接往目标 Broker 上发送生产请求或者获取请求。

生产请求及获取请求必须发送给分区的 Leader 副本

4.2 生产请求

生产请求必须要发送给分区的 Leader 副本

  • Leader 副本的位置信息由元数据请求返回
  • 如果请求的 Broker 不包含对应分区的 Leader 副本,那么就会响应给客户端“非分区 Leader”的异常

包含 Leader 副本的 Broker 在收到生产请求时,会对请求做一些验证:

  • 是否有权限写入主题
  • 请求中的 acks 值是否有效(0,1 or al)
  • 如果 acks = all,是否有足够多的同步副本保证消息能够被安全写入

    如果同步(in-sync)副本数量不足,Broker 可以拒绝新消息

之后,Broker 会将消息写入本地磁盘。

  • 在 Linux 系统上,消息会被写入到文件系统缓存中,并不保证何时会被刷新到磁盘上
  • Kafka 不会一直等待数据写到磁盘上,反而通过数据复制功能保证消息的持久性

消息在被写入分区的 Leader 副本之后,Broker 再次检查 acks 配置参数

  • 如果 acks = 0/1,那么 Broker 立即返回响应
  • 如果 acks = all,那么请求会被保存到一个被称为 purgatory 的缓冲区中,直到所有的 Follower 副本都复制了消息,Leader 才会响应客户端

从生产者角度看,acks = 0 那么生产者在将消息发送出去之后,完全不需要等待 Broker 的响应;acks = 1 生产者会等待 Leader 副本写入成功;acks = all 生产者会等待所有同步副本都写入成功

4.2 获取请求

获取请求必须要发送给分区的 Leader 副本

Leader 副本在收到获取请求后,会检查请求是否有效:

  • 偏移量在指定分区上是否存在
    • 如果偏移量不存在(可能是数据已经被删除),那么将会返回给客户端一个错误
    • 如果偏移量存在,则 Broker 按照指定的数量上限读取消息,并返回给客户端

客户端在发送获取请求时,除了需要指定获取的分区及偏移量,还需要指定 Broker 最多从一个分区内可以返回多少数据量,以防止返回过多数据导致客户端内存耗尽。同时,也可以指定返回数据的下限,可以在 Broker 数据量较少的情况下减少回传次数,降低开销。

Kafka 比较知名的是,使用零拷贝(Zero-Copy)技术向客户端发送消息

  • Kafka 直接把消息从文件(更确切的说是 Linux 文件系统缓存)发送到网络通道,不需要经过任何中间缓冲区。相比于其他数据库系统,在发送给客户端之前,需要先将数据缓存在本地
  • 这项技术避免了字节复制(降低 CPU 消耗),也不需要管理内存缓存,提高系统性能

客户端在请求 Leader 副本获取消息时,只能获取已经被写入所有同步(in-sync)副本的消息

  • 也就是说,Leader 副本上的数据并不是都可以被消费
  • 除了消费者,Follower 副本同样也只能获取已经被写入所有同步副本的消息

这么做的目的是为了确保数据一致性

  • 如果消息还未被同步,那么当 Leader 崩溃后,新的 Leader 可能会丢失数据。从消费者的角度来说,前后消费的数据可能不一致。

消费者只能看到高水位之上的消息;参考 In-Sync Replicas 机制(副本同步机制)

相应的,如果 Broker 之间复制消息较慢,也会导致消费者消费消息的延迟增大。

replica.lag.time.max.ms 参数可以配置消息复制的最大延迟

4.3 其他请求

  • Broker 之间请求,如控制器发送给 Leader 副本及 Follower 副本的请求
  • Offset 请求,如客户端提交 Offset,拉取 Offset 请求

5. 物理存储

分区是 Kafka 基本存储单元。一个分区无法在多个 Broker 间再拆分,也无法在同一个 Broker 上的多个磁盘间再细分。

在配置 Kafka 的时候,需要指定用于存储分区的目录清单:log.dirs

5.1 分区分配

在进行分区分配时,期望达到以下目标(用以提高容灾能力):

  • Broker 间均匀分配分区副本(Follower + Leader)
  • 每个分区的副本分配在不同的 Broker 上
    • 如果为 Broker 指定了机架信息,那么每个分区副本尽量分配在不同机架的 Broker 上

假设需要在 m 个 Broker 上分配 n 个分区,且分区复制系数为 k。

  1. 轮询将分区的 Leader 副本分配在 Broker 上
  2. 轮询将分区的 Follower 副本分配在 Broker 上

在轮询分配时,需要尽量保证上述目标

在将分区副本分配到 Broker 上之后,需要为分区副本分配存储目录,分配规则为:

  • 比较每个目录下的分区数量,新分区副本会被分配到分区数量最小的目录下面

如果新添加一个磁盘,那么新的分区都会分配到该磁盘下

5.2 文件管理

Kafka 不会一直保留数据,也不会等到所有的消费者都消费之后再删除数据。通过配置主题数据保留的期限与数据量大小,来进行管理:满足配置的期限或者大小,就可以清除相应数据。

如果一个分区的数据保留在一个文件中,那么这个文件可能会变得特别大,在进行数据查找与删除时会比较耗时。

因此,Kafka 将分区划分成多个片段(Segment)

  • 默认片段大小为 1GB 或 一周的数据
  • Broker 在往分区写入数据时,如果当前片段达到上限,则关闭当前文件,并打开新的文件

当前正在写入数据的片段被称为活跃片段(Active Segment);活跃片段永远不会被删除。

Broker 会为每个片段打开一个句柄(包括已经关闭的片段),如果片段过多的话可能会导致文件句柄过多

5.3 文件格式

Kafka 将消息的键,值,偏移量,消息大小,校验和,消息格式版本号,压缩算法,时间戳保存在磁盘文件。

保存在磁盘上的数据格式与从生产者发送过来,或者发送给消费者的消息格式是一样的,不需要对消息进行再次压缩,解压操作。因此,Kafka 可以使用零复制(Zero-Copy)技术将消息发送给消费者(磁盘存储与网络传输格式均相同)

如果生产者发送的是压缩消息,那么同一个批次的消息会被压缩,被当成“包装消息(Wrapper Message)”发送。消费者对包装消息解压,可以看到整个批次的消息,每个消息都有自己的偏移量及时间戳。

  • 消息 Offset 生成时机

    Kafka 消息的 offset 是在消息被写入 Kafka 分区时生成的。具体来说,当 Producer 发送消息到 Kafka 时,Kafka 会为每条消息分配一个唯一的 offset,并将消息写入对应的分区中。每个分区的消息都有一个单独的连续的 offset 序列。

    消息的 offset 是由 Kafka 服务器自动分配的,并且在消息被成功写入分区后才会生成。因此,offset 的生成时机可以分为两个阶段:

    1. 生产者发送消息:当生产者发送消息到 Kafka Broker 时,Broker 会为消息分配一个 offset,并将消息写入对应的分区。这是 offset 的生成的第一个阶段,也是 offset 的初次分配。
    2. 消息成功写入分区:当消息成功写入分区后,offset 的生成完成。此时,消费者可以通过指定对应分区的 offset 来消费该消息。

    需要注意的是,offset 是按照分区粒度生成的,每个分区都有独立的 offset 序列。不同分区的消息具有不同的 offset 值,并且每个分区的 offset 是连续递增的,从而保证了消息在分区内的顺序性。

    消费者在消费消息时,可以通过指定分区和 offset 来定位和消费特定的消息。消费者可以跟踪已消费的消息的 offset,并定期提交这些 offset,以便在 Consumer 重启或故障恢复时能够从之前的消费位置继续消费。

5.4 索引

为了更快定位到消费者请求的偏移量,Kafka 为每个分区维护了一个索引。

索引把偏移量映射到片段(Segment)文件及偏移量在文件中的位置

如果索引出现损坏,Kafka 会通过重新读取消息并录制偏移量及位置来生成新的索引。

5.5 压缩

通常,Kafka 会根据设置的时间保留数据,把过期的数据清理掉。

但是,有些场景需要保留最新的数据,旧的数据可以被清理。

  • 一个应用程序使用 Kafka 保存自己的状态,每次状态变化时,都将最新状态写入 Kafka。如果程序崩溃,可以从 Kafka 中读取最新消息来恢复。此时,应用程序只关心崩溃前的最新状态,而运行过程中的状态并不关心。

Kafka 通过配置主题的保留策略来满足上述场景:

  • Delete早于保留时间的事件会被删除
  • Compact只保留每个 Key 的最新值

这种压缩清理的策略需要消息中既包含 key,也包含 value,才可以使用

压缩清理操作只针对已经关闭的片段

5.6 压缩原理

每个日志片段可以分为两部分:

  1. 干净的(Clean)

    这部分消息之前被清理过,每个键只保留之前清理过程中的最新值。

  2. 污浊的(Dirty)

    这部分消息是之前清理之后写入的

如果启用压缩清理功能,Broker 会启动多个清理线程执行清理任务。清理线程会选择污浊率高的分区进行清理(污浊消息占分区总消息的大小)。

清理线程创建一个 Map,并从片段的干净部份开始遍历,比对消息。清理前后的分区片段数据如下:

5.7 删除消息

为了将消息从 Kafka 中删除,生产者需要发送一个包含该 Key,且 Value 为 NULL 的消息

  • 清理线程发现该消息时,先进行常规清理,只保留 Value 为 NULL 的消息;该消息被称为墓碑消息
  • 墓碑消息会被保留一段时间(时间可配置);在这期间,消费者仍可消费该消息
  • 墓碑消息过期之后,清理线程会将其清除