This is an automated email from the ASF dual-hosted git repository.

CRZbulabula pushed a commit to branch better-internal-rpc-manage
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7f0a3c8ed8fd5412fe919da001a801dfcb70d6fc
Author: Yongzao <[email protected]>
AuthorDate: Fri May 15 11:46:12 2026 +0800

    f
---
 .../async/AsyncAINodeHeartbeatClientPool.java      | 15 +++++++++++-
 .../async/AsyncConfigNodeHeartbeatClientPool.java  | 15 +++++++++++-
 .../async/AsyncDataNodeHeartbeatClientPool.java    | 28 ++++++++++++++++++++--
 .../client/sync/SyncConfigNodeClientPool.java      |  5 ++++
 .../client/request/AsyncRequestManager.java        | 16 +++++++++++--
 5 files changed, 73 insertions(+), 6 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
index f3e1d1064ef..d15b45cbcd5 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.client.async;
 
 import org.apache.iotdb.ainode.rpc.thrift.TAIHeartbeatReq;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.async.AsyncAINodeInternalServiceClient;
@@ -47,10 +48,22 @@ public class AsyncAINodeHeartbeatClientPool {
    */
   public void getAINodeHeartBeat(
       TEndPoint endPoint, TAIHeartbeatReq req, AINodeHeartbeatHandler handler) 
{
+    AsyncAINodeInternalServiceClient client = null;
+    boolean dispatched = false;
     try {
-      clientManager.borrowClient(endPoint).getAIHeartbeat(req, handler);
+      client = clientManager.borrowClient(endPoint);
+      client.getAIHeartbeat(req, handler);
+      dispatched = true;
     } catch (Exception ignore) {
       // Just ignore
+    } finally {
+      // After the async call is dispatched, the client's onComplete/onError 
callback is
+      // responsible for returning the client. If the RPC was not dispatched 
(exception
+      // before/during the call), the client must be returned here to prevent 
pool leakage.
+      if (!dispatched && client != null && clientManager instanceof 
ClientManager) {
+        ((ClientManager<TEndPoint, AsyncAINodeInternalServiceClient>) 
clientManager)
+            .returnClient(endPoint, client);
+      }
     }
   }
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
index a6dffbe0eef..5cf716d0dd1 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.client.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import 
org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClient;
@@ -48,10 +49,22 @@ public class AsyncConfigNodeHeartbeatClientPool {
       TEndPoint endPoint,
       TConfigNodeHeartbeatReq heartbeatReq,
       ConfigNodeHeartbeatHandler handler) {
+    AsyncConfigNodeInternalServiceClient client = null;
+    boolean dispatched = false;
     try {
-      
clientManager.borrowClient(endPoint).getConfigNodeHeartBeat(heartbeatReq, 
handler);
+      client = clientManager.borrowClient(endPoint);
+      client.getConfigNodeHeartBeat(heartbeatReq, handler);
+      dispatched = true;
     } catch (Exception ignore) {
       // Just ignore
+    } finally {
+      // After the async call is dispatched, the client's onComplete/onError 
callback is
+      // responsible for returning the client. If the RPC was not dispatched 
(exception
+      // before/during the call), the client must be returned here to prevent 
pool leakage.
+      if (!dispatched && client != null && clientManager instanceof 
ClientManager) {
+        ((ClientManager<TEndPoint, AsyncConfigNodeInternalServiceClient>) 
clientManager)
+            .returnClient(endPoint, client);
+      }
     }
   }
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
index f99a88ebc77..d31ec405e0c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.client.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
@@ -49,19 +50,42 @@ public class AsyncDataNodeHeartbeatClientPool {
    */
   public void getDataNodeHeartBeat(
       TEndPoint endPoint, TDataNodeHeartbeatReq req, DataNodeHeartbeatHandler 
handler) {
+    AsyncDataNodeInternalServiceClient client = null;
+    boolean dispatched = false;
     try {
-      clientManager.borrowClient(endPoint).getDataNodeHeartBeat(req, handler);
+      client = clientManager.borrowClient(endPoint);
+      client.getDataNodeHeartBeat(req, handler);
+      dispatched = true;
     } catch (Exception ignore) {
       // Just ignore
+    } finally {
+      returnClientIfNotDispatched(endPoint, client, dispatched);
     }
   }
 
   public void writeAuditLog(
       TEndPoint endPoint, TAuditLogReq req, DataNodeWriteAuditLogHandler 
handler) {
+    AsyncDataNodeInternalServiceClient client = null;
+    boolean dispatched = false;
     try {
-      clientManager.borrowClient(endPoint).writeAuditLog(req, handler);
+      client = clientManager.borrowClient(endPoint);
+      client.writeAuditLog(req, handler);
+      dispatched = true;
     } catch (Exception e) {
       // Just ignore
+    } finally {
+      returnClientIfNotDispatched(endPoint, client, dispatched);
+    }
+  }
+
+  // After the async call is dispatched, the client's onComplete/onError 
callback is responsible
+  // for returning the client. If the RPC was not dispatched (exception 
before/during the call),
+  // the client must be returned here to prevent pool leakage.
+  private void returnClientIfNotDispatched(
+      TEndPoint endPoint, AsyncDataNodeInternalServiceClient client, boolean 
dispatched) {
+    if (!dispatched && client != null && clientManager instanceof 
ClientManager) {
+      ((ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>) 
clientManager)
+          .returnClient(endPoint, client);
     }
   }
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
index 96c9f775526..64024575260 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
@@ -137,6 +137,11 @@ public class SyncConfigNodeClientPool {
     while (status.getCode() == 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
       TimeUnit.MILLISECONDS.sleep(2000);
       updateConfigNodeLeader(status);
+      if (configNodeLeader == null) {
+        LOGGER.warn(
+            "Redirection recommended for removeConfigNode but no leader 
endpoint provided, abort retry.");
+        break;
+      }
       try (SyncConfigNodeIServiceClient clientLeader =
           clientManager.borrowClient(configNodeLeader)) {
         status = clientLeader.removeConfigNode(configNodeLocation);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
index 5c069368e41..e1eeddbe970 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.client.request;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.utils.function.CheckedTriConsumer;
@@ -177,6 +178,9 @@ public abstract class AsyncRequestManager<RequestType, NodeLocation, Client> {
       int requestId,
       NodeLocation targetNode,
       int retryCount) {
+    final TEndPoint endPoint = nodeLocationToEndPoint(targetNode);
+    Client client = null;
+    boolean dispatched = false;
     try {
       if (!actionMap.containsKey(requestContext.getRequestType())) {
         throw new UnsupportedOperationException(
@@ -184,20 +188,28 @@ public abstract class AsyncRequestManager<RequestType, NodeLocation, Client> {
                 + requestContext.getRequestType()
                 + ", please set it in 
AsyncRequestManager::initActionMapBuilder()");
       }
-      Client client = 
clientManager.borrowClient(nodeLocationToEndPoint(targetNode));
+      client = clientManager.borrowClient(endPoint);
       adjustClientTimeoutIfNecessary(requestContext.getRequestType(), client);
       Object req = requestContext.getRequest(requestId);
       AsyncRequestRPCHandler<?, RequestType, NodeLocation> handler =
           buildHandler(requestContext, requestId, targetNode);
       Objects.requireNonNull(actionMap.get(requestContext.getRequestType()))
           .accept(req, client, handler);
+      // After accept() returns, the async callback (onComplete/onError) takes 
over the
+      // responsibility of returning the client to the pool. Before this 
point, if any exception
+      // is thrown, the client must be returned/invalidated here to prevent 
pool leakage.
+      dispatched = true;
     } catch (Exception e) {
       LOGGER.warn(
           "{} failed on Node {}, because {}, retrying {}...",
           requestContext.getRequestType(),
-          nodeLocationToEndPoint(targetNode),
+          endPoint,
           e.getMessage(),
           retryCount);
+    } finally {
+      if (!dispatched && client != null && clientManager instanceof 
ClientManager) {
+        ((ClientManager<TEndPoint, Client>) 
clientManager).returnClient(endPoint, client);
+      }
     }
   }
 

Reply via email to