This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bb8a5f74356 Pipe: Fix the potential resource leak issue of async 
client when async manager is closed (#14929)
bb8a5f74356 is described below

commit bb8a5f743562addab8d8ba9dc2639ca91e142b65
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Feb 25 18:46:59 2025 +0800

    Pipe: Fix the potential resource leak issue of async client when async 
manager is closed (#14929)
---
 .../connector/client/IoTDBDataNodeAsyncClientManager.java  |  4 ++++
 .../thrift/async/IoTDBDataRegionAsyncConnector.java        |  1 +
 .../client/async/AsyncPipeDataTransferServiceClient.java   | 14 ++++++++++++++
 3 files changed, 19 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 39a03169c30..79c56fa6890 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
+import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -287,6 +288,9 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       if (exception.get() != null) {
         throw new PipeConnectionException("Failed to handshake.", 
exception.get());
       }
+    } catch (TException e) {
+      client.resetMethodStateIfStopped();
+      throw e;
     } finally {
       client.setShouldReturnSelf(true);
       client.returnSelf();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index c0682ae8f40..8ef6d3d9d6e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -414,6 +414,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     if (client == null) {
       LOGGER.warn(THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT);
     } else {
+      client.resetMethodStateIfStopped();
       LOGGER.warn(
           String.format(THRIFT_ERROR_FORMATTER_WITH_ENDPOINT, client.getIp(), 
client.getPort()), e);
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index f64c3f8d95d..06d9ecbe12f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -157,6 +157,20 @@ public class AsyncPipeDataTransferServiceClient extends 
IClientRPCService.AsyncC
     LOGGER.info("Handshake finished for client {}", this);
   }
 
+  // To ensure that the socket will be closed eventually, we need to manually close the socket here,
+  // because the Client object may have thrown an exception before entering the asynchronous thread,
+  // and the returnSelf method may not be called, resulting in resource leakage.
+  public void resetMethodStateIfStopped() {
+    if (!___manager.isRunning()) {
+      if (___transport != null && ___transport.isOpen()) {
+        ___transport.close();
+        LOGGER.warn("Manually closing transport to prevent resource leakage.");
+      }
+      ___currentMethod = null;
+      LOGGER.info("Method state has been reset due to manager not running.");
+    }
+  }
+
   public String getIp() {
     return endpoint.getIp();
   }

Reply via email to