This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 83c8231eea6 Pipe: Fix connection leak caused by clients not closed after task dropped (2 situations) (#15910) (#15929)
83c8231eea6 is described below

commit 83c8231eea6c8a96dfc9ff1bee92cd55d950a367
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Jul 15 18:19:03 2025 +0800

    Pipe: Fix connection leak caused by clients not closed after task dropped (2 situations) (#15910) (#15929)
    
    (cherry picked from commit bf8329b15179bead9bb50c731274fae3dad816e9)
---
 .../protocol/IoTDBConfigRegionConnector.java       |  8 ++++----
 .../subtask/connector/PipeConnectorSubtask.java    |  5 +++++
 .../client/IoTDBDataNodeAsyncClientManager.java    | 20 +++++++++++++++++++-
 .../async/IoTDBDataRegionAsyncConnector.java       |  4 +++-
 .../handler/PipeTransferTrackableHandler.java      |  2 ++
 .../async/handler/PipeTransferTsFileHandler.java   | 22 ++++++++++++++++++----
 .../apache/iotdb/commons/client/ClientManager.java | 22 +++++++++++++---------
 .../async/AsyncPipeDataTransferServiceClient.java  |  2 +-
 .../connector/protocol/IoTDBSslSyncConnector.java  | 14 +++++++++++---
 9 files changed, 76 insertions(+), 23 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 3761f696875..0a72d303fba 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -138,7 +138,7 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
 
   private void doTransfer(final PipeConfigRegionWritePlanEvent 
pipeConfigRegionWritePlanEvent)
       throws PipeException {
-    final Pair<IoTDBSyncClient, Boolean> clientAndStatus = 
clientManager.getClient();
+    final Pair<IoTDBSyncClient, Boolean> clientAndStatus = 
getClientManager().getClient();
 
     final TPipeTransferResp resp;
     try {
@@ -164,7 +164,7 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
     final TSStatus status = resp.getStatus();
     // Send handshake req and then re-transfer the event
     if (status.getCode() == 
TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
-      clientManager.sendHandshakeReq(clientAndStatus);
+      getClientManager().sendHandshakeReq(clientAndStatus);
     }
     // Only handle the failed statuses to avoid string format performance 
overhead
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
@@ -203,7 +203,7 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
     final long creationTime = snapshotEvent.getCreationTime();
     final File snapshotFile = snapshotEvent.getSnapshotFile();
     final File templateFile = snapshotEvent.getTemplateFile();
-    final Pair<IoTDBSyncClient, Boolean> clientAndStatus = 
clientManager.getClient();
+    final Pair<IoTDBSyncClient, Boolean> clientAndStatus = 
getClientManager().getClient();
 
     // 1. Transfer snapshotFile, and template File if exists
     transferFilePieces(
@@ -250,7 +250,7 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
     final TSStatus status = resp.getStatus();
     // Send handshake req and then re-transfer the event
     if (status.getCode() == 
TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
-      clientManager.sendHandshakeReq(clientAndStatus);
+      getClientManager().sendHandshakeReq(clientAndStatus);
     }
     // Only handle the failed statuses to avoid string format performance 
overhead
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
index 5d8d622a74a..2415ea6e36e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
@@ -164,6 +164,11 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
   }
 
   private void transferHeartbeatEvent(final PipeHeartbeatEvent event) {
+    // DO NOT call heartbeat or transfer after closed, or will cause 
connection leak
+    if (isClosed.get()) {
+      return;
+    }
+
     try {
       outputPipeConnector.heartbeat();
       outputPipeConnector.transfer(event);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 2fc41a61866..e3794191683 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -319,6 +319,18 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       client.resetMethodStateIfStopped();
       throw e;
     } finally {
+      if (isClosed) {
+        try {
+          client.close();
+          client.invalidateAll();
+        } catch (final Exception e) {
+          LOGGER.warn(
+              "Failed to close client {}:{} after handshake failure when the 
manager is closed.",
+              targetNodeUrl.getIp(),
+              targetNodeUrl.getPort(),
+              e);
+        }
+      }
       client.setShouldReturnSelf(true);
       client.returnSelf();
     }
@@ -372,8 +384,14 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
               if (clientManager != null) {
                 try {
                   clientManager.close();
+                  LOGGER.info(
+                      "Closed AsyncPipeDataTransferServiceClientManager for 
receiver attributes: {}",
+                      receiverAttributes);
                 } catch (final Exception e) {
-                  LOGGER.warn("Failed to close client manager.", e);
+                  LOGGER.warn(
+                      "Failed to close 
AsyncPipeDataTransferServiceClientManager for receiver attributes: {}",
+                      receiverAttributes,
+                      e);
                 }
               }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 0dacf0cb912..c204724e7ab 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -180,7 +180,9 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
 
   @Override
   public void heartbeat() throws Exception {
-    syncConnector.heartbeat();
+    if (!isClosed()) {
+      syncConnector.heartbeat();
+    }
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 4908adba8c6..e78cd9adbf3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -82,6 +82,8 @@ public abstract class PipeTransferTrackableHandler
     if (connector.isClosed()) {
       clearEventsReferenceCount();
       connector.eliminateHandler(this);
+      client.setShouldReturnSelf(true);
+      client.returnSelf();
       return false;
     }
     doTransfer(client, req);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 7353ea91e91..4d2bf68fe02 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -402,11 +402,25 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
   }
 
   private void returnClientIfNecessary() {
-    if (client != null) {
-      client.setShouldReturnSelf(true);
-      client.returnSelf();
-      client = null;
+    if (client == null) {
+      return;
+    }
+
+    if (connector.isClosed()) {
+      try {
+        client.close();
+        client.invalidateAll();
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Failed to close or invalidate client when connector is closed. 
Client: {}, Exception: {}",
+            client,
+            e.getMessage(),
+            e);
+      }
     }
+    client.setShouldReturnSelf(true);
+    client.returnSelf();
+    client = null;
   }
 
   @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 56bc67b6399..b4a16ea437b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -64,15 +64,19 @@ public class ClientManager<K, V> implements 
IClientManager<K, V> {
    * return of a client is automatic whenever a particular client is used.
    */
   public void returnClient(K node, V client) {
-    Optional.ofNullable(node)
-        .ifPresent(
-            x -> {
-              try {
-                pool.returnObject(node, client);
-              } catch (Exception e) {
-                LOGGER.warn("Return client {} for node {} to pool failed.", 
client, node, e);
-              }
-            });
+    if (node != null) {
+      try {
+        pool.returnObject(node, client);
+      } catch (Exception e) {
+        LOGGER.warn("Return client {} for node {} to pool failed.", client, 
node, e);
+      }
+    } else if (client instanceof ThriftClient) {
+      ((ThriftClient) client).invalidateAll();
+      LOGGER.warn(
+          "Return client {} to pool failed because the node is null. "
+              + "This may cause resource leak, please check your code.",
+          client);
+    }
   }
 
   @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 06d9ecbe12f..f8b2bfb08c4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -127,7 +127,7 @@ public class AsyncPipeDataTransferServiceClient extends 
IClientRPCService.AsyncC
     }
   }
 
-  private void close() {
+  public void close() {
     ___transport.close();
     ___currentMethod = null;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index 51e532d737d..69d73e070e2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -62,7 +62,14 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSslSyncConnector.class);
 
-  protected IoTDBSyncClientManager clientManager;
+  private volatile IoTDBSyncClientManager clientManager;
+
+  protected IoTDBSyncClientManager getClientManager() {
+    if (clientManager == null) {
+      throw new IllegalStateException("IoTDB sync client manager has been 
closed");
+    }
+    return clientManager;
+  }
 
   @Override
   public void validate(final PipeParameterValidator validator) throws 
Exception {
@@ -147,7 +154,7 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
 
   @Override
   public void handshake() throws Exception {
-    clientManager.checkClientStatusAndTryReconstructIfNecessary();
+    getClientManager().checkClientStatusAndTryReconstructIfNecessary();
   }
 
   @Override
@@ -222,7 +229,7 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
         // Send handshake req and then re-transfer the event
         if (status.getCode()
             == 
TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
-          clientManager.sendHandshakeReq(clientAndStatus);
+          getClientManager().sendHandshakeReq(clientAndStatus);
         }
         // Only handle the failed statuses to avoid string format performance 
overhead
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
@@ -246,6 +253,7 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
   public void close() {
     if (clientManager != null) {
       clientManager.close();
+      clientManager = null;
     }
 
     super.close();

Reply via email to