This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bb8a5f74356 Pipe: Fix the protential resource leak issue of async
client when async manager is closed (#14929)
bb8a5f74356 is described below
commit bb8a5f743562addab8d8ba9dc2639ca91e142b65
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Feb 25 18:46:59 2025 +0800
Pipe: Fix the protential resource leak issue of async client when async
manager is closed (#14929)
---
.../connector/client/IoTDBDataNodeAsyncClientManager.java | 4 ++++
.../thrift/async/IoTDBDataRegionAsyncConnector.java | 1 +
.../client/async/AsyncPipeDataTransferServiceClient.java | 14 ++++++++++++++
3 files changed, 19 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 39a03169c30..79c56fa6890 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -287,6 +288,9 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
if (exception.get() != null) {
throw new PipeConnectionException("Failed to handshake.",
exception.get());
}
+ } catch (TException e) {
+ client.resetMethodStateIfStopped();
+ throw e;
} finally {
client.setShouldReturnSelf(true);
client.returnSelf();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index c0682ae8f40..8ef6d3d9d6e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -414,6 +414,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
if (client == null) {
LOGGER.warn(THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT);
} else {
+ client.resetMethodStateIfStopped();
LOGGER.warn(
String.format(THRIFT_ERROR_FORMATTER_WITH_ENDPOINT, client.getIp(),
client.getPort()), e);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index f64c3f8d95d..06d9ecbe12f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -157,6 +157,20 @@ public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncC
LOGGER.info("Handshake finished for client {}", this);
}
+ // To ensure that the socket will be closed eventually, we need to manually
close the socket here,
+ // because the Client object may have thrown an exception before entering
the asynchronous thread,
+ // and the returnSelf method may not be called, resulting in resource
leakage.
+ public void resetMethodStateIfStopped() {
+ if (!___manager.isRunning()) {
+ if (___transport != null && ___transport.isOpen()) {
+ ___transport.close();
+ LOGGER.warn("Manually closing transport to prevent resource leakage.");
+ }
+ ___currentMethod = null;
+ LOGGER.info("Method state has been reset due to manager not running.");
+ }
+ }
+
public String getIp() {
return endpoint.getIp();
}