Spring for Apache Kafka 4:迁移、Share Group 与新 Consumer 协议#
Spring for Apache Kafka(下文简称 Spring Kafka)4.0 随 Spring Boot 4.0 发布,捆绑 Kafka client 4.1、Jackson 3,并在客户端侧对接 KIP-932(Queues / Share Group)与 KIP-848(group.protocol=consumer)。本文按工程决策组织:先谈可复现的 3→4 迁移,再谈经典 Consumer Group 上仍适用的错误处理,最后分述 Share Group 的手工装配边界,以及新一代 rebalance 的 opt-in 升级。
版本提示:Share consumer 在 Spring Kafka 4.0 文档 中标注为 preview / early access;
ShareAckMode枚举与renew()出现在 4.1 文档。下文在 4.0.x 与 4.1 有差异处会单独标明。
若你维护的是「Boot 3.5 + 独立 spring-kafka 3.x」栈,升级路径可以概括为三层:依赖与 starter 模块化(Boot 4 BOM)、序列化与测试基础设施(Jackson 3、KRaft 嵌入式 broker)、消费模型扩展(经典组上的 KIP-848 opt-in,以及可选的 Share Group 手工装配)。前两层适合全量应用;第三层应按 workload 选型,而不是默认全开。
用 OpenRewrite 做 Boot 3→4 与 Kafka 3→4 迁移#
为什么#
手工改 pom.xml、包名与测试注解,容易漏掉 Jackson starter 替换或 @EmbeddedKafka 的废弃属性。OpenRewrite 把「可重复的 AST 级变更」固化成 recipe,适合在 CI 或分支上批量执行。
机制与约束#
官方配方(可在 rewrite-spring 仓库 核对)包括:
UpgradeSpringBoot_4_0— 聚合 Boot 4.0 依赖与MigrateToModularStartersMigrateAutoconfigurePackages— autoconfigure 包路径迁移UpgradeSpringKafka_4_0— 如JsonSerializer→JacksonJsonSerializer
演示仓库里还有自写 recipe(如 MigrateSpringBootJsonToJacksonStarter、RemoveDeprecatedEmbeddedKafkaParameters),不在官方 recipes.csv 中,迁移时需自行维护。
怎么做#
cd spring-kafka-3-to-4 && ./mvnw rewrite:run
rewrite.yml 中至少挂上官方 Boot/Kafka 升级配方,再按需追加自定义 visitor:
recipeList:
- org.openrewrite.java.spring.boot4.UpgradeSpringBoot_4_0
- org.openrewrite.java.spring.kafka.UpgradeSpringKafka_4_0
# 演示仓库自定义(非官方)
- com.example.spring.kafka.RemoveDeprecatedEmbeddedKafkaParameters

cd spring-kafka-3-to-4 与 ./mvnw rewrite:run,IDE 中可见 Upgrade Spring Boot / Spring Kafka 配方说明。
常见误区#
- 把演示里的
CustomUpgradeSpringkafka_4_0当成官方配方名 — 官方 Kafka 迁移名为UpgradeSpringKafka_4_0。 - 只跑 OpenRewrite、不跑测试 — ZK 相关注解与序列化器变更仍需测试覆盖。
- 在 monorepo 多模块上只跑父 POM 配方 — 子模块若仍直接声明
spring-kafka版本,可能需分模块执行或补充ChangeDependencyvisitor。
模块化 Starter 与 Jackson 3 序列化#
为什么#
Boot 4 将 Kafka、JSON 能力拆成独立 starter,并与 Jackson 3(tools.jackson)对齐。继续直接依赖 spring-kafka + spring-boot-starter-json(Jackson 2)会导致自动配置包与序列化行为漂移。
机制与约束#
| 迁移前(示意) | 迁移后 |
|---|---|
spring-kafka | spring-boot-starter-kafka |
spring-boot-starter-json | spring-boot-starter-jackson |
spring-kafka-test | spring-boot-starter-kafka-test |
JsonSerializer / JsonDeserializer 自 4.0 起弃用,应改用 JacksonJsonSerializer / JacksonJsonDeserializer(基于 Jackson 3 的 JsonMapper)。
怎么做#
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jackson</artifactId>
</dependency>
spring:
kafka:
producer:
value-serializer: org.springframework.kafka.support.serializer.JacksonJsonSerializer
consumer:
value-deserializer: org.springframework.kafka.support.serializer.JacksonJsonDeserializer

spring-kafka 3.5.0 与 spring-boot-starter-kafka 4.0.5 的 POM 差异。

application.yml 仍使用 JsonSerializer 与 ErrorHandlingDeserializer 委托配置。
常见误区#
- 只改依赖、不改 serializer 类名 — 运行时仍走已弃用的 Jackson 2 适配层。
- 忽略
spring.json.trusted.packages等安全相关属性 — Jackson 3 下仍需按 What’s New 检查 JSON 信任包配置。 - 混用第三方库的 Jackson 2 绑定 — Boot 4 应用 classpath 以 Jackson 3 为主,第三方 starter 若强依赖
com.fasterxml.jackson需单独评估兼容性。
ErrorHandlingDeserializer 仍可与 JacksonJsonDeserializer 委托搭配:外层捕获反序列化异常,内层 delegate 指向 Jackson 3 反序列化器,这样 poison pill 在进入 listener 前就能被分类处理。
测试里的 @EmbeddedKafka 与 KRaft#
为什么#
Kafka 4 broker 仅支持 KRaft,不再使用 ZooKeeper。集成测试若保留 zookeeperPort、kraft = false 等属性,会在嵌入式 broker 启动阶段失败,且与生产拓扑不一致。
机制与约束#
Spring Kafka 4.0 What’s New 明确移除 @EmbeddedKafka 的 zookeeperPort、zkConnectionTimeout、zkSessionTimeout、kraft 等属性;实现改为 EmbeddedKafkaKraftBroker。
怎么做#
@SpringBootTest
@ActiveProfiles("test")
@EmbeddedKafka(partitions = 3)
class SpringKafkaApplicationTests { }
OpenRewrite 自定义 recipe 可批量删除废弃属性(演示仓库做法)。

zookeeperPort = 2181、kraft = false 与精简后的 @EmbeddedKafka(partitions = 3)。
常见误区#
- 以为删掉
kraft = false等于「关闭 KRaft」— 实际是 强制 KRaft-only。 - 在生产 broker 仍混用 ZK 的旧集群上直接升 Kafka 4 — 与测试注解无关,属 broker 升级范畴。
经典 Consumer Group 上的 Poison Pill 与 DLT#
为什么#
不可反序列化或业务永久失败的消息若只在组内无限重试,会拖住整个 group.id 的分区进度(head-of-line blocking)。经典 consumer group 上,Spring Kafka 成熟的模式是把失败记录发到 Dead Letter Topic(DLT)。
机制与约束#
DeadLetterPublishingRecoverer 实现 ConsumerRecordRecoverer,将失败 ConsumerRecord 发布到 DLT,常与 ErrorHandlingDeserializer 及 DLT Strategies 配合。
边界(演讲者观点,与官方文档方向一致):Share Group 目前没有 DLT 体系; poison pill 需依赖 reject()、投递次数上限等 KIP-932 语义,不能照搬 @RetryableTopic。
怎么做#
@Configuration
class KafkaExceptionHandlingConfiguration {
@Bean
DeadLetterPublishingRecoverer recoverer(
KafkaTemplate<?, ?> bytesKafkaTemplate,
Map<Class<?>, KafkaTemplate<?, ?>> templates) {
return new DeadLetterPublishingRecoverer(bytesKafkaTemplate, templates);
}
}

KafkaExceptionHandlingConfiguration 与 DeadLetterPublishingRecoverer Bean 定义。
常见误区#
- 在 Share consumer 上配置 DLT recoverer — 官方 kafka-queues 未覆盖该组合。
- 未区分反序列化失败与业务失败 — 前者应优先用
ErrorHandlingDeserializer,后者再走 recoverer / retry topic。
KIP-932:Share Group 解决什么问题#
为什么#
经典 consumer group 下,活跃消费者数量通常 不超过分区数;单条慢消息会阻塞该消费者负责的分区(partition 级 head-of-line blocking)。突发流量时,团队常被迫过度增加分区,只为换横向扩展能力。
KIP-932 引入 Share Group:同一 share group 内多个 ShareConsumer 可在记录粒度协作消费,带来接近消息队列的语义 — 按条确认、可 release 给他人重试。
机制与约束#
消息在 broker 侧维护 per-record 状态(KIP 与 Spring 文档共识):

- Available → Acquired:消费者拉取时投递计数增加(配置名见 KIP:
group.share.delivery.attempt.limit)。 - Acquired → Acknowledged:成功处理。
- release:暂时失败,可再次被组内其他实例获取。
- reject:永久失败,进入 Archived(类似丢弃 / 归档,不是 Spring DLT)。
演讲者观点(未在本文逐条对照 Kafka 4.1 默认值):默认约 5 次投递、约 30s 处理锁;Kafka 4.2 GA 与 Boot 4.1 在五月对齐的时间表 — 请以你使用的 broker / BOM 版本为准。Boot 4.0 Release Notes 写的是 Kafka 4.1.0。
与经典 offset 提交不同,Share 消费不依赖「每分区一个逻辑位点」来界定进度,而是依赖 broker 对单条记录的状态机。运维上应关注 delivery count 与 Archived 比例,而不是仅看 consumer lag(经典指标对 share 的解释方式可能不同,需对照你使用的 Kafka 版本监控文档)。


常见误区#
- 把 Share Group 当作「分区更多的 consumer group」— 它不保证分区内顺序。
- 在需要 Kafka Streams、批量
@KafkaListener、正则 topic、静态分区分配的场景硬上 Share — Spring 文档 明确不支持 多项能力。
Share Consumer 的手工装配#
为什么#
截至 Spring Kafka 4.0 文档,Share consumer 处于 early access,没有与 spring.kafka.consumer.* 对等的 Boot 自动配置。缺 ShareKafkaListenerContainerFactory 时,监听器无法在运行期正确创建 share 容器。
机制与约束#
核心类型:
DefaultShareConsumerFactoryShareKafkaListenerContainerFactoryContainerProperties.setShareAcknowledgmentTimeout(Duration)— @since 4.0
4.0 vs 4.1:4.0.x 使用 setExplicitShareAcknowledgment(true) 开启 listener 侧 ShareAcknowledgment;4.1 起文档化为 ShareAckMode.MANUAL。演示代码中的 setShareAckMode 面向 4.1;在 4.0.5 上应改用 explicit 标志。
部分经典 ConsumerConfig 对 share group 无效 — 最小集通常只需 bootstrap.servers(演讲演示如此;完整列表以 Kafka 文档为准)。
演讲者观点:Share 场景的 Micrometer tracing / 与经典组对等的 metrics 仍不成熟。
怎么做#
@Configuration
@EnableKafka
class ShareConsumerConfiguration {
@Bean
ShareConsumerFactory<String, TransactionEvent> shareConsumerFactory() {
Map<String, Object> props = Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new DefaultShareConsumerFactory<>(props);
}
@Bean
ShareKafkaListenerContainerFactory<String, TransactionEvent>
manualShareKafkaListenerContainerFactory(
ShareConsumerFactory<String, TransactionEvent> factory) {
var f = new ShareKafkaListenerContainerFactory<>(factory);
f.setConcurrency(6);
f.getContainerProperties().setShareAckMode(ShareAckMode.MANUAL); // 4.1+
f.getContainerProperties()
.setShareAcknowledgmentTimeout(Duration.ofSeconds(30));
return f;
}
}
4.0.x 等价写法(无 ShareAckMode 时):
f.getContainerProperties().setExplicitShareAcknowledgment(true);

ShareConsumerConfiguration 与 DefaultShareConsumerFactory import。

ManualShareConsumerConfiguration:ShareKafkaListenerContainerFactory、setConcurrency(6)、ShareAckMode MANUAL。

spring-kafka-4-producer 与 spring-kafka-4-share-consumer。
常见误区#
- 只加
@KafkaListener、不声明containerFactory— 会落到经典容器工厂。 - 把
spring.kafka.listener.concurrency当成 share 并发唯一旋钮 — share 容器需单独setConcurrency,且与分区数无经典组那种硬上限关系(协作消费模型不同)。
ShareAcknowledgment:acknowledge、release、reject#
为什么#
工作队列场景要区分「下游暂时不可用」与「业务上永远无效」。仅靠抛异常一种路径,无法精确映射 KIP-932 的 release(可重试)与 reject(归档)。
机制与约束#
| 方法 | 语义(Javadoc) |
|---|---|
acknowledge() | 成功 |
release() | 暂时失败,可再次被组内消费 |
reject() | 永久失败,不再投递 |
在 explicit / MANUAL 模式下,未 ack 的同线程记录会阻塞后续 poll(Poll Constraints)。implicit 模式下,方法正常返回可触发自动 ack — 勿与 classic 的 AckMode 混淆。
4.1+:ShareAcknowledgment.renew() 可延长 acquisition lock;默认锁时长与 group.share.record.lock.duration.ms 相关(文档示例 30 seconds)。长事务处理应规划 renew,而非单纯调大超时。
怎么做#
@KafkaListener(
id = "annotation-manual-transaction-event-processor",
topics = "transaction-events",
containerFactory = "manualShareKafkaListenerContainerFactory",
groupId = "annotation-manual-group")
void handle(ConsumerRecord<String, TransactionEvent> record,
ShareAcknowledgment ack) {
try {
service.process(record.value());
ack.acknowledge();
} catch (TransientException e) {
ack.release();
} catch (Exception e) {
ack.reject();
}
}
演示还通过 HTTP POST /api/transactions 注入测试事件(handler 侧为 REST Controller + 上述 listener)。

AnnotationDrivenManualKafkaShareConsumer:@KafkaListener 与 containerFactory = "manualShareKafkaListenerContainerFactory"。

Acknowledged / Rejected / Released for retry 三类日志。

explicit、implicit、manual 等示例并列。
常见误区#
- 在 MANUAL 模式下既不调用 ack、又期望自动提交 — 会阻塞同线程消费。
- 把
release()当成经典 consumer 的 seek 回退 — 语义是重新进入 Available,由组内竞争,而非固定分区 offset。 - 在 4.0.5 上使用
ShareAckMode.MANUAL编译 — 应改用setExplicitShareAcknowledgment(true)。
Share Group 与经典组的选型#
| 维度 | 经典 Consumer Group | Share Group |
|---|---|---|
| 并行度 vs 分区 | 消费者数通常 ≤ 分区数 | 可大于分区数(记录级协作) |
| 顺序 | 分区内有序 | 不保证 |
DLT / @RetryableTopic | 成熟 | 尚无(演讲归纳 + 文档未覆盖) |
| Boot 自动配置 | spring.kafka.* | 需手工 factory |
| 批量 / 正则 topic / Streams | 支持 | 文档 列明不支持 |
| 可观测性 | 较完整 | 演讲者观点:metrics/tracing 仍缺口 |
生产环境若依赖有序事件流、Streams、批量监听或成熟 DLT,应继续使用经典组。Share Group 更适合可接受乱序、需要多实例抢活的工作队列。演讲者观点:当前 Spring Boot 发行版中 share 仍为 preview,生产宜等待与 Kafka Queues GA 对齐的 Boot/Kafka 组合 — 具体月份以官方发布为准。
落地前可用下列问题自检:(1)能否接受同 key 的消息乱序到达?(2)失败重试是否接受「其他实例接手」而非固定分区重放?(3)永久失败是否接受 Archived 而非 DLT 审计?(4)监控与告警是否已覆盖 share 专属指标?任一项为否,应留在经典组并优先评估 KIP-848 降低 rebalance 成本。
KIP-848:group.protocol=consumer#
为什么#
分区多、消费者多的经典组在 rebalance 时可能出现整组停顿,与 K8s HPA、滚动发布频繁扩缩容冲突。KIP-848 把大量分配逻辑下沉 broker,消费者 opt-in 新协议。
机制与约束#
Kafka 客户端源码(ConsumerConfig):
- 配置项:
group.protocol - 默认值:
classic - 合法值:
classic|consumer
Spring 绑定:
spring:
kafka:
consumer:
properties:
group.protocol: consumer
启用 consumer 后,partition.assignment.strategy 等经典配置会触发 ConfigException(与「自定义 PartitionAssignor 失效」一致)。演讲者观点:rebalance 体验改善的量化表述应以 KIP 正文为准,不宜脱离版本夸大「零停顿」。
本协议仅适用于经典 consumer group,与 Share Group(KIP-932)无关。
怎么做#
group.protocol=consumer
确保 broker 已启用 KIP-848 所需升级策略(KIP 中的 bidirectional / upgrade 等集群配置 — 细节见 KIP Case Studies)。
常见误区#
- 在 Share consumer 上设置
group.protocol— 路径错误。 - 同时保留自定义 assignor 与
consumer协议 — 启动即失败。 - 升级后仍用旧 Micrometer 仪表盘 — 演讲者观点:broker 侧 metrics 有
protocol=consumer|classic等变更,需对照 KIP 更新告警。 - 在蓝绿部署中同时启动两套不同
group.id却共用同一业务逻辑 — 与协议升级无关,但会掩盖 rebalance 问题;协议迁移应在同一group.id下滚动。
对 Spring 应用,只需确保 ConsumerFactory / @KafkaListener 最终 Consumer 属性含 group.protocol=consumer;无需改 Spring API。回滚时将属性改回 classic 并滚动实例即可,前提是 KIP-848 允许的降级路径在你的 broker 版本上仍开启。
滚动从 classic 切到 consumer 协议#
为什么#
大规模 consumer group 往往不敢改协议,担心一次性 rebalance 造成消费空洞。KIP-848 写明可通过 rolling consumers 升级或降级。
机制与约束#
KIP-848 摘录(已核实方向):
“Upgrading to the new protocol or downgrading from it is possible by rolling the consumers”
演讲者观点(未对照 Kafka 4.1 broker 源码逐行验证):可先混跑 classic 与 consumer 实例;broker 在检测到新协议成员后加入组时,将组迁移到新协议;再逐步缩掉 classic 实例;若旧协议成员回归,可回退。演讲者建议:尽快结束混跑,避免长期双协议增加 broker 开销。

怎么做#
演示使用 Docker Compose 扩缩(命令来自现场 OCR,演示仓库 URL 未公开验证):
docker compose -f docker-compose-applications.yml -p demo up -d
docker compose -f docker-compose-applications.yml -p demo \
scale consumer-new-consumer-protocol=1
# 验证组协议切换后,逐步 scale 到 3 并下线 classic 实例
docker compose -f docker-compose-applications.yml -p demo \
scale consumer-new-consumer-protocol=3

Scale from 0 to 1 instance 与 consumer-new-consumer-protocol。

Scale from 0 to 1 与 Group protocol 日志中的 classic / consumer 切换。
常见误区#
- 未先升级 broker / 未启用集群升级策略就改客户端 — 行为未定义或启动失败。
- 长期混跑两种协议当作稳态 — 与 KIP 及演讲建议相反。
参考与延伸阅读#
- Spring for Apache Kafka 参考文档(4.0)
- Spring Kafka 4.0 — What’s New
- Spring Kafka 4.0 — Kafka Queues (Share Consumer)
- Spring Kafka 4.1 — ShareAckMode 与 renew()
- Spring Boot 4.0 — Release Notes
- Spring Boot 4.0 — Migration Guide(模块化 starter)
- Javadoc —
JacksonJsonSerializer - Javadoc —
@EmbeddedKafka - Javadoc —
DeadLetterPublishingRecoverer - Spring Kafka — DLT Strategies
- KIP-932: Queues for Kafka
- KIP-848: The Next Generation of the Consumer Rebalance Protocol
- Kafka
ConsumerConfig源码(group.protocol默认值) - OpenRewrite —
UpgradeSpringKafka_4_0配方 - OpenRewrite —
UpgradeSpringBoot_4_0配方



