This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch worktree-rpc-thread-opt
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/worktree-rpc-thread-opt by 
this push:
     new 98c89bbde5f Use node-specific selectorNum in 
DataNodeInternalServiceRequestManager
98c89bbde5f is described below

commit 98c89bbde5f67c2acf692a3decc12327dceb4058
Author: JackieTien97 <[email protected]>
AuthorDate: Mon Apr 27 10:31:56 2026 +0800

    Use node-specific selectorNum in DataNodeInternalServiceRequestManager
    
    Add selectorNumOfAsyncClientManager field to 
DataNodeInternalServiceRequestManager
    so each subclass can supply the correct selector thread count:
    - CnToDnInternalServiceAsyncRequestManager reads from ConfigNodeConfig
    - DnToDnInternalServiceAsyncRequestManager reads from IoTDBConfig
    
    Also add selectorNumOfClientManager to ConfigNodeConfig and load
    cn_selector_thread_nums_of_client_manager in ConfigNodeDescriptor.
---
 .../async/CnToDnInternalServiceAsyncRequestManager.java    |  5 +++++
 .../org/apache/iotdb/confignode/conf/ConfigNodeConfig.java | 14 ++++++++++++++
 .../apache/iotdb/confignode/conf/ConfigNodeDescriptor.java |  6 ++++++
 .../dn/DnToDnInternalServiceAsyncRequestManager.java       |  5 +++++
 .../request/DataNodeInternalServiceRequestManager.java     | 10 +++++++++-
 5 files changed, 39 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
index cd69f8b2c84..8c7b389bd3d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
@@ -48,6 +48,7 @@ import 
org.apache.iotdb.confignode.client.async.handlers.rpc.TreeDeviceViewField
 import 
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.CheckSchemaRegionUsingTemplateRPCHandler;
 import 
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.ConsumerGroupPushMetaRPCHandler;
 import 
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.TopicPushMetaRPCHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TAlterEncodingCompressorReq;
 import org.apache.iotdb.mpp.rpc.thrift.TAlterTimeSeriesReq;
@@ -120,6 +121,10 @@ public class CnToDnInternalServiceAsyncRequestManager
   private static final Logger LOGGER =
       LoggerFactory.getLogger(CnToDnInternalServiceAsyncRequestManager.class);
 
+  private CnToDnInternalServiceAsyncRequestManager() {
+    
super(ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager());
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   protected void initActionMapBuilder() {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 3f92340d431..bae8b96293a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -145,6 +145,12 @@ public class ConfigNodeConfig {
    */
   private int maxClientNumForEachNode = 
DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
 
+  /**
+   * ClientManager will have so many selector threads (TAsyncClientManager) to 
distribute to its
+   * clients.
+   */
+  private int selectorNumOfClientManager = 1;
+
   /** System directory, including version file for each database and metadata. 
*/
   private String systemDir =
       IoTDBConstant.CN_DEFAULT_DATA_DIR + File.separator + 
IoTDBConstant.SYSTEM_FOLDER_NAME;
@@ -462,6 +468,14 @@ public class ConfigNodeConfig {
     return this;
   }
 
+  public int getSelectorNumOfClientManager() {
+    return selectorNumOfClientManager;
+  }
+
+  public void setSelectorNumOfClientManager(int selectorNumOfClientManager) {
+    this.selectorNumOfClientManager = selectorNumOfClientManager;
+  }
+
   public String getConsensusDir() {
     return consensusDir;
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index dd32415ebe0..551f24f8e1a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -271,6 +271,12 @@ public class ConfigNodeDescriptor {
                 "cn_max_client_count_for_each_node_in_client_manager",
                 String.valueOf(conf.getMaxClientNumForEachNode()))));
 
+    conf.setSelectorNumOfClientManager(
+        Integer.parseInt(
+            properties.getProperty(
+                "cn_selector_thread_nums_of_client_manager",
+                String.valueOf(conf.getSelectorNumOfClientManager()))));
+
     conf.setSystemDir(properties.getProperty("cn_system_dir", 
conf.getSystemDir()));
 
     conf.setConsensusDir(properties.getProperty("cn_consensus_dir", 
conf.getConsensusDir()));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java
index b57a10bcb43..29dfbed203f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.commons.client.request.AsyncRequestContext;
 import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
 import 
org.apache.iotdb.commons.client.request.DataNodeInternalServiceRequestManager;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.mpp.rpc.thrift.TAttributeUpdateReq;
 
 import org.slf4j.Logger;
@@ -33,6 +34,10 @@ public class DnToDnInternalServiceAsyncRequestManager
   private static final Logger LOGGER =
       LoggerFactory.getLogger(DnToDnInternalServiceAsyncRequestManager.class);
 
+  private DnToDnInternalServiceAsyncRequestManager() {
+    
super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager());
+  }
+
   @Override
   protected void initActionMapBuilder() {
     actionMapBuilder.put(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java
index fcb1b01857d..6e559adad82 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java
@@ -28,12 +28,20 @@ import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 public abstract class DataNodeInternalServiceRequestManager<RequestType>
     extends AsyncRequestManager<
         RequestType, TDataNodeLocation, AsyncDataNodeInternalServiceClient> {
+
+  private final int selectorNumOfAsyncClientManager;
+
+  protected DataNodeInternalServiceRequestManager(int 
selectorNumOfAsyncClientManager) {
+    this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager;
+  }
+
   @Override
   protected void initClientManager() {
     clientManager =
         new IClientManager.Factory<TEndPoint, 
AsyncDataNodeInternalServiceClient>()
             .createClientManager(
-                new 
ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
+                new 
ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory(
+                    selectorNumOfAsyncClientManager));
   }
 
   @Override

Reply via email to