This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b89318f59db Remove dn_rpc_min_concurrent_client_num to fix minWorker
> maxWorker bugs #15296
b89318f59db is described below
commit b89318f59db8a38d47a2f046867f6e4ae7075701
Author: Potato <[email protected]>
AuthorDate: Tue Apr 8 14:31:34 2025 +0800
Remove dn_rpc_min_concurrent_client_num to fix minWorker > maxWorker bugs
#15296
Signed-off-by: OneSizeFitQuorum <[email protected]>
---
.../consensus/config/PipeConsensusConfig.java | 28 ----------------------
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ---------
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 12 ----------
.../db/consensus/DataRegionConsensusImpl.java | 4 ----
.../conf/iotdb-system.properties.template | 5 ----
.../service/AbstractThriftServiceThread.java | 20 ++++------------
.../iotdb/commons/service/ThriftServiceThread.java | 2 --
7 files changed, 4 insertions(+), 78 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
index eb6ea361d24..f0366cf0087 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
@@ -81,8 +81,6 @@ public class PipeConsensusConfig {
}
public static class RPC {
- private final int rpcSelectorThreadNum;
- private final int rpcMinConcurrentClientNum;
private final int rpcMaxConcurrentClientNum;
private final int thriftServerAwaitTimeForStopService;
private final boolean isRpcThriftCompressionEnabled;
@@ -90,15 +88,11 @@ public class PipeConsensusConfig {
private final int thriftMaxFrameSize;
public RPC(
- int rpcSelectorThreadNum,
- int rpcMinConcurrentClientNum,
int rpcMaxConcurrentClientNum,
int thriftServerAwaitTimeForStopService,
boolean isRpcThriftCompressionEnabled,
int connectionTimeoutInMs,
int thriftMaxFrameSize) {
- this.rpcSelectorThreadNum = rpcSelectorThreadNum;
- this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
this.thriftServerAwaitTimeForStopService =
thriftServerAwaitTimeForStopService;
this.isRpcThriftCompressionEnabled = isRpcThriftCompressionEnabled;
@@ -106,14 +100,6 @@ public class PipeConsensusConfig {
this.thriftMaxFrameSize = thriftMaxFrameSize;
}
- public int getRpcSelectorThreadNum() {
- return rpcSelectorThreadNum;
- }
-
- public int getRpcMinConcurrentClientNum() {
- return rpcMinConcurrentClientNum;
- }
-
public int getRpcMaxConcurrentClientNum() {
return rpcMaxConcurrentClientNum;
}
@@ -139,24 +125,12 @@ public class PipeConsensusConfig {
}
public static class Builder {
- private int rpcSelectorThreadNum = 1;
- private int rpcMinConcurrentClientNum =
Runtime.getRuntime().availableProcessors();
private int rpcMaxConcurrentClientNum = 65535;
private int thriftServerAwaitTimeForStopService = 60;
private boolean isRpcThriftCompressionEnabled = false;
private int connectionTimeoutInMs = (int) TimeUnit.SECONDS.toMillis(60);
private int thriftMaxFrameSize = 536870912;
- public RPC.Builder setRpcSelectorThreadNum(int rpcSelectorThreadNum) {
- this.rpcSelectorThreadNum = rpcSelectorThreadNum;
- return this;
- }
-
- public RPC.Builder setRpcMinConcurrentClientNum(int
rpcMinConcurrentClientNum) {
- this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
- return this;
- }
-
public RPC.Builder setRpcMaxConcurrentClientNum(int
rpcMaxConcurrentClientNum) {
this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
return this;
@@ -185,8 +159,6 @@ public class PipeConsensusConfig {
public RPC build() {
return new RPC(
- rpcSelectorThreadNum,
- rpcMinConcurrentClientNum,
rpcMaxConcurrentClientNum,
thriftServerAwaitTimeForStopService,
isRpcThriftCompressionEnabled,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 3e27d24b783..dfceb0c6021 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -145,9 +145,6 @@ public class IoTDBConfig {
/** Rpc Selector thread num */
private int rpcSelectorThreadCount = 1;
- /** Min concurrent client number */
- private int rpcMinConcurrentClientNum =
Runtime.getRuntime().availableProcessors();
-
/** Max concurrent client number */
private int rpcMaxConcurrentClientNum = 1000;
@@ -1770,14 +1767,6 @@ public class IoTDBConfig {
this.rpcSelectorThreadCount = rpcSelectorThreadCount;
}
- public int getRpcMinConcurrentClientNum() {
- return rpcMinConcurrentClientNum;
- }
-
- public void setRpcMinConcurrentClientNum(int rpcMinConcurrentClientNum) {
- this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
- }
-
public int getRpcMaxConcurrentClientNum() {
return rpcMaxConcurrentClientNum;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index c2d6f90e5ec..b4b2dbb50c8 100755
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -728,18 +728,6 @@ public class IoTDBDescriptor {
conf.setRpcSelectorThreadCount(rpcSelectorThreadNum);
- int minConcurrentClientNum =
- Integer.parseInt(
- properties.getProperty(
- "dn_rpc_min_concurrent_client_num",
- Integer.toString(conf.getRpcMinConcurrentClientNum())));
-
- if (minConcurrentClientNum <= 0) {
- minConcurrentClientNum = Runtime.getRuntime().availableProcessors();
- }
-
- conf.setRpcMinConcurrentClientNum(minConcurrentClientNum);
-
int maxConcurrentClientNum =
Integer.parseInt(
properties.getProperty(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 38ce6a0f158..0f71a1cece8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -129,8 +129,6 @@ public class DataRegionConsensusImpl {
.setRpc(
RPC.newBuilder()
.setConnectionTimeoutInMs(CONF.getConnectionTimeoutInMS())
-
.setRpcSelectorThreadNum(CONF.getRpcSelectorThreadCount())
-
.setRpcMinConcurrentClientNum(CONF.getRpcMinConcurrentClientNum())
.setRpcMaxConcurrentClientNum(CONF.getRpcMaxConcurrentClientNum())
.setRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnable())
.setSelectorNumOfClientManager(CONF.getSelectorNumOfClientManager())
@@ -157,8 +155,6 @@ public class DataRegionConsensusImpl {
PipeConsensusConfig.RPC
.newBuilder()
.setConnectionTimeoutInMs(CONF.getConnectionTimeoutInMS())
-
.setRpcSelectorThreadNum(CONF.getRpcSelectorThreadCount())
-
.setRpcMinConcurrentClientNum(CONF.getRpcMinConcurrentClientNum())
.setRpcMaxConcurrentClientNum(CONF.getRpcMaxConcurrentClientNum())
.setIsRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnable())
.setThriftServerAwaitTimeForStopService(
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 2fa2c51e3d4..8d40a28d8af 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -502,11 +502,6 @@ dn_rpc_advanced_compression_enable=false
# Datatype: int
dn_rpc_selector_thread_count=1
-# The min number of concurrent clients that can be connected to the dataNode.
-# effectiveMode: restart
-# Datatype: int
-dn_rpc_min_concurrent_client_num=1
-
# The maximum number of concurrent clients that can be connected to the
dataNode.
# effectiveMode: restart
# Datatype: int
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
index 6d0a871c9df..2afc66cfe7d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
@@ -113,7 +113,6 @@ public abstract class AbstractThriftServiceThread extends
Thread {
String bindAddress,
int port,
int selectorThreads,
- int minWorkerThreads,
int maxWorkerThreads,
int timeoutSecond,
TServerEventHandler serverEventHandler,
@@ -134,7 +133,6 @@ public abstract class AbstractThriftServiceThread extends
Thread {
processor,
threadsName,
selectorThreads,
- minWorkerThreads,
maxWorkerThreads,
timeoutSecond,
maxReadBufferBytes);
@@ -143,12 +141,7 @@ public abstract class AbstractThriftServiceThread extends
Thread {
case HSHA:
THsHaServer.Args poolArgs1 =
initAsyncedHshaPoolArgs(
- processor,
- threadsName,
- minWorkerThreads,
- maxWorkerThreads,
- timeoutSecond,
- maxReadBufferBytes);
+ processor, threadsName, maxWorkerThreads, timeoutSecond,
maxReadBufferBytes);
poolServer = new THsHaServer(poolArgs1);
break;
default:
@@ -228,10 +221,7 @@ public abstract class AbstractThriftServiceThread extends
Thread {
private TThreadPoolServer.Args initSyncedPoolArgs(
TProcessor processor, String threadsName, int maxWorkerThreads, int
timeoutSecond) {
TThreadPoolServer.Args poolArgs = new
TThreadPoolServer.Args(serverTransport);
- poolArgs
- .maxWorkerThreads(maxWorkerThreads)
- .minWorkerThreads(Runtime.getRuntime().availableProcessors())
- .stopTimeoutVal(timeoutSecond);
+
poolArgs.maxWorkerThreads(maxWorkerThreads).minWorkerThreads(0).stopTimeoutVal(timeoutSecond);
executorService =
IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs, threadsName);
poolArgs.executorService = executorService;
poolArgs.processor(processor);
@@ -244,7 +234,6 @@ public abstract class AbstractThriftServiceThread extends
Thread {
TBaseAsyncProcessor<?> processor,
String threadsName,
int selectorThreads,
- int minWorkerThreads,
int maxWorkerThreads,
int timeoutSecond,
int maxReadBufferBytes) {
@@ -254,7 +243,7 @@ public abstract class AbstractThriftServiceThread extends
Thread {
poolArgs.selectorThreads(selectorThreads);
executorService =
IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(
- minWorkerThreads, maxWorkerThreads, timeoutSecond,
TimeUnit.SECONDS, threadsName);
+ 0, maxWorkerThreads, timeoutSecond, TimeUnit.SECONDS, threadsName);
poolArgs.executorService(executorService);
poolArgs.processor(processor);
poolArgs.protocolFactory(protocolFactory);
@@ -265,7 +254,6 @@ public abstract class AbstractThriftServiceThread extends
Thread {
private THsHaServer.Args initAsyncedHshaPoolArgs(
TBaseAsyncProcessor<?> processor,
String threadsName,
- int minWorkerThreads,
int maxWorkerThreads,
int timeoutSecond,
int maxReadBufferBytes) {
@@ -273,7 +261,7 @@ public abstract class AbstractThriftServiceThread extends
Thread {
poolArgs.maxReadBufferBytes = maxReadBufferBytes;
executorService =
IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(
- minWorkerThreads, maxWorkerThreads, timeoutSecond,
TimeUnit.SECONDS, threadsName);
+ 0, maxWorkerThreads, timeoutSecond, TimeUnit.SECONDS, threadsName);
poolArgs.executorService(executorService);
poolArgs.processor(processor);
poolArgs.protocolFactory(protocolFactory);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
index 77f25ff6ab0..6a3f5ef42e7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
@@ -35,7 +35,6 @@ public class ThriftServiceThread extends
AbstractThriftServiceThread {
String bindAddress,
int port,
int selectorThreads,
- int minWorkerThreads,
int maxWorkerThreads,
int timeoutSecond,
TServerEventHandler serverEventHandler,
@@ -51,7 +50,6 @@ public class ThriftServiceThread extends
AbstractThriftServiceThread {
bindAddress,
port,
selectorThreads,
- minWorkerThreads,
maxWorkerThreads,
timeoutSecond,
serverEventHandler,