This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch client_num in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 06c54029b219ac05ec38d06e213d4cd93db37dc9 Author: OneSizeFitQuorum <[email protected]> AuthorDate: Tue Apr 8 12:30:59 2025 +0800 finish Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../consensus/config/PipeConsensusConfig.java | 28 ---------------------- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 --------- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 12 ---------- .../db/consensus/DataRegionConsensusImpl.java | 4 ---- .../conf/iotdb-system.properties.template | 5 ---- .../service/AbstractThriftServiceThread.java | 20 ++++------------ .../iotdb/commons/service/ThriftServiceThread.java | 2 -- 7 files changed, 4 insertions(+), 78 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java index eb6ea361d24..f0366cf0087 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java @@ -81,8 +81,6 @@ public class PipeConsensusConfig { } public static class RPC { - private final int rpcSelectorThreadNum; - private final int rpcMinConcurrentClientNum; private final int rpcMaxConcurrentClientNum; private final int thriftServerAwaitTimeForStopService; private final boolean isRpcThriftCompressionEnabled; @@ -90,15 +88,11 @@ public class PipeConsensusConfig { private final int thriftMaxFrameSize; public RPC( - int rpcSelectorThreadNum, - int rpcMinConcurrentClientNum, int rpcMaxConcurrentClientNum, int thriftServerAwaitTimeForStopService, boolean isRpcThriftCompressionEnabled, int connectionTimeoutInMs, int thriftMaxFrameSize) { - this.rpcSelectorThreadNum = rpcSelectorThreadNum; - this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum; this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum; this.thriftServerAwaitTimeForStopService = thriftServerAwaitTimeForStopService; this.isRpcThriftCompressionEnabled = isRpcThriftCompressionEnabled; @@ -106,14 +100,6 @@ public class PipeConsensusConfig { this.thriftMaxFrameSize = thriftMaxFrameSize; } - public int getRpcSelectorThreadNum() { - return rpcSelectorThreadNum; - } - - public int getRpcMinConcurrentClientNum() { - return rpcMinConcurrentClientNum; - } - public int getRpcMaxConcurrentClientNum() { return rpcMaxConcurrentClientNum; } @@ -139,24 +125,12 @@ public class PipeConsensusConfig { } public static class Builder { - private int rpcSelectorThreadNum = 1; - private int rpcMinConcurrentClientNum = Runtime.getRuntime().availableProcessors(); private int rpcMaxConcurrentClientNum = 65535; private int thriftServerAwaitTimeForStopService = 60; private boolean isRpcThriftCompressionEnabled = false; private int connectionTimeoutInMs = (int) TimeUnit.SECONDS.toMillis(60); private int thriftMaxFrameSize = 536870912; - public RPC.Builder setRpcSelectorThreadNum(int rpcSelectorThreadNum) { - this.rpcSelectorThreadNum = rpcSelectorThreadNum; - return this; - } - - public RPC.Builder setRpcMinConcurrentClientNum(int rpcMinConcurrentClientNum) { - this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum; - return this; - } - public RPC.Builder setRpcMaxConcurrentClientNum(int rpcMaxConcurrentClientNum) { this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum; return this; @@ -185,8 +159,6 @@ public class PipeConsensusConfig { public RPC build() { return new RPC( - rpcSelectorThreadNum, - rpcMinConcurrentClientNum, rpcMaxConcurrentClientNum, thriftServerAwaitTimeForStopService, isRpcThriftCompressionEnabled, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 3e27d24b783..dfceb0c6021 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -145,9 +145,6 @@ public class IoTDBConfig { /** Rpc Selector thread num */ private int rpcSelectorThreadCount = 1; - /** Min concurrent client number */ - private int rpcMinConcurrentClientNum = Runtime.getRuntime().availableProcessors(); - /** Max concurrent client number */ private int rpcMaxConcurrentClientNum = 1000; @@ -1770,14 +1767,6 @@ public class IoTDBConfig { this.rpcSelectorThreadCount = rpcSelectorThreadCount; } - public int getRpcMinConcurrentClientNum() { - return rpcMinConcurrentClientNum; - } - - public void setRpcMinConcurrentClientNum(int rpcMinConcurrentClientNum) { - this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum; - } - public int getRpcMaxConcurrentClientNum() { return rpcMaxConcurrentClientNum; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index c2d6f90e5ec..b4b2dbb50c8 100755 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -728,18 +728,6 @@ public class IoTDBDescriptor { conf.setRpcSelectorThreadCount(rpcSelectorThreadNum); - int minConcurrentClientNum = - Integer.parseInt( - properties.getProperty( - "dn_rpc_min_concurrent_client_num", - Integer.toString(conf.getRpcMinConcurrentClientNum()))); - - if (minConcurrentClientNum <= 0) { - minConcurrentClientNum = Runtime.getRuntime().availableProcessors(); - } - - conf.setRpcMinConcurrentClientNum(minConcurrentClientNum); - int maxConcurrentClientNum = Integer.parseInt( properties.getProperty( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java index 38ce6a0f158..0f71a1cece8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java @@ -129,8 +129,6 @@ public class DataRegionConsensusImpl { .setRpc( RPC.newBuilder() .setConnectionTimeoutInMs(CONF.getConnectionTimeoutInMS()) - .setRpcSelectorThreadNum(CONF.getRpcSelectorThreadCount()) - .setRpcMinConcurrentClientNum(CONF.getRpcMinConcurrentClientNum()) .setRpcMaxConcurrentClientNum(CONF.getRpcMaxConcurrentClientNum()) .setRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnable()) .setSelectorNumOfClientManager(CONF.getSelectorNumOfClientManager()) @@ -157,8 +155,6 @@ public class DataRegionConsensusImpl { PipeConsensusConfig.RPC .newBuilder() .setConnectionTimeoutInMs(CONF.getConnectionTimeoutInMS()) - .setRpcSelectorThreadNum(CONF.getRpcSelectorThreadCount()) - .setRpcMinConcurrentClientNum(CONF.getRpcMinConcurrentClientNum()) .setRpcMaxConcurrentClientNum(CONF.getRpcMaxConcurrentClientNum()) .setIsRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnable()) .setThriftServerAwaitTimeForStopService( diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 2fa2c51e3d4..8d40a28d8af 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -502,11 +502,6 @@ dn_rpc_advanced_compression_enable=false # Datatype: int dn_rpc_selector_thread_count=1 -# The min number of concurrent clients that can be connected to the dataNode. -# effectiveMode: restart -# Datatype: int -dn_rpc_min_concurrent_client_num=1 - # The maximum number of concurrent clients that can be connected to the dataNode. # effectiveMode: restart # Datatype: int diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java index 6d0a871c9df..2afc66cfe7d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java @@ -113,7 +113,6 @@ public abstract class AbstractThriftServiceThread extends Thread { String bindAddress, int port, int selectorThreads, - int minWorkerThreads, int maxWorkerThreads, int timeoutSecond, TServerEventHandler serverEventHandler, @@ -134,7 +133,6 @@ public abstract class AbstractThriftServiceThread extends Thread { processor, threadsName, selectorThreads, - minWorkerThreads, maxWorkerThreads, timeoutSecond, maxReadBufferBytes); @@ -143,12 +141,7 @@ public abstract class AbstractThriftServiceThread extends Thread { case HSHA: THsHaServer.Args poolArgs1 = initAsyncedHshaPoolArgs( - processor, - threadsName, - minWorkerThreads, - maxWorkerThreads, - timeoutSecond, - maxReadBufferBytes); + processor, threadsName, maxWorkerThreads, timeoutSecond, maxReadBufferBytes); poolServer = new THsHaServer(poolArgs1); break; default: @@ -228,10 +221,7 @@ public abstract class AbstractThriftServiceThread extends Thread { private TThreadPoolServer.Args initSyncedPoolArgs( TProcessor processor, String threadsName, int maxWorkerThreads, int timeoutSecond) { TThreadPoolServer.Args poolArgs = new TThreadPoolServer.Args(serverTransport); - poolArgs - .maxWorkerThreads(maxWorkerThreads) - .minWorkerThreads(Runtime.getRuntime().availableProcessors()) - .stopTimeoutVal(timeoutSecond); + poolArgs.maxWorkerThreads(maxWorkerThreads).minWorkerThreads(0).stopTimeoutVal(timeoutSecond); executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs, threadsName); poolArgs.executorService = executorService; poolArgs.processor(processor); @@ -244,7 +234,6 @@ public abstract class AbstractThriftServiceThread extends Thread { TBaseAsyncProcessor<?> processor, String threadsName, int selectorThreads, - int minWorkerThreads, int maxWorkerThreads, int timeoutSecond, int maxReadBufferBytes) { @@ -254,7 +243,7 @@ public abstract class AbstractThriftServiceThread extends Thread { poolArgs.selectorThreads(selectorThreads); executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool( - minWorkerThreads, maxWorkerThreads, timeoutSecond, TimeUnit.SECONDS, threadsName); + 0, maxWorkerThreads, timeoutSecond, TimeUnit.SECONDS, threadsName); poolArgs.executorService(executorService); poolArgs.processor(processor); poolArgs.protocolFactory(protocolFactory); @@ -265,7 +254,6 @@ public abstract class AbstractThriftServiceThread extends Thread { private THsHaServer.Args initAsyncedHshaPoolArgs( TBaseAsyncProcessor<?> processor, String threadsName, - int minWorkerThreads, int maxWorkerThreads, int timeoutSecond, int maxReadBufferBytes) { @@ -273,7 +261,7 @@ public abstract class AbstractThriftServiceThread extends Thread { poolArgs.maxReadBufferBytes = maxReadBufferBytes; executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool( - minWorkerThreads, maxWorkerThreads, timeoutSecond, TimeUnit.SECONDS, threadsName); + 0, maxWorkerThreads, timeoutSecond, TimeUnit.SECONDS, threadsName); poolArgs.executorService(executorService); poolArgs.processor(processor); poolArgs.protocolFactory(protocolFactory); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java index 77f25ff6ab0..6a3f5ef42e7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java @@ -35,7 +35,6 @@ public class ThriftServiceThread extends AbstractThriftServiceThread { String bindAddress, int port, int selectorThreads, - int minWorkerThreads, int maxWorkerThreads, int timeoutSecond, TServerEventHandler serverEventHandler, @@ -51,7 +50,6 @@ public class ThriftServiceThread extends AbstractThriftServiceThread { bindAddress, port, selectorThreads, - minWorkerThreads, maxWorkerThreads, timeoutSecond, serverEventHandler,
