This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 27b3dcbb4d6 IoTV2: Use global singleton ClientManager to avoid too many thrift threads which leads to Direct Memory OOM. (#15316)
27b3dcbb4d6 is described below

commit 27b3dcbb4d6752a61907de01299e7a91e12e38f4
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Apr 11 21:08:50 2025 +0800

    IoTV2: Use global singleton ClientManager to avoid too many thrift threads which leads to Direct Memory OOM. (#15316)
    
    * use one singleton global client manager for IoTV2
    
    * refine selector thread
---
 .../apache/iotdb/consensus/pipe/PipeConsensus.java   |  6 ++++--
 .../pipeconsensus/PipeConsensusAsyncConnector.java   |  2 +-
 .../pipeconsensus/PipeConsensusSyncConnector.java    |  2 +-
 .../container/PipeConsensusClientMgrContainer.java   | 20 +++++++++++++-------
 4 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index 2046c38d61d..045b6db26ec 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -120,8 +120,10 @@ public class PipeConsensus implements IConsensus {
             config.getPipeConsensusConfig().getReplicateMode());
     this.consensusPipeGuardian =
         config.getPipeConsensusConfig().getPipe().getConsensusPipeGuardian();
-    this.asyncClientManager = 
PipeConsensusClientMgrContainer.getInstance().newAsyncClientManager();
-    this.syncClientManager = 
PipeConsensusClientMgrContainer.getInstance().newSyncClientManager();
+    this.asyncClientManager =
+        
PipeConsensusClientMgrContainer.getInstance().getGlobalAsyncClientManager();
+    this.syncClientManager =
+        
PipeConsensusClientMgrContainer.getInstance().getGlobalSyncClientManager();
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 224f6da1b62..49a9241c1e3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -147,7 +147,7 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse
             nodeUrls, consensusGroupId, thisDataNodeId, 
pipeConsensusConnectorMetrics);
     retryConnector.customize(parameters, configuration);
     asyncTransferClientManager =
-        PipeConsensusClientMgrContainer.getInstance().newAsyncClientManager();
+        
PipeConsensusClientMgrContainer.getInstance().getGlobalAsyncClientManager();
 
     if (isTabletBatchModeEnabled) {
       tabletBatchBuilder =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
index 5365666a7a1..7be263d803b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
@@ -97,7 +97,7 @@ public class PipeConsensusSyncConnector extends IoTDBConnector {
     this.consensusGroupId = consensusGroupId;
     this.thisDataNodeId = thisDataNodeId;
     this.syncRetryClientManager =
-        PipeConsensusClientMgrContainer.getInstance().newSyncClientManager();
+        
PipeConsensusClientMgrContainer.getInstance().getGlobalSyncClientManager();
     this.pipeConsensusConnectorMetrics = pipeConsensusConnectorMetrics;
   }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
index 383b995104b..6ba0136280d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
@@ -40,6 +40,8 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 public class PipeConsensusClientMgrContainer {
   private static final CommonConfig CONF = 
CommonDescriptor.getInstance().getConfig();
   private final PipeConsensusClientProperty config;
+  private final IClientManager<TEndPoint, AsyncPipeConsensusServiceClient> 
asyncClientManager;
+  private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient> 
syncClientManager;
 
   private PipeConsensusClientMgrContainer() {
     // load rpc client config
@@ -47,18 +49,22 @@ public class PipeConsensusClientMgrContainer {
         PipeConsensusClientProperty.newBuilder()
             
.setIsRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnabled())
             .setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
-            
.setSelectorNumOfClientManager(CONF.getSelectorNumOfClientManager())
+            .setSelectorNumOfClientManager(Math.max(3, 
CONF.getSelectorNumOfClientManager()))
             .build();
+    this.asyncClientManager =
+        new IClientManager.Factory<TEndPoint, 
AsyncPipeConsensusServiceClient>()
+            .createClientManager(new 
AsyncPipeConsensusServiceClientPoolFactory(config));
+    this.syncClientManager =
+        new IClientManager.Factory<TEndPoint, SyncPipeConsensusServiceClient>()
+            .createClientManager(new 
SyncPipeConsensusServiceClientPoolFactory(config));
   }
 
-  public IClientManager<TEndPoint, AsyncPipeConsensusServiceClient> 
newAsyncClientManager() {
-    return new IClientManager.Factory<TEndPoint, 
AsyncPipeConsensusServiceClient>()
-        .createClientManager(new 
AsyncPipeConsensusServiceClientPoolFactory(config));
+  public IClientManager<TEndPoint, AsyncPipeConsensusServiceClient> 
getGlobalAsyncClientManager() {
+    return this.asyncClientManager;
   }
 
-  public IClientManager<TEndPoint, SyncPipeConsensusServiceClient> 
newSyncClientManager() {
-    return new IClientManager.Factory<TEndPoint, 
SyncPipeConsensusServiceClient>()
-        .createClientManager(new 
SyncPipeConsensusServiceClientPoolFactory(config));
+  public IClientManager<TEndPoint, SyncPipeConsensusServiceClient> 
getGlobalSyncClientManager() {
+    return this.syncClientManager;
   }
 
   private static class PipeConsensusClientMgrContainerHolder {

Reply via email to