This is an automated email from the ASF dual-hosted git repository.
jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push:
new 61d6cb7a2f bugfix: prevent Netty I/O thread blocking by async channel
release via reconnectExecutor (#7505)
61d6cb7a2f is described below
commit 61d6cb7a2f2b5a2bdbdcf2a0c11f1de1aa5b272b
Author: Noah <[email protected]>
AuthorDate: Thu Jul 24 10:59:48 2025 +0900
bugfix: prevent Netty I/O thread blocking by async channel release via
reconnectExecutor (#7505)
---
changes/en-us/2.x.md | 2 +
changes/zh-cn/2.x.md | 2 +
.../rpc/netty/AbstractNettyRemotingClient.java | 30 ++++++++--
.../seata/core/rpc/netty/ResourceCleanupTest.java | 66 ++++++++++++++++++++++
4 files changed, 96 insertions(+), 4 deletions(-)
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 825833aa5f..fb54cb76f8 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -27,6 +27,8 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#7538](https://github.com/apache/incubator-seata/pull/7538)] unify
DmdbTimestamp comparison via UTC Instant to prevent rollback failure
- [[#7546](https://github.com/seata/seata/pull/7546)] fix client spring
version compatible
+- [[#7505](https://github.com/apache/incubator-seata/pull/7505)] prevent Netty
I/O thread blocking by async channel release via reconnectExecutor
+
### optimize:
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index a167c7e1d0..bc85327c39 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -27,6 +27,8 @@
- [[#7538](https://github.com/apache/incubator-seata/pull/7538)]
统一DmdbTimestamp比较方式,通过UTC比较,以防止回滚失败
- [[#7546](https://github.com/seata/seata/pull/7546)] 修复客户端spring版本兼容
+- [[#7505](https://github.com/apache/incubator-seata/pull/7505)] 通过使用
reconnectExecutor 异步释放 channel,防止阻塞 Netty I/O 线程
+
### optimize:
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
index 28ff56baef..1e313c8c9f 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
@@ -681,8 +681,13 @@ public abstract class AbstractNettyRemotingClient extends
AbstractNettyRemoting
if (LOGGER.isInfoEnabled()) {
LOGGER.info("channel inactive: {}", ctx.channel());
}
- clientChannelManager.releaseChannel(
- ctx.channel(),
NetUtil.toStringAddress(ctx.channel().remoteAddress()));
+ timerExecutor.execute(() -> {
+ try {
+ clientChannelManager.releaseChannel(ctx.channel(),
getAddressFromChannel(ctx.channel()));
+ } catch (Throwable throwable) {
+ LOGGER.error("release channel error: {}",
throwable.getMessage(), throwable);
+ }
+ });
super.channelInactive(ctx);
}
@@ -701,7 +706,18 @@ public abstract class AbstractNettyRemotingClient extends
AbstractNettyRemoting
} catch (Exception exx) {
LOGGER.error(exx.getMessage());
} finally {
- clientChannelManager.releaseChannel(ctx.channel(),
getAddressFromContext(ctx));
+ try {
+ timerExecutor.execute(() -> {
+ try {
+ clientChannelManager.releaseChannel(
+ ctx.channel(),
getAddressFromChannel(ctx.channel()));
+ } catch (Throwable throwable) {
+ LOGGER.error("release channel error: {}",
throwable.getMessage(), throwable);
+ }
+ });
+ } catch (Exception e) {
+ LOGGER.error("failed to schedule releaseChannel:
{}", e.getMessage(), e);
+ }
}
}
if (idleStateEvent == IdleStateEvent.WRITER_IDLE_STATE_EVENT) {
@@ -723,7 +739,13 @@ public abstract class AbstractNettyRemotingClient extends
AbstractNettyRemoting
FrameworkErrorCode.ExceptionCaught.getErrCode(),
NetUtil.toStringAddress(ctx.channel().remoteAddress()) +
"connect exception. " + cause.getMessage(),
cause);
- clientChannelManager.releaseChannel(ctx.channel(),
getAddressFromChannel(ctx.channel()));
+ timerExecutor.execute(() -> {
+ try {
+ clientChannelManager.releaseChannel(ctx.channel(),
getAddressFromChannel(ctx.channel()));
+ } catch (Throwable throwable) {
+ LOGGER.error("release channel error: {}",
throwable.getMessage(), throwable);
+ }
+ });
if (LOGGER.isInfoEnabled()) {
LOGGER.info("remove exception rm channel:{}", ctx.channel());
}
diff --git
a/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java
b/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java
index 8c10bc9b83..96e657fc73 100644
---
a/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java
+++
b/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java
@@ -17,7 +17,9 @@
package org.apache.seata.core.rpc.netty;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
+import io.netty.handler.timeout.IdleStateEvent;
import org.apache.seata.core.protocol.MergeMessage;
import org.apache.seata.core.protocol.MergedWarpMessage;
import org.apache.seata.core.protocol.MessageFuture;
@@ -59,6 +61,7 @@ class ResourceCleanupTest {
@BeforeEach
void setUp() throws Exception {
client = TmNettyRemotingClient.getInstance();
+ client.init();
Field futuresField =
AbstractNettyRemoting.class.getDeclaredField("futures");
futuresField.setAccessible(true);
@@ -143,6 +146,69 @@ class ResourceCleanupTest {
assertTrue(futures.containsKey(1), "Future ID 1 should still exist");
}
+ @Test
+ void testExceptionCaughtTriggersChannelRelease() throws Exception {
+ AbstractNettyRemotingClient.ClientHandler handler = client.new
ClientHandler();
+ ChannelHandlerContext mockCtx = mock(ChannelHandlerContext.class);
+ when(mockCtx.channel()).thenReturn(channel);
+ when(channel.remoteAddress()).thenReturn(new
InetSocketAddress("127.0.0.1", 8091));
+ Field channelManagerField =
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+ channelManagerField.setAccessible(true);
+ NettyClientChannelManager originalManager =
(NettyClientChannelManager) channelManagerField.get(client);
+
+ NettyClientChannelManager spyManager = spy(originalManager);
+ channelManagerField.set(client, spyManager);
+
+ handler.exceptionCaught(mockCtx, new IllegalArgumentException("test"));
+
+ Thread.sleep(500);
+ verify(spyManager).releaseChannel(eq(channel), eq("127.0.0.1:8091"));
+ channelManagerField.set(client, originalManager);
+ }
+
+ @Test
+ void testUserEventTriggeredReaderIdleReleasesChannel() throws Exception {
+ AbstractNettyRemotingClient.ClientHandler handler = client.new
ClientHandler();
+ ChannelHandlerContext mockCtx = mock(ChannelHandlerContext.class);
+ when(mockCtx.channel()).thenReturn(channel);
+ when(channel.remoteAddress()).thenReturn(new
InetSocketAddress("127.0.0.1", 8091));
+
+ Field channelManagerField =
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+ channelManagerField.setAccessible(true);
+ NettyClientChannelManager originalManager =
(NettyClientChannelManager) channelManagerField.get(client);
+
+ NettyClientChannelManager spyManager = spy(originalManager);
+ channelManagerField.set(client, spyManager);
+
+ IdleStateEvent readerIdleEvent =
IdleStateEvent.READER_IDLE_STATE_EVENT;
+ handler.userEventTriggered(mockCtx, readerIdleEvent);
+
+ Thread.sleep(500);
+ verify(spyManager).releaseChannel(eq(channel), eq("127.0.0.1:8091"));
+ channelManagerField.set(client, originalManager);
+ }
+
+ @Test
+ void testChannelInactiveTriggersChannelRelease() throws Exception {
+ AbstractNettyRemotingClient.ClientHandler handler = client.new
ClientHandler();
+ ChannelHandlerContext mockCtx = mock(ChannelHandlerContext.class);
+ when(mockCtx.channel()).thenReturn(channel);
+ when(channel.remoteAddress()).thenReturn(new
InetSocketAddress("127.0.0.1", 8091));
+ Field channelManagerField =
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+ channelManagerField.setAccessible(true);
+ NettyClientChannelManager originalManager =
(NettyClientChannelManager) channelManagerField.get(client);
+
+ NettyClientChannelManager spyManager = spy(originalManager);
+ channelManagerField.set(client, spyManager);
+
+ handler.channelInactive(mockCtx);
+
+ Thread.sleep(500);
+
+ verify(spyManager).releaseChannel(eq(channel), eq("127.0.0.1:8091"));
+ channelManagerField.set(client, originalManager);
+ }
+
private RpcMessage createRpcMessage(int id) {
RpcMessage message = new RpcMessage();
message.setId(id);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]