Problem Description and Background#
Last night, our gateway went through an avalanche for a while. The symptoms were as follows:
1. Various microservices kept reporting exceptions - the connection was closed while writing the HTTP response:
reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response
2. At the same time, there were exceptions where the connection was closed before the request had been fully read:
org.springframework.http.converter.HttpMessageNotReadableException: I/O error while reading input message; nested exception is java.io.IOException: UT000128: Remote peer closed connection before all data could be read
3. The frontend kept triggering request-timeout alerts: 504 Gateway Time-out
4. The gateway processes kept failing health checks and were restarted
5. After each restart, the gateway processes immediately faced a surge of requests - peaking at 2000 qps per instance, versus 500 qps during quiet periods; thanks to autoscaling, busy periods normally stay below 1000 qps per instance. The health-check endpoint's response time was extremely long, however, so the instances kept getting restarted
Problems 1 and 2 were probably caused by the gateway's constant restarts together with graceful shutdown failing for some reason: the forced shutdown abruptly terminated connections, producing the corresponding exceptions.
Our gateway is built on Spring Cloud Gateway with autoscaling based on CPU load. Strangely, while the request volume surged, CPU usage did not rise significantly and stayed at around 60%. Since CPU load never reached the scaling threshold, autoscaling was never triggered. As a quick mitigation we manually scaled out several gateway instances to keep the load on each below 1000 qps, which resolved the problem temporarily.
Problem Analysis#
To solve the problem thoroughly, we turned to JFR analysis. First, we reasoned from the clues we already had:
- Spring Cloud Gateway is an asynchronous, reactive gateway based on Spring-WebFlux, so its HTTP business threads are limited (by default 2 * the number of available CPU cores, 4 in our case).
- The gateway processes kept failing their health checks, which call the /actuator/health endpoint, and those calls kept timing out.
A health-check endpoint usually times out for one of two reasons:
- The health check blocks while probing some component. For example, if the database hangs, the database health check may never return.
- The HTTP thread pool cannot get to the health-check requests before they time out (a minimal sketch of this starvation effect follows this list).
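To make the second cause concrete, here is a minimal, self-contained sketch - not our gateway code; the endpoints and the 200 ms sleep are made up for illustration - of how blocking calls on Reactor Netty's few event-loop threads can starve an otherwise trivial health endpoint:

import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServer;

public class EventLoopStarvationDemo {
    public static void main(String[] args) {
        HttpServer.create()
                .port(8080)
                .route(routes -> routes
                        // Simulates a filter that makes a synchronous Redis call:
                        // it parks the event-loop thread for 200 ms per request.
                        .get("/blocking", (req, res) -> {
                            try {
                                Thread.sleep(200); // blocking on the event loop: the anti-pattern
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                            return res.sendString(Mono.just("done"));
                        })
                        // The health endpoint does no work at all, but it shares the same
                        // small set of event-loop threads, so it queues behind the blockers.
                        .get("/health", (req, res) -> res.sendString(Mono.just("UP"))))
                .bindNow()
                .onDispose()
                .block();
    }
}

With only a handful of event-loop threads, a few hundred concurrent requests to /blocking are enough to make /health miss its deadline even though the process is otherwise idle - the same pattern we saw on the gateway.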
We first checked the timed-out stack traces in JFR to see whether the HTTP threads were stuck on the health check. Looking at the thread stacks captured after the problem occurred and focusing on those 4 HTTP threads, we found that they all had essentially the same stack, every one of them executing a Redis command:
"reactor-http-nio-1" #68 daemon prio=5 os_prio=0 cpu=70832.99ms elapsed=199.98s tid=0x0000ffffb2f8a740 nid=0x69 waiting on condition [0x0000fffe8adfc000]
java.lang.Thread.State: TIMED_WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@11.0.8/Native Method)
- parking to wait for <0x00000007d50eddf8> (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.8/LockSupport.java:234)
at java.util.concurrent.CompletableFuture$Signaller.block(java.base@11.0.8/CompletableFuture.java:1798)
at java.util.concurrent.ForkJoinPool.managedBlock(java.base@11.0.8/ForkJoinPool.java:3128)
at java.util.concurrent.CompletableFuture.timedGet(java.base@11.0.8/CompletableFuture.java:1868)
at java.util.concurrent.CompletableFuture.get(java.base@11.0.8/CompletableFuture.java:2021)
at io.lettuce.core.protocol.AsyncCommand.await(AsyncCommand.java:83)
at io.lettuce.core.internal.Futures.awaitOrCancel(Futures.java:244)
at io.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:75)
at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:80)
at com.sun.proxy.$Proxy245.get(Unknown Source)
at org.springframework.data.redis.connection.lettuce.LettuceStringCommands.get(LettuceStringCommands.java:68)
at org.springframework.data.redis.connection.DefaultedRedisConnection.get(DefaultedRedisConnection.java:267)
at org.springframework.data.redis.connection.DefaultStringRedisConnection.get(DefaultStringRedisConnection.java:406)
at org.springframework.data.redis.core.DefaultValueOperations$1.inRedis(DefaultValueOperations.java:57)
at org.springframework.data.redis.core.AbstractOperations$ValueDeserializingRedisCallback.doInRedis(AbstractOperations.java:60)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:222)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:189)
at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96)
at org.springframework.data.redis.core.DefaultValueOperations.get(DefaultValueOperations.java:53)
at com.jojotech.apigateway.filter.AccessCheckFilter.traced(AccessCheckFilter.java:196)
at com.jojotech.apigateway.filter.AbstractTracedFilter.filter(AbstractTracedFilter.java:39)
at org.springframework.cloud.gateway.handler.FilteringWebHandler$GatewayFilterAdapter.filter(FilteringWebHandler.java:137)
at org.springframework.cloud.gateway.filter.OrderedGatewayFilter.filter(OrderedGatewayFilter.java:44)
at org.springframework.cloud.gateway.handler.FilteringWebHandler$DefaultGatewayFilterChain.lambda$filter$0(FilteringWebHandler.java:117)
at org.springframework.cloud.gateway.handler.FilteringWebHandler$DefaultGatewayFilterChain$$Lambda$1478/0x0000000800b84c40.get(Unknown Source)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
at reactor.core.publisher.MonoFilterWhen$MonoFilterWhenMain.onNext(MonoFilterWhen.java:149)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2397)
at reactor.core.publisher.MonoFilterWhen$MonoFilterWhenMain.onSubscribe(MonoFilterWhen.java:112)
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:448)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:250)
at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onNext(FluxDematerialize.java:98)
at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onNext(FluxDematerialize.java:44)
at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:270)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:228)
at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.request(FluxDematerialize.java:127)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:235)
at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onSubscribe(FluxDematerialize.java:77)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:448)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:218)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at org.springframework.cloud.sleuth.instrument.web.TraceWebFilter$MonoWebFilterTrace.subscribe(TraceWebFilter.java:184)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:915)
at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:654)
at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:478)
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:526)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:209)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at reactor.netty.http.server.logging.AccessLogHandlerH1.channelRead(AccessLogHandlerH1.java:59)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(java.base@11.0.8/Thread.java:834)
We found that the HTTP threads were not stuck on the health check, and no other thread had any health-check-related frames either (in an asynchronous environment the health check is asynchronous too, so parts of it may be handed off to other threads). The health-check requests therefore must have timed out before they were ever executed.
Why would that happen? At the same time, I noticed that RedisTemplate - the synchronous Redis API of spring-data-redis - was being used here. I suddenly remembered that when this code was written, I took a shortcut and skipped the asynchronous API, since I was only checking whether a key exists and updating its expiration time. Could this have caused the avalanche by blocking the HTTP threads in the synchronous API?
Let's verify this hypothesis. Our project uses Redis through spring-data-redis plus the Lettuce connection pool, with enhanced JFR monitoring enabled for Lettuce commands; you can refer to my article: "This new Redis connection pool monitoring method is great - let me add some extra seasoning". My pull request has since been merged, and the feature will be released in version 6.2.x. Let's look at the Redis commands collected when the problem occurred:
(Figure: JFR statistics of Lettuce command latencies around the incident, collected in 10-second windows; count is the number of commands and the latencies are in microseconds)
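As an aside, a minimal sketch of capturing one such 10-second JFR window programmatically with the JDK's jdk.jfr API (the class name and output path are illustrative; jcmd can start the same recording without code):

import jdk.jfr.Recording;
import java.nio.file.Path;

public class JfrWindowCapture {
    public static void main(String[] args) throws Exception {
        // Record a single 10-second window and dump it to disk for analysis in JMC.
        try (Recording recording = new Recording()) {
            recording.start();
            Thread.sleep(10_000); // matches the 10-second collection window
            recording.stop();
            recording.dump(Path.of("/tmp/gateway-window.jfr")); // illustrative path
        }
    }
}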
Let's calculate the blocking time caused by executing Redis commands (the collection window is 10 seconds, count is the number of commands, and the time unit is microseconds). Multiplying each command count by its 50% median and dividing by 10 (because the window is 10 seconds) gives the blocking time per second caused by Redis commands:
32*152=4864
1*860=860
5*163=815
32*176=5632
1*178=178
16959*168=2849112
774*176=136224
3144*166=521904
17343*179=3104397
702*166=116532
Total: 6740518
6740518 / 10 = 674051.8 us ≈ 0.67 s
And this is only the blocking time computed from the medians; the distribution in the chart shows that the actual values must be larger. So the time spent per second blocked on the synchronous Redis API can easily exceed one second. With a continuous stream of requests that never lets up, requests keep piling up, eventually causing the avalanche.
Moreover, since these are blocking calls, the threads spend most of their time waiting on I/O, so CPU usage does not rise and autoscaling is never triggered. During normal business peaks the gateway instances never reach this problematic pressure level thanks to pre-provisioned scaling, which is why the issue had not surfaced before.
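The same back-of-the-envelope arithmetic as a runnable sketch (the count/median pairs are copied from the window above):

public class RedisBlockingTimeEstimate {
    public static void main(String[] args) {
        // (command count, median latency in microseconds) pairs from the 10-second window
        long[][] samples = {
                {32, 152}, {1, 860}, {5, 163}, {32, 176}, {1, 178},
                {16959, 168}, {774, 176}, {3144, 166}, {17343, 179}, {702, 166}
        };
        long totalMicros = 0;
        for (long[] s : samples) {
            totalMicros += s[0] * s[1]; // commands * median latency
        }
        // Divide by the 10-second window to get blocked time per wall-clock second
        double blockedPerSecond = totalMicros / 10.0 / 1_000_000.0;
        System.out.printf("total = %d us, blocked ~= %.2f s per second%n",
                totalMicros, blockedPerSecond);
        // Prints: total = 6740518 us, blocked ~= 0.67 s per second
    }
}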
Problem Resolution#
Let's rewrite the original code. The original code using the synchronous spring-data-redis API was (essentially the body of public Mono<Void> traced(ServerWebExchange exchange, GatewayFilterChain chain), the core method behind the spring-cloud-gateway Filter interface):
if (StringUtils.isBlank(token)) {
    // If the token does not exist, decide based on the path whether to continue the request or return an unauthorized status
    return continueOrUnauthorized(path, exchange, chain, headers);
} else {
    try {
        String accessTokenValue = redisTemplate.opsForValue().get(token);
        if (StringUtils.isNotBlank(accessTokenValue)) {
            // If accessTokenValue is not blank, extend the expiration to 4 hours, so that a logged-in user's token never expires as long as there is activity
            Long expire = redisTemplate.getExpire(token);
            log.info("accessTokenValue = {}, expire = {}", accessTokenValue, expire);
            if (expire != null && expire < 4 * 60 * 60) {
                redisTemplate.expire(token, 4, TimeUnit.HOURS);
            }
            // Parse it to get the userId
            JSONObject accessToken = JSON.parseObject(accessTokenValue);
            String userId = accessToken.getString("userId");
            // Valid only if userId is not blank
            if (StringUtils.isNotBlank(userId)) {
                // Parse the token
                HttpHeaders newHeaders = parse(accessToken);
                // Continue the request
                return FilterUtil.changeRequestHeader(exchange, chain, newHeaders);
            }
        }
    } catch (Exception e) {
        log.error("read accessToken error: {}", e.getMessage(), e);
    }
    // If the token is invalid, decide based on the path whether to continue the request or return an unauthorized status
    return continueOrUnauthorized(path, exchange, chain, headers);
}
Converted to use the asynchronous API:
if (StringUtils.isBlank(token)) {
    return continueOrUnauthorized(path, exchange, chain, headers);
} else {
    HttpHeaders finalHeaders = headers;
    // Must be wrapped with tracedPublisherFactory, otherwise the tracing information is lost. See my other article: "Spring Cloud Gateway has no tracing information, and I'm completely confused"
    return tracedPublisherFactory.getTracedMono(
            redisTemplate.opsForValue().get(token)
                    // Must switch threads here; otherwise the downstream operators keep running on Lettuce's threads. If they are time-consuming, other business using Redis is affected, and the time spent is also counted toward the Redis connection's command timeout
                    .publishOn(Schedulers.parallel()),
            exchange
    ).doOnSuccess(accessTokenValue -> {
        if (accessTokenValue != null) {
            // Renew the AccessToken for 4 hours
            tracedPublisherFactory.getTracedMono(redisTemplate.getExpire(token).publishOn(Schedulers.parallel()), exchange).doOnSuccess(expire -> {
                log.info("accessTokenValue = {}, expire = {}", accessTokenValue, expire);
                if (expire != null && expire.toHours() < 4) {
                    redisTemplate.expire(token, Duration.ofHours(4)).subscribe();
                }
            }).subscribe();
        }
    })
    // Must be converted to a non-empty value, otherwise the flatMap below will not execute; we also cannot use switchIfEmpty at the end, because the overall return type is Mono<Void>, which carries empty content in any case, so that would cause every request to be sent twice.
    .defaultIfEmpty("")
    .flatMap(accessTokenValue -> {
        try {
            if (StringUtils.isNotBlank(accessTokenValue)) {
                JSONObject accessToken = JSON.parseObject(accessTokenValue);
                String userId = accessToken.getString("userId");
                if (StringUtils.isNotBlank(userId)) {
                    // Parse the token
                    HttpHeaders newHeaders = parse(accessToken);
                    // Continue the request
                    return FilterUtil.changeRequestHeader(exchange, chain, newHeaders);
                }
            }
            return continueOrUnauthorized(path, exchange, chain, finalHeaders);
        } catch (Exception e) {
            log.error("read accessToken error: {}", e.getMessage(), e);
            return continueOrUnauthorized(path, exchange, chain, finalHeaders);
        }
    });
}
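One assumption worth making explicit: in the rewritten code, redisTemplate must now be a reactive template, so that opsForValue().get(token) returns a Mono<String> and getExpire(token) returns a Mono<Duration>. A minimal sketch of wiring one up (the configuration class is illustrative; Spring Boot also auto-configures this bean when Lettuce is on the classpath):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;

@Configuration
public class ReactiveRedisConfig {
    // Declaring the bean explicitly makes the switch to the reactive API visible in review.
    @Bean
    public ReactiveStringRedisTemplate reactiveStringRedisTemplate(ReactiveRedisConnectionFactory factory) {
        return new ReactiveStringRedisTemplate(factory);
    }
}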
Here are a few key points to note:
- Spring-Cloud-Sleuth instruments tracing in Spring-WebFlux up front. If we create a new Flux or Mono inside a Filter, it carries no tracing information internally and we need to add it manually. See my other article: "Spring Cloud Gateway has no tracing information, and I'm completely confused"
- With the spring-data-redis + Lettuce connection pool combination, when using the asynchronous API we should switch to a different thread pool after receiving the response. Otherwise, the downstream work keeps running on Lettuce's threads; if it is time-consuming, it affects other business using Redis, and the time spent is also counted toward the Redis connection's command timeout
- In Project Reactor, if an intermediate result is null (an empty Mono), downstream flatMap, map, and other stream operations will not execute. If the chain terminates there, the frontend receives a broken response. So we need to consider the null/empty problem at every intermediate step (see the sketch after this list).
- The core GatewayFilter interface in spring-cloud-gateway returns Mono<Void> from its core method. A Mono<Void> inherently carries empty content, which prevents us from using switchIfEmpty at the end to simplify the null handling in the intermediate steps; using it would cause every request to be sent twice.
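To show the empty-Mono pitfall from the last two points in isolation, here is a minimal, self-contained sketch (the values are made up for illustration):

import reactor.core.publisher.Mono;

public class EmptyMonoPitfallDemo {
    public static void main(String[] args) {
        // An empty Mono (e.g., a Redis GET on a missing key) completes without a value,
        // so flatMap never runs and the chain silently skips the fallback logic.
        Mono<String> missing = Mono.empty();

        missing.flatMap(v -> Mono.just("handled: " + v))
                .subscribe(System.out::println); // prints nothing

        // defaultIfEmpty substitutes a sentinel value, guaranteeing that flatMap
        // executes exactly once - the property the rewritten filter relies on.
        missing.defaultIfEmpty("")
                .flatMap(v -> Mono.just(v.isEmpty() ? "no token" : "handled: " + v))
                .subscribe(System.out::println); // prints "no token"
    }
}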
After this change, stress testing the gateway showed that the problem could not be reproduced even at 20k qps per single instance.

