跳过正文
Spring for Apache Kafka 4:迁移、Share Group 与新 Consumer 协议
  1. 文章/

Spring for Apache Kafka 4:迁移、Share Group 与新 Consumer 协议

·5626 字·12 分钟
NeatGuyCoding
作者
NeatGuyCoding
目录

Spring for Apache Kafka 4:迁移、Share Group 与新 Consumer 协议
#

Spring for Apache Kafka(下文简称 Spring Kafka)4.0 随 Spring Boot 4.0 发布,捆绑 Kafka client 4.1Jackson 3,并在客户端侧对接 KIP-932(Queues / Share Group)与 KIP-848group.protocol=consumer)。本文按工程决策组织:先谈可复现的 3→4 迁移,再谈经典 Consumer Group 上仍适用的错误处理,最后分述 Share Group 的手工装配边界,以及新一代 rebalance 的 opt-in 升级。

版本提示:Share consumer 在 Spring Kafka 4.0 文档 中标注为 preview / early accessShareAckMode 枚举与 renew() 出现在 4.1 文档。下文在 4.0.x 与 4.1 有差异处会单独标明。

若你维护的是「Boot 3.5 + 独立 spring-kafka 3.x」栈,升级路径可以概括为三层:依赖与 starter 模块化(Boot 4 BOM)、序列化与测试基础设施(Jackson 3、KRaft 嵌入式 broker)、消费模型扩展(经典组上的 KIP-848 opt-in,以及可选的 Share Group 手工装配)。前两层适合全量应用;第三层应按 workload 选型,而不是默认全开。


用 OpenRewrite 做 Boot 3→4 与 Kafka 3→4 迁移
#

为什么
#

手工改 pom.xml、包名与测试注解,容易漏掉 Jackson starter 替换或 @EmbeddedKafka 的废弃属性。OpenRewrite 把「可重复的 AST 级变更」固化成 recipe,适合在 CI 或分支上批量执行。

机制与约束
#

官方配方(可在 rewrite-spring 仓库 核对)包括:

演示仓库里还有自写 recipe(如 MigrateSpringBootJsonToJacksonStarterRemoveDeprecatedEmbeddedKafkaParameters),不在官方 recipes.csv 中,迁移时需自行维护。

怎么做
#

cd spring-kafka-3-to-4 && ./mvnw rewrite:run

rewrite.yml 中至少挂上官方 Boot/Kafka 升级配方,再按需追加自定义 visitor:

recipeList:
  - org.openrewrite.java.spring.boot4.UpgradeSpringBoot_4_0
  - org.openrewrite.java.spring.kafka.UpgradeSpringKafka_4_0
  # 演示仓库自定义(非官方)
  - com.example.spring.kafka.RemoveDeprecatedEmbeddedKafkaParameters

演示终端:cd spring-kafka-3-to-4./mvnw rewrite:run,IDE 中可见 Upgrade Spring Boot / Spring Kafka 配方说明。

常见误区
#

  • 把演示里的 CustomUpgradeSpringkafka_4_0 当成官方配方名 — 官方 Kafka 迁移名为 UpgradeSpringKafka_4_0
  • 只跑 OpenRewrite、不跑测试 — ZK 相关注解与序列化器变更仍需测试覆盖。
  • 在 monorepo 多模块上只跑父 POM 配方 — 子模块若仍直接声明 spring-kafka 版本,可能需分模块执行或补充 ChangeDependency visitor。

模块化 Starter 与 Jackson 3 序列化
#

为什么
#

Boot 4 将 Kafka、JSON 能力拆成独立 starter,并与 Jackson 3tools.jackson)对齐。继续直接依赖 spring-kafka + spring-boot-starter-json(Jackson 2)会导致自动配置包与序列化行为漂移。

机制与约束
#

迁移前(示意)迁移后
spring-kafkaspring-boot-starter-kafka
spring-boot-starter-jsonspring-boot-starter-jackson
spring-kafka-testspring-boot-starter-kafka-test

JsonSerializer / JsonDeserializer 自 4.0 起弃用,应改用 JacksonJsonSerializer / JacksonJsonDeserializer(基于 Jackson 3 的 JsonMapper)。

怎么做
#

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-jackson</artifactId>
</dependency>
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JacksonJsonSerializer
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JacksonJsonDeserializer

Side-by-side:spring-kafka 3.5.0 与 spring-boot-starter-kafka 4.0.5 的 POM 差异。

迁移前 application.yml 仍使用 JsonSerializerErrorHandlingDeserializer 委托配置。

常见误区
#

  • 只改依赖、不改 serializer 类名 — 运行时仍走已弃用的 Jackson 2 适配层。
  • 忽略 spring.json.trusted.packages 等安全相关属性 — Jackson 3 下仍需按 What’s New 检查 JSON 信任包配置。
  • 混用第三方库的 Jackson 2 绑定 — Boot 4 应用 classpath 以 Jackson 3 为主,第三方 starter 若强依赖 com.fasterxml.jackson 需单独评估兼容性。

ErrorHandlingDeserializer 仍可与 JacksonJsonDeserializer 委托搭配:外层捕获反序列化异常,内层 delegate 指向 Jackson 3 反序列化器,这样 poison pill 在进入 listener 前就能被分类处理。


测试里的 @EmbeddedKafka 与 KRaft
#

为什么
#

Kafka 4 broker 仅支持 KRaft,不再使用 ZooKeeper。集成测试若保留 zookeeperPortkraft = false 等属性,会在嵌入式 broker 启动阶段失败,且与生产拓扑不一致。

机制与约束
#

Spring Kafka 4.0 What’s New 明确移除 @EmbeddedKafkazookeeperPortzkConnectionTimeoutzkSessionTimeoutkraft 等属性;实现改为 EmbeddedKafkaKraftBroker

怎么做
#

@SpringBootTest
@ActiveProfiles("test")
@EmbeddedKafka(partitions = 3)
class SpringKafkaApplicationTests { }

OpenRewrite 自定义 recipe 可批量删除废弃属性(演示仓库做法)。

迁移对比:zookeeperPort = 2181kraft = false 与精简后的 @EmbeddedKafka(partitions = 3)

常见误区
#

  • 以为删掉 kraft = false 等于「关闭 KRaft」— 实际是 强制 KRaft-only
  • 在生产 broker 仍混用 ZK 的旧集群上直接升 Kafka 4 — 与测试注解无关,属 broker 升级范畴。

经典 Consumer Group 上的 Poison Pill 与 DLT
#

为什么
#

不可反序列化或业务永久失败的消息若只在组内无限重试,会拖住整个 group.id 的分区进度(head-of-line blocking)。经典 consumer group 上,Spring Kafka 成熟的模式是把失败记录发到 Dead Letter Topic(DLT)

机制与约束
#

DeadLetterPublishingRecoverer 实现 ConsumerRecordRecoverer,将失败 ConsumerRecord 发布到 DLT,常与 ErrorHandlingDeserializerDLT Strategies 配合。

边界(演讲者观点,与官方文档方向一致)Share Group 目前没有 DLT 体系; poison pill 需依赖 reject()、投递次数上限等 KIP-932 语义,不能照搬 @RetryableTopic

怎么做
#

@Configuration
class KafkaExceptionHandlingConfiguration {

  @Bean
  DeadLetterPublishingRecoverer recoverer(
      KafkaTemplate<?, ?> bytesKafkaTemplate,
      Map<Class<?>, KafkaTemplate<?, ?>> templates) {
    return new DeadLetterPublishingRecoverer(bytesKafkaTemplate, templates);
  }
}

演示类 KafkaExceptionHandlingConfigurationDeadLetterPublishingRecoverer Bean 定义。

常见误区
#

  • 在 Share consumer 上配置 DLT recoverer — 官方 kafka-queues 未覆盖该组合。
  • 未区分反序列化失败与业务失败 — 前者应优先用 ErrorHandlingDeserializer,后者再走 recoverer / retry topic。

KIP-932:Share Group 解决什么问题
#

为什么
#

经典 consumer group 下,活跃消费者数量通常 不超过分区数;单条慢消息会阻塞该消费者负责的分区(partition 级 head-of-line blocking)。突发流量时,团队常被迫过度增加分区,只为换横向扩展能力。

KIP-932 引入 Share Group:同一 share group 内多个 ShareConsumer 可在记录粒度协作消费,带来接近消息队列的语义 — 按条确认、可 release 给他人重试。

机制与约束
#

消息在 broker 侧维护 per-record 状态(KIP 与 Spring 文档共识):

Mermaid diagram 1

  • Available → Acquired:消费者拉取时投递计数增加(配置名见 KIP:group.share.delivery.attempt.limit)。
  • Acquired → Acknowledged:成功处理。
  • release:暂时失败,可再次被组内其他实例获取。
  • reject:永久失败,进入 Archived(类似丢弃 / 归档,不是 Spring DLT)。

演讲者观点(未在本文逐条对照 Kafka 4.1 默认值):默认约 5 次投递、约 30s 处理锁;Kafka 4.2 GA 与 Boot 4.1 在五月对齐的时间表 — 请以你使用的 broker / BOM 版本为准。Boot 4.0 Release Notes 写的是 Kafka 4.1.0

与经典 offset 提交不同,Share 消费不依赖「每分区一个逻辑位点」来界定进度,而是依赖 broker 对单条记录的状态机。运维上应关注 delivery countArchived 比例,而不是仅看 consumer lag(经典指标对 share 的解释方式可能不同,需对照你使用的 Kafka 版本监控文档)。

幻灯片「Queues for Kafka – How does it work?」:Consumer group 与 Share group 并行模型对比。

消息状态:Available → Acquired(delivery count++)→ Acknowledged / Archived。

常见误区
#

  • 把 Share Group 当作「分区更多的 consumer group」— 它不保证分区内顺序
  • 在需要 Kafka Streams、批量 @KafkaListener、正则 topic、静态分区分配的场景硬上 Share — Spring 文档 明确不支持 多项能力。

Share Consumer 的手工装配
#

为什么
#

截至 Spring Kafka 4.0 文档,Share consumer 处于 early access没有spring.kafka.consumer.* 对等的 Boot 自动配置。缺 ShareKafkaListenerContainerFactory 时,监听器无法在运行期正确创建 share 容器。

机制与约束
#

核心类型:

4.0 vs 4.1:4.0.x 使用 setExplicitShareAcknowledgment(true) 开启 listener 侧 ShareAcknowledgment4.1 起文档化为 ShareAckMode.MANUAL。演示代码中的 setShareAckMode 面向 4.1;在 4.0.5 上应改用 explicit 标志。

部分经典 ConsumerConfig 对 share group 无效 — 最小集通常只需 bootstrap.servers(演讲演示如此;完整列表以 Kafka 文档为准)。

演讲者观点:Share 场景的 Micrometer tracing / 与经典组对等的 metrics 仍不成熟。

怎么做
#

@Configuration
@EnableKafka
class ShareConsumerConfiguration {

  @Bean
  ShareConsumerFactory<String, TransactionEvent> shareConsumerFactory() {
  Map<String, Object> props = Map.of(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new DefaultShareConsumerFactory<>(props);
  }

  @Bean
  ShareKafkaListenerContainerFactory<String, TransactionEvent>
      manualShareKafkaListenerContainerFactory(
          ShareConsumerFactory<String, TransactionEvent> factory) {
    var f = new ShareKafkaListenerContainerFactory<>(factory);
    f.setConcurrency(6);
    f.getContainerProperties().setShareAckMode(ShareAckMode.MANUAL); // 4.1+
    f.getContainerProperties()
        .setShareAcknowledgmentTimeout(Duration.ofSeconds(30));
    return f;
  }
}

4.0.x 等价写法(无 ShareAckMode 时):

f.getContainerProperties().setExplicitShareAcknowledgment(true);

IDE:ShareConsumerConfigurationDefaultShareConsumerFactory import。

ManualShareConsumerConfigurationShareKafkaListenerContainerFactorysetConcurrency(6)ShareAckMode MANUAL

演示仓库模块:spring-kafka-4-producerspring-kafka-4-share-consumer

常见误区
#

  • 只加 @KafkaListener、不声明 containerFactory — 会落到经典容器工厂。
  • spring.kafka.listener.concurrency 当成 share 并发唯一旋钮 — share 容器需单独 setConcurrency,且与分区数经典组那种硬上限关系(协作消费模型不同)。

ShareAcknowledgment:acknowledge、release、reject
#

为什么
#

工作队列场景要区分「下游暂时不可用」与「业务上永远无效」。仅靠抛异常一种路径,无法精确映射 KIP-932 的 release(可重试)与 reject(归档)。

机制与约束
#

方法语义(Javadoc
acknowledge()成功
release()暂时失败,可再次被组内消费
reject()永久失败,不再投递

explicit / MANUAL 模式下,未 ack 的同线程记录会阻塞后续 poll(Poll Constraints)。implicit 模式下,方法正常返回可触发自动 ack — 勿与 classic 的 AckMode 混淆。

4.1+ShareAcknowledgment.renew() 可延长 acquisition lock;默认锁时长与 group.share.record.lock.duration.ms 相关(文档示例 30 seconds)。长事务处理应规划 renew,而非单纯调大超时。

怎么做
#

@KafkaListener(
    id = "annotation-manual-transaction-event-processor",
    topics = "transaction-events",
    containerFactory = "manualShareKafkaListenerContainerFactory",
    groupId = "annotation-manual-group")
void handle(ConsumerRecord<String, TransactionEvent> record,
            ShareAcknowledgment ack) {
  try {
    service.process(record.value());
    ack.acknowledge();
  } catch (TransientException e) {
    ack.release();
  } catch (Exception e) {
    ack.reject();
  }
}

演示还通过 HTTP POST /api/transactions 注入测试事件(handler 侧为 REST Controller + 上述 listener)。

AnnotationDrivenManualKafkaShareConsumer@KafkaListenercontainerFactory = "manualShareKafkaListenerContainerFactory"

控制台:Acknowledged / Rejected / Released for retry 三类日志。

包结构:explicitimplicitmanual 等示例并列。

常见误区
#

  • 在 MANUAL 模式下既不调用 ack、又期望自动提交 — 会阻塞同线程消费。
  • release() 当成经典 consumer 的 seek 回退 — 语义是重新进入 Available,由组内竞争,而非固定分区 offset。
  • 在 4.0.5 上使用 ShareAckMode.MANUAL 编译 — 应改用 setExplicitShareAcknowledgment(true)

Share Group 与经典组的选型
#

维度经典 Consumer GroupShare Group
并行度 vs 分区消费者数通常 ≤ 分区数可大于分区数(记录级协作)
顺序分区内有序不保证
DLT / @RetryableTopic成熟尚无(演讲归纳 + 文档未覆盖)
Boot 自动配置spring.kafka.*需手工 factory
批量 / 正则 topic / Streams支持文档 列明不支持
可观测性较完整演讲者观点:metrics/tracing 仍缺口

生产环境若依赖有序事件流、Streams、批量监听或成熟 DLT,应继续使用经典组。Share Group 更适合可接受乱序、需要多实例抢活的工作队列。演讲者观点:当前 Spring Boot 发行版中 share 仍为 preview,生产宜等待与 Kafka Queues GA 对齐的 Boot/Kafka 组合 — 具体月份以官方发布为准。

落地前可用下列问题自检:(1)能否接受同 key 的消息乱序到达?(2)失败重试是否接受「其他实例接手」而非固定分区重放?(3)永久失败是否接受 Archived 而非 DLT 审计?(4)监控与告警是否已覆盖 share 专属指标?任一项为否,应留在经典组并优先评估 KIP-848 降低 rebalance 成本。


KIP-848:group.protocol=consumer
#

为什么
#

分区多、消费者多的经典组在 rebalance 时可能出现整组停顿,与 K8s HPA、滚动发布频繁扩缩容冲突。KIP-848 把大量分配逻辑下沉 broker,消费者 opt-in 新协议。

机制与约束
#

Kafka 客户端源码(ConsumerConfig):

  • 配置项:group.protocol
  • 默认值:classic
  • 合法值:classic | consumer

Spring 绑定:

spring:
  kafka:
    consumer:
      properties:
        group.protocol: consumer

启用 consumer 后,partition.assignment.strategy 等经典配置会触发 ConfigException(与「自定义 PartitionAssignor 失效」一致)。演讲者观点:rebalance 体验改善的量化表述应以 KIP 正文为准,不宜脱离版本夸大「零停顿」。

本协议仅适用于经典 consumer group,与 Share Group(KIP-932)无关。

怎么做
#

group.protocol=consumer

确保 broker 已启用 KIP-848 所需升级策略(KIP 中的 bidirectional / upgrade 等集群配置 — 细节见 KIP Case Studies)。

常见误区
#

  • 在 Share consumer 上设置 group.protocol — 路径错误。
  • 同时保留自定义 assignor 与 consumer 协议 — 启动即失败。
  • 升级后仍用旧 Micrometer 仪表盘 — 演讲者观点:broker 侧 metrics 有 protocol=consumer|classic 等变更,需对照 KIP 更新告警。
  • 在蓝绿部署中同时启动两套不同 group.id 却共用同一业务逻辑 — 与协议升级无关,但会掩盖 rebalance 问题;协议迁移应在同一 group.id 下滚动。

对 Spring 应用,只需确保 ConsumerFactory / @KafkaListener 最终 Consumer 属性含 group.protocol=consumer;无需改 Spring API。回滚时将属性改回 classic 并滚动实例即可,前提是 KIP-848 允许的降级路径在你的 broker 版本上仍开启。


滚动从 classic 切到 consumer 协议
#

为什么
#

大规模 consumer group 往往不敢改协议,担心一次性 rebalance 造成消费空洞。KIP-848 写明可通过 rolling consumers 升级或降级。

机制与约束
#

KIP-848 摘录(已核实方向):

“Upgrading to the new protocol or downgrading from it is possible by rolling the consumers

演讲者观点(未对照 Kafka 4.1 broker 源码逐行验证):可先混跑 classic 与 consumer 实例;broker 在检测到新协议成员后加入组时,将组迁移到新协议;再逐步缩掉 classic 实例;若旧协议成员回归,可回退。演讲者建议:尽快结束混跑,避免长期双协议增加 broker 开销。

Mermaid diagram 2

怎么做
#

演示使用 Docker Compose 扩缩(命令来自现场 OCR,演示仓库 URL 未公开验证):

docker compose -f docker-compose-applications.yml -p demo up -d
docker compose -f docker-compose-applications.yml -p demo \
  scale consumer-new-consumer-protocol=1
# 验证组协议切换后,逐步 scale 到 3 并下线 classic 实例
docker compose -f docker-compose-applications.yml -p demo \
  scale consumer-new-consumer-protocol=3

终端:Scale from 0 to 1 instanceconsumer-new-consumer-protocol

Scale from 0 to 1Group protocol 日志中的 classic / consumer 切换。

常见误区
#

  • 未先升级 broker / 未启用集群升级策略就改客户端 — 行为未定义或启动失败。
  • 长期混跑两种协议当作稳态 — 与 KIP 及演讲建议相反。

参考与延伸阅读
#

相关文章