This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 83c8231eea6 Pipe: Fix connection leak caused by clients not closed
after task dropped (2 situations) (#15910) (#15929)
83c8231eea6 is described below
commit 83c8231eea6c8a96dfc9ff1bee92cd55d950a367
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Jul 15 18:19:03 2025 +0800
Pipe: Fix connection leak caused by clients not closed after task dropped
(2 situations) (#15910) (#15929)
(cherry picked from commit bf8329b15179bead9bb50c731274fae3dad816e9)
---
.../protocol/IoTDBConfigRegionConnector.java | 8 ++++----
.../subtask/connector/PipeConnectorSubtask.java | 5 +++++
.../client/IoTDBDataNodeAsyncClientManager.java | 20 +++++++++++++++++++-
.../async/IoTDBDataRegionAsyncConnector.java | 4 +++-
.../handler/PipeTransferTrackableHandler.java | 2 ++
.../async/handler/PipeTransferTsFileHandler.java | 22 ++++++++++++++++++----
.../apache/iotdb/commons/client/ClientManager.java | 22 +++++++++++++---------
.../async/AsyncPipeDataTransferServiceClient.java | 2 +-
.../connector/protocol/IoTDBSslSyncConnector.java | 14 +++++++++++---
9 files changed, 76 insertions(+), 23 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 3761f696875..0a72d303fba 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -138,7 +138,7 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
private void doTransfer(final PipeConfigRegionWritePlanEvent
pipeConfigRegionWritePlanEvent)
throws PipeException {
- final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
clientManager.getClient();
+ final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
getClientManager().getClient();
final TPipeTransferResp resp;
try {
@@ -164,7 +164,7 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
final TSStatus status = resp.getStatus();
// Send handshake req and then re-transfer the event
if (status.getCode() ==
TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
- clientManager.sendHandshakeReq(clientAndStatus);
+ getClientManager().sendHandshakeReq(clientAndStatus);
}
// Only handle the failed statuses to avoid string format performance overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
@@ -203,7 +203,7 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
final long creationTime = snapshotEvent.getCreationTime();
final File snapshotFile = snapshotEvent.getSnapshotFile();
final File templateFile = snapshotEvent.getTemplateFile();
- final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
clientManager.getClient();
+ final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
getClientManager().getClient();
// 1. Transfer snapshotFile, and template File if exists
transferFilePieces(
@@ -250,7 +250,7 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
final TSStatus status = resp.getStatus();
// Send handshake req and then re-transfer the event
if (status.getCode() ==
TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
- clientManager.sendHandshakeReq(clientAndStatus);
+ getClientManager().sendHandshakeReq(clientAndStatus);
}
// Only handle the failed statuses to avoid string format performance overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
index 5d8d622a74a..2415ea6e36e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
@@ -164,6 +164,11 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
}
private void transferHeartbeatEvent(final PipeHeartbeatEvent event) {
+ // DO NOT call heartbeat or transfer after closed, or will cause connection leak
+ if (isClosed.get()) {
+ return;
+ }
+
try {
outputPipeConnector.heartbeat();
outputPipeConnector.transfer(event);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 2fc41a61866..e3794191683 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -319,6 +319,18 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
client.resetMethodStateIfStopped();
throw e;
} finally {
+ if (isClosed) {
+ try {
+ client.close();
+ client.invalidateAll();
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to close client {}:{} after handshake failure when the manager is closed.",
+ targetNodeUrl.getIp(),
+ targetNodeUrl.getPort(),
+ e);
+ }
+ }
client.setShouldReturnSelf(true);
client.returnSelf();
}
@@ -372,8 +384,14 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
if (clientManager != null) {
try {
clientManager.close();
+ LOGGER.info(
+ "Closed AsyncPipeDataTransferServiceClientManager for receiver attributes: {}",
+ receiverAttributes);
} catch (final Exception e) {
- LOGGER.warn("Failed to close client manager.", e);
+ LOGGER.warn(
+ "Failed to close AsyncPipeDataTransferServiceClientManager for receiver attributes: {}",
+ receiverAttributes,
+ e);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 0dacf0cb912..c204724e7ab 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -180,7 +180,9 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
@Override
public void heartbeat() throws Exception {
- syncConnector.heartbeat();
+ if (!isClosed()) {
+ syncConnector.heartbeat();
+ }
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 4908adba8c6..e78cd9adbf3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -82,6 +82,8 @@ public abstract class PipeTransferTrackableHandler
if (connector.isClosed()) {
clearEventsReferenceCount();
connector.eliminateHandler(this);
+ client.setShouldReturnSelf(true);
+ client.returnSelf();
return false;
}
doTransfer(client, req);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 7353ea91e91..4d2bf68fe02 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -402,11 +402,25 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
}
private void returnClientIfNecessary() {
- if (client != null) {
- client.setShouldReturnSelf(true);
- client.returnSelf();
- client = null;
+ if (client == null) {
+ return;
+ }
+
+ if (connector.isClosed()) {
+ try {
+ client.close();
+ client.invalidateAll();
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to close or invalidate client when connector is closed. Client: {}, Exception: {}",
+ client,
+ e.getMessage(),
+ e);
+ }
}
+ client.setShouldReturnSelf(true);
+ client.returnSelf();
+ client = null;
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 56bc67b6399..b4a16ea437b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -64,15 +64,19 @@ public class ClientManager<K, V> implements
IClientManager<K, V> {
* return of a client is automatic whenever a particular client is used.
*/
public void returnClient(K node, V client) {
- Optional.ofNullable(node)
- .ifPresent(
- x -> {
- try {
- pool.returnObject(node, client);
- } catch (Exception e) {
- LOGGER.warn("Return client {} for node {} to pool failed.",
client, node, e);
- }
- });
+ if (node != null) {
+ try {
+ pool.returnObject(node, client);
+ } catch (Exception e) {
+ LOGGER.warn("Return client {} for node {} to pool failed.", client,
node, e);
+ }
+ } else if (client instanceof ThriftClient) {
+ ((ThriftClient) client).invalidateAll();
+ LOGGER.warn(
+ "Return client {} to pool failed because the node is null. "
+ + "This may cause resource leak, please check your code.",
+ client);
+ }
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 06d9ecbe12f..f8b2bfb08c4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -127,7 +127,7 @@ public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncC
}
}
- private void close() {
+ public void close() {
___transport.close();
___currentMethod = null;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index 51e532d737d..69d73e070e2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -62,7 +62,14 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSslSyncConnector.class);
- protected IoTDBSyncClientManager clientManager;
+ private volatile IoTDBSyncClientManager clientManager;
+
+ protected IoTDBSyncClientManager getClientManager() {
+ if (clientManager == null) {
+ throw new IllegalStateException("IoTDB sync client manager has been closed");
+ }
+ return clientManager;
+ }
@Override
public void validate(final PipeParameterValidator validator) throws
Exception {
@@ -147,7 +154,7 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
@Override
public void handshake() throws Exception {
- clientManager.checkClientStatusAndTryReconstructIfNecessary();
+ getClientManager().checkClientStatusAndTryReconstructIfNecessary();
}
@Override
@@ -222,7 +229,7 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
// Send handshake req and then re-transfer the event
if (status.getCode()
==
TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
- clientManager.sendHandshakeReq(clientAndStatus);
+ getClientManager().sendHandshakeReq(clientAndStatus);
}
// Only handle the failed statuses to avoid string format performance overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
@@ -246,6 +253,7 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
public void close() {
if (clientManager != null) {
clientManager.close();
+ clientManager = null;
}
super.close();