This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7acfe4dca1 [ISSUE #9737] Fix client memory leak on connection failure (#9740)
7acfe4dca1 is described below

commit 7acfe4dca1ac38c79fbe78fc7e6f0e76fbeb21df
Author: qianye <[email protected]>
AuthorDate: Wed Nov 19 16:22:07 2025 +0800

    [ISSUE #9737] Fix client memory leak on connection failure (#9740)
---
 .../remoting/netty/NettyRemotingClient.java        | 34 ++++++++++++----------
 1 file changed, 18 insertions(+), 16 deletions(-)

diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 973d229bef..7ed977d99c 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -40,6 +40,7 @@ import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.resolver.NoopAddressResolverGroup;
+import io.netty.util.AttributeKey;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
@@ -96,6 +97,9 @@ import static 
org.apache.rocketmq.remoting.common.RemotingHelper.convertChannelF
 public class NettyRemotingClient extends NettyRemotingAbstract implements 
RemotingClient {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
 
+    private static final AttributeKey<ChannelWrapper> 
CHANNEL_WRAPPER_ATTRIBUTE_KEY = AttributeKey.valueOf(
+        "channelWrapper");
+
     private static final long LOCK_TIMEOUT_MILLIS = 3000;
     private static final long MIN_CLOSE_TIMEOUT_MILLIS = 100;
 
@@ -106,7 +110,6 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
     private final Map<String /* cidr */, SocksProxyConfig /* proxy */> 
proxyMap = new HashMap<>();
     private final ConcurrentHashMap<String /* cidr */, Bootstrap> bootstrapMap 
= new ConcurrentHashMap<>();
     private final ConcurrentMap<String /* addr */, ChannelWrapper> 
channelTables = new ConcurrentHashMap<>();
-    private final ConcurrentMap<Channel, ChannelWrapper> channelWrapperTables 
= new ConcurrentHashMap<>();
 
     private final HashedWheelTimer timer = new HashedWheelTimer(r -> new 
Thread(r, "ClientHouseKeepingService"));
 
@@ -381,7 +384,6 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                 channel.getValue().close();
             }
 
-            this.channelWrapperTables.clear();
             this.channelTables.clear();
 
             this.eventLoopGroupWorker.shutdownGracefully();
@@ -439,7 +441,8 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                     }
 
                     if (removeItemFromTable) {
-                        ChannelWrapper channelWrapper = 
this.channelWrapperTables.remove(channel);
+                        ChannelWrapper channelWrapper =
+                            
RemotingHelper.getAttributeValue(CHANNEL_WRAPPER_ATTRIBUTE_KEY, channel);
                         if (channelWrapper != null && 
channelWrapper.tryClose(channel)) {
                             this.channelTables.remove(addrRemote);
                         }
@@ -487,7 +490,8 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                     }
 
                     if (removeItemFromTable) {
-                        ChannelWrapper channelWrapper = 
this.channelWrapperTables.remove(channel);
+                        ChannelWrapper channelWrapper =
+                            
RemotingHelper.getAttributeValue(CHANNEL_WRAPPER_ATTRIBUTE_KEY, channel);
                         if (channelWrapper != null && 
channelWrapper.tryClose(channel)) {
                             this.channelTables.remove(addrRemote);
                         }
@@ -724,7 +728,6 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
         LOGGER.info("createChannel: begin to connect remote host[{}] 
asynchronously", addr);
         ChannelWrapper cw = new ChannelWrapper(addr, channelFuture);
         this.channelTables.put(addr, cw);
-        this.channelWrapperTables.put(channelFuture.channel(), cw);
         return cw;
     }
 
@@ -831,17 +834,12 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
             if (response.getCode() == ResponseCode.GO_AWAY) {
                 if (nettyClientConfig.isEnableReconnectForGoAway()) {
                     LOGGER.info("Receive go away from channelId={}, 
channel={}", channel.id(), channel);
-                    ChannelWrapper channelWrapper = 
channelWrapperTables.computeIfPresent(channel, (channel0, channelWrapper0) -> {
-                        try {
-                            if (channelWrapper0.reconnect(channel0)) {
-                                LOGGER.info("Receive go away from 
channelId={}, channel={}, recreate the channelId={}", channel0.id(), channel0, 
channelWrapper0.getChannel().id());
-                                
channelWrapperTables.put(channelWrapper0.getChannel(), channelWrapper0);
-                            }
-                        } catch (Throwable t) {
-                            LOGGER.error("Channel {} reconnect error", 
channelWrapper0, t);
-                        }
-                        return channelWrapper0;
-                    });
+                    ChannelWrapper channelWrapper = 
RemotingHelper.getAttributeValue(CHANNEL_WRAPPER_ATTRIBUTE_KEY,
+                        channel);
+                    if (channelWrapper != null && 
channelWrapper.reconnect(channel)) {
+                        LOGGER.info("Receive go away from channelId={}, 
channel={}, recreate the channelId={}",
+                            channel.id(), channel, 
channelWrapper.getChannel().id());
+                    }
                     if (channelWrapper != null && 
!channelWrapper.isWrapperOf(channel)) {
                         RemotingCommand retryRequest = 
RemotingCommand.createRequestCommand(request.getCode(), 
request.readCustomHeader());
                         retryRequest.setBody(request.getBody());
@@ -1006,6 +1004,7 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
             this.channelFuture = channelFuture;
             this.lastResponseTime = System.currentTimeMillis();
             this.channelAddress = address;
+            RemotingHelper.setPropertyToAttr(channelFuture.channel(), 
CHANNEL_WRAPPER_ATTRIBUTE_KEY, this);
         }
 
         public boolean isOK() {
@@ -1055,10 +1054,13 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                     if (isWrapperOf(channel)) {
                         channelToClose = channelFuture;
                         channelFuture = doConnect(channelAddress);
+                        
RemotingHelper.setPropertyToAttr(channelFuture.channel(), 
CHANNEL_WRAPPER_ATTRIBUTE_KEY, this);
                         return true;
                     } else {
                         LOGGER.warn("channelWrapper has reconnect, so do 
nothing, now channelId={}, input channelId={}",getChannel().id(), channel.id());
                     }
+                } catch (Throwable t) {
+                    LOGGER.error("ChannelWrapper {} reconnect error", this, t);
                 } finally {
                     lock.writeLock().unlock();
                 }

Reply via email to