This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 27b3dcbb4d6 IoTV2: Use global singleton ClientManager to avoid too
many thrift threads which leads to Direct Memory OOM. (#15316)
27b3dcbb4d6 is described below
commit 27b3dcbb4d6752a61907de01299e7a91e12e38f4
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Apr 11 21:08:50 2025 +0800
IoTV2: Use global singleton ClientManager to avoid too many thrift threads
which leads to Direct Memory OOM. (#15316)
* use one singleton global client manager for IoTV2
* refine selector thread
---
.../apache/iotdb/consensus/pipe/PipeConsensus.java | 6 ++++--
.../pipeconsensus/PipeConsensusAsyncConnector.java | 2 +-
.../pipeconsensus/PipeConsensusSyncConnector.java | 2 +-
.../container/PipeConsensusClientMgrContainer.java | 20 +++++++++++++-------
4 files changed, 19 insertions(+), 11 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index 2046c38d61d..045b6db26ec 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -120,8 +120,10 @@ public class PipeConsensus implements IConsensus {
config.getPipeConsensusConfig().getReplicateMode());
this.consensusPipeGuardian =
config.getPipeConsensusConfig().getPipe().getConsensusPipeGuardian();
- this.asyncClientManager =
PipeConsensusClientMgrContainer.getInstance().newAsyncClientManager();
- this.syncClientManager =
PipeConsensusClientMgrContainer.getInstance().newSyncClientManager();
+ this.asyncClientManager =
+
PipeConsensusClientMgrContainer.getInstance().getGlobalAsyncClientManager();
+ this.syncClientManager =
+
PipeConsensusClientMgrContainer.getInstance().getGlobalSyncClientManager();
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 224f6da1b62..49a9241c1e3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -147,7 +147,7 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
nodeUrls, consensusGroupId, thisDataNodeId,
pipeConsensusConnectorMetrics);
retryConnector.customize(parameters, configuration);
asyncTransferClientManager =
- PipeConsensusClientMgrContainer.getInstance().newAsyncClientManager();
+
PipeConsensusClientMgrContainer.getInstance().getGlobalAsyncClientManager();
if (isTabletBatchModeEnabled) {
tabletBatchBuilder =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
index 5365666a7a1..7be263d803b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
@@ -97,7 +97,7 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
this.consensusGroupId = consensusGroupId;
this.thisDataNodeId = thisDataNodeId;
this.syncRetryClientManager =
- PipeConsensusClientMgrContainer.getInstance().newSyncClientManager();
+
PipeConsensusClientMgrContainer.getInstance().getGlobalSyncClientManager();
this.pipeConsensusConnectorMetrics = pipeConsensusConnectorMetrics;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
index 383b995104b..6ba0136280d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
@@ -40,6 +40,8 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
public class PipeConsensusClientMgrContainer {
private static final CommonConfig CONF =
CommonDescriptor.getInstance().getConfig();
private final PipeConsensusClientProperty config;
+ private final IClientManager<TEndPoint, AsyncPipeConsensusServiceClient>
asyncClientManager;
+ private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient>
syncClientManager;
private PipeConsensusClientMgrContainer() {
// load rpc client config
@@ -47,18 +49,22 @@ public class PipeConsensusClientMgrContainer {
PipeConsensusClientProperty.newBuilder()
.setIsRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnabled())
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
-
.setSelectorNumOfClientManager(CONF.getSelectorNumOfClientManager())
+ .setSelectorNumOfClientManager(Math.max(3,
CONF.getSelectorNumOfClientManager()))
.build();
+ this.asyncClientManager =
+ new IClientManager.Factory<TEndPoint,
AsyncPipeConsensusServiceClient>()
+ .createClientManager(new
AsyncPipeConsensusServiceClientPoolFactory(config));
+ this.syncClientManager =
+ new IClientManager.Factory<TEndPoint, SyncPipeConsensusServiceClient>()
+ .createClientManager(new
SyncPipeConsensusServiceClientPoolFactory(config));
}
- public IClientManager<TEndPoint, AsyncPipeConsensusServiceClient>
newAsyncClientManager() {
- return new IClientManager.Factory<TEndPoint,
AsyncPipeConsensusServiceClient>()
- .createClientManager(new
AsyncPipeConsensusServiceClientPoolFactory(config));
+ public IClientManager<TEndPoint, AsyncPipeConsensusServiceClient>
getGlobalAsyncClientManager() {
+ return this.asyncClientManager;
}
- public IClientManager<TEndPoint, SyncPipeConsensusServiceClient>
newSyncClientManager() {
- return new IClientManager.Factory<TEndPoint,
SyncPipeConsensusServiceClient>()
- .createClientManager(new
SyncPipeConsensusServiceClientPoolFactory(config));
+ public IClientManager<TEndPoint, SyncPipeConsensusServiceClient>
getGlobalSyncClientManager() {
+ return this.syncClientManager;
}
private static class PipeConsensusClientMgrContainerHolder {