This is an automated email from the ASF dual-hosted git repository.

jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git


The following commit(s) were added to refs/heads/2.x by this push:
     new 61d6cb7a2f bugfix: prevent Netty I/O thread blocking by async channel release via reconnectExecutor (#7505)
61d6cb7a2f is described below

commit 61d6cb7a2f2b5a2bdbdcf2a0c11f1de1aa5b272b
Author: Noah <[email protected]>
AuthorDate: Thu Jul 24 10:59:48 2025 +0900

    bugfix: prevent Netty I/O thread blocking by async channel release via reconnectExecutor (#7505)
---
 changes/en-us/2.x.md                               |  2 +
 changes/zh-cn/2.x.md                               |  2 +
 .../rpc/netty/AbstractNettyRemotingClient.java     | 30 ++++++++--
 .../seata/core/rpc/netty/ResourceCleanupTest.java  | 66 ++++++++++++++++++++++
 4 files changed, 96 insertions(+), 4 deletions(-)

diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 825833aa5f..fb54cb76f8 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -27,6 +27,8 @@ Add changes here for all PR submitted to the 2.x branch.
 
 - [[#7538](https://github.com/apache/incubator-seata/pull/7538)] unify 
DmdbTimestamp comparison via UTC Instant to prevent rollback failure
 - [[#7546](https://github.com/seata/seata/pull/7546)] fix client spring 
version compatible
+- [[#7505](https://github.com/apache/incubator-seata/pull/7505)] prevent Netty 
I/O thread blocking by async channel release via reconnectExecutor
+
 
 ### optimize:
 
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index a167c7e1d0..bc85327c39 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -27,6 +27,8 @@
 
 - [[#7538](https://github.com/apache/incubator-seata/pull/7538)] 
统一DmdbTimestamp比较方式,通过UTC比较,以防止回滚失败
 - [[#7546](https://github.com/seata/seata/pull/7546)] 修复客户端spring版本兼容
+- [[#7505](https://github.com/apache/incubator-seata/pull/7505)] 通过使用 
reconnectExecutor 异步释放 channel,防止阻塞 Netty I/O 线程
+
 
 
 ### optimize:
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
 
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
index 28ff56baef..1e313c8c9f 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
@@ -681,8 +681,13 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
             if (LOGGER.isInfoEnabled()) {
                 LOGGER.info("channel inactive: {}", ctx.channel());
             }
-            clientChannelManager.releaseChannel(
-                    ctx.channel(), 
NetUtil.toStringAddress(ctx.channel().remoteAddress()));
+            timerExecutor.execute(() -> {
+                try {
+                    clientChannelManager.releaseChannel(ctx.channel(), 
getAddressFromChannel(ctx.channel()));
+                } catch (Throwable throwable) {
+                    LOGGER.error("release channel error: {}", 
throwable.getMessage(), throwable);
+                }
+            });
             super.channelInactive(ctx);
         }
 
@@ -701,7 +706,18 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
                     } catch (Exception exx) {
                         LOGGER.error(exx.getMessage());
                     } finally {
-                        clientChannelManager.releaseChannel(ctx.channel(), 
getAddressFromContext(ctx));
+                        try {
+                            timerExecutor.execute(() -> {
+                                try {
+                                    clientChannelManager.releaseChannel(
+                                            ctx.channel(), 
getAddressFromChannel(ctx.channel()));
+                                } catch (Throwable throwable) {
+                                    LOGGER.error("release channel error: {}", 
throwable.getMessage(), throwable);
+                                }
+                            });
+                        } catch (Exception e) {
+                            LOGGER.error("failed to schedule releaseChannel: 
{}", e.getMessage(), e);
+                        }
                     }
                 }
                 if (idleStateEvent == IdleStateEvent.WRITER_IDLE_STATE_EVENT) {
@@ -723,7 +739,13 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
                     FrameworkErrorCode.ExceptionCaught.getErrCode(),
                     NetUtil.toStringAddress(ctx.channel().remoteAddress()) + 
"connect exception. " + cause.getMessage(),
                     cause);
-            clientChannelManager.releaseChannel(ctx.channel(), 
getAddressFromChannel(ctx.channel()));
+            timerExecutor.execute(() -> {
+                try {
+                    clientChannelManager.releaseChannel(ctx.channel(), 
getAddressFromChannel(ctx.channel()));
+                } catch (Throwable throwable) {
+                    LOGGER.error("release channel error: {}", 
throwable.getMessage(), throwable);
+                }
+            });
             if (LOGGER.isInfoEnabled()) {
                 LOGGER.info("remove exception rm channel:{}", ctx.channel());
             }
diff --git 
a/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java 
b/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java
index 8c10bc9b83..96e657fc73 100644
--- 
a/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java
+++ 
b/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java
@@ -17,7 +17,9 @@
 package org.apache.seata.core.rpc.netty;
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelId;
+import io.netty.handler.timeout.IdleStateEvent;
 import org.apache.seata.core.protocol.MergeMessage;
 import org.apache.seata.core.protocol.MergedWarpMessage;
 import org.apache.seata.core.protocol.MessageFuture;
@@ -59,6 +61,7 @@ class ResourceCleanupTest {
     @BeforeEach
     void setUp() throws Exception {
         client = TmNettyRemotingClient.getInstance();
+        client.init();
 
         Field futuresField = 
AbstractNettyRemoting.class.getDeclaredField("futures");
         futuresField.setAccessible(true);
@@ -143,6 +146,69 @@ class ResourceCleanupTest {
         assertTrue(futures.containsKey(1), "Future ID 1 should still exist");
     }
 
+    @Test
+    void testExceptionCaughtTriggersChannelRelease() throws Exception {
+        AbstractNettyRemotingClient.ClientHandler handler = client.new 
ClientHandler();
+        ChannelHandlerContext mockCtx = mock(ChannelHandlerContext.class);
+        when(mockCtx.channel()).thenReturn(channel);
+        when(channel.remoteAddress()).thenReturn(new 
InetSocketAddress("127.0.0.1", 8091));
+        Field channelManagerField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+        channelManagerField.setAccessible(true);
+        NettyClientChannelManager originalManager = 
(NettyClientChannelManager) channelManagerField.get(client);
+
+        NettyClientChannelManager spyManager = spy(originalManager);
+        channelManagerField.set(client, spyManager);
+
+        handler.exceptionCaught(mockCtx, new IllegalArgumentException("test"));
+
+        Thread.sleep(500);
+        verify(spyManager).releaseChannel(eq(channel), eq("127.0.0.1:8091"));
+        channelManagerField.set(client, originalManager);
+    }
+
+    @Test
+    void testUserEventTriggeredReaderIdleReleasesChannel() throws Exception {
+        AbstractNettyRemotingClient.ClientHandler handler = client.new 
ClientHandler();
+        ChannelHandlerContext mockCtx = mock(ChannelHandlerContext.class);
+        when(mockCtx.channel()).thenReturn(channel);
+        when(channel.remoteAddress()).thenReturn(new 
InetSocketAddress("127.0.0.1", 8091));
+
+        Field channelManagerField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+        channelManagerField.setAccessible(true);
+        NettyClientChannelManager originalManager = 
(NettyClientChannelManager) channelManagerField.get(client);
+
+        NettyClientChannelManager spyManager = spy(originalManager);
+        channelManagerField.set(client, spyManager);
+
+        IdleStateEvent readerIdleEvent = 
IdleStateEvent.READER_IDLE_STATE_EVENT;
+        handler.userEventTriggered(mockCtx, readerIdleEvent);
+
+        Thread.sleep(500);
+        verify(spyManager).releaseChannel(eq(channel), eq("127.0.0.1:8091"));
+        channelManagerField.set(client, originalManager);
+    }
+
+    @Test
+    void testChannelInactiveTriggersChannelRelease() throws Exception {
+        AbstractNettyRemotingClient.ClientHandler handler = client.new 
ClientHandler();
+        ChannelHandlerContext mockCtx = mock(ChannelHandlerContext.class);
+        when(mockCtx.channel()).thenReturn(channel);
+        when(channel.remoteAddress()).thenReturn(new 
InetSocketAddress("127.0.0.1", 8091));
+        Field channelManagerField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+        channelManagerField.setAccessible(true);
+        NettyClientChannelManager originalManager = 
(NettyClientChannelManager) channelManagerField.get(client);
+
+        NettyClientChannelManager spyManager = spy(originalManager);
+        channelManagerField.set(client, spyManager);
+
+        handler.channelInactive(mockCtx);
+
+        Thread.sleep(500);
+
+        verify(spyManager).releaseChannel(eq(channel), eq("127.0.0.1:8091"));
+        channelManagerField.set(client, originalManager);
+    }
+
     private RpcMessage createRpcMessage(int id) {
         RpcMessage message = new RpcMessage();
         message.setId(id);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to