This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch worktree-rpc-thread-opt
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/worktree-rpc-thread-opt by
this push:
new 98c89bbde5f Use node-specific selectorNum in
DataNodeInternalServiceRequestManager
98c89bbde5f is described below
commit 98c89bbde5f67c2acf692a3decc12327dceb4058
Author: JackieTien97 <[email protected]>
AuthorDate: Mon Apr 27 10:31:56 2026 +0800
Use node-specific selectorNum in DataNodeInternalServiceRequestManager
Add selectorNumOfAsyncClientManager field to
DataNodeInternalServiceRequestManager
so each subclass can supply the correct selector thread count:
- CnToDnInternalServiceAsyncRequestManager reads from ConfigNodeConfig
- DnToDnInternalServiceAsyncRequestManager reads from IoTDBConfig
Also add selectorNumOfClientManager to ConfigNodeConfig and load
cn_selector_thread_nums_of_client_manager in ConfigNodeDescriptor.
---
.../async/CnToDnInternalServiceAsyncRequestManager.java | 5 +++++
.../org/apache/iotdb/confignode/conf/ConfigNodeConfig.java | 14 ++++++++++++++
.../apache/iotdb/confignode/conf/ConfigNodeDescriptor.java | 6 ++++++
.../dn/DnToDnInternalServiceAsyncRequestManager.java | 5 +++++
.../request/DataNodeInternalServiceRequestManager.java | 10 +++++++++-
5 files changed, 39 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
index cd69f8b2c84..8c7b389bd3d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
@@ -48,6 +48,7 @@ import
org.apache.iotdb.confignode.client.async.handlers.rpc.TreeDeviceViewField
import
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.CheckSchemaRegionUsingTemplateRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.ConsumerGroupPushMetaRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.TopicPushMetaRPCHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TAlterEncodingCompressorReq;
import org.apache.iotdb.mpp.rpc.thrift.TAlterTimeSeriesReq;
@@ -120,6 +121,10 @@ public class CnToDnInternalServiceAsyncRequestManager
private static final Logger LOGGER =
LoggerFactory.getLogger(CnToDnInternalServiceAsyncRequestManager.class);
+ private CnToDnInternalServiceAsyncRequestManager() {
+
super(ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager());
+ }
+
@SuppressWarnings("unchecked")
@Override
protected void initActionMapBuilder() {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 3f92340d431..bae8b96293a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -145,6 +145,12 @@ public class ConfigNodeConfig {
*/
private int maxClientNumForEachNode =
DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
+ /**
+ * ClientManager will have so many selector threads (TAsyncClientManager) to
distribute to its
+ * clients.
+ */
+ private int selectorNumOfClientManager = 1;
+
/** System directory, including version file for each database and metadata.
*/
private String systemDir =
IoTDBConstant.CN_DEFAULT_DATA_DIR + File.separator +
IoTDBConstant.SYSTEM_FOLDER_NAME;
@@ -462,6 +468,14 @@ public class ConfigNodeConfig {
return this;
}
+ public int getSelectorNumOfClientManager() {
+ return selectorNumOfClientManager;
+ }
+
+ public void setSelectorNumOfClientManager(int selectorNumOfClientManager) {
+ this.selectorNumOfClientManager = selectorNumOfClientManager;
+ }
+
public String getConsensusDir() {
return consensusDir;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index dd32415ebe0..551f24f8e1a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -271,6 +271,12 @@ public class ConfigNodeDescriptor {
"cn_max_client_count_for_each_node_in_client_manager",
String.valueOf(conf.getMaxClientNumForEachNode()))));
+ conf.setSelectorNumOfClientManager(
+ Integer.parseInt(
+ properties.getProperty(
+ "cn_selector_thread_nums_of_client_manager",
+ String.valueOf(conf.getSelectorNumOfClientManager()))));
+
conf.setSystemDir(properties.getProperty("cn_system_dir",
conf.getSystemDir()));
conf.setConsensusDir(properties.getProperty("cn_consensus_dir",
conf.getConsensusDir()));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java
index b57a10bcb43..29dfbed203f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
import
org.apache.iotdb.commons.client.request.DataNodeInternalServiceRequestManager;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.mpp.rpc.thrift.TAttributeUpdateReq;
import org.slf4j.Logger;
@@ -33,6 +34,10 @@ public class DnToDnInternalServiceAsyncRequestManager
private static final Logger LOGGER =
LoggerFactory.getLogger(DnToDnInternalServiceAsyncRequestManager.class);
+ private DnToDnInternalServiceAsyncRequestManager() {
+
super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager());
+ }
+
@Override
protected void initActionMapBuilder() {
actionMapBuilder.put(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java
index fcb1b01857d..6e559adad82 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java
@@ -28,12 +28,20 @@ import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
public abstract class DataNodeInternalServiceRequestManager<RequestType>
extends AsyncRequestManager<
RequestType, TDataNodeLocation, AsyncDataNodeInternalServiceClient> {
+
+ private final int selectorNumOfAsyncClientManager;
+
+ protected DataNodeInternalServiceRequestManager(int
selectorNumOfAsyncClientManager) {
+ this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager;
+ }
+
@Override
protected void initClientManager() {
clientManager =
new IClientManager.Factory<TEndPoint,
AsyncDataNodeInternalServiceClient>()
.createClientManager(
- new
ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
+ new
ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory(
+ selectorNumOfAsyncClientManager));
}
@Override