This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b89318f59db  Remove dn_rpc_min_concurrent_client_num to fix minWorker 
> maxWorker bugs #15296
b89318f59db is described below

commit b89318f59db8a38d47a2f046867f6e4ae7075701
Author: Potato <[email protected]>
AuthorDate: Tue Apr 8 14:31:34 2025 +0800

     Remove dn_rpc_min_concurrent_client_num to fix minWorker > maxWorker bugs 
#15296
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
---
 .../consensus/config/PipeConsensusConfig.java      | 28 ----------------------
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ---------
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 12 ----------
 .../db/consensus/DataRegionConsensusImpl.java      |  4 ----
 .../conf/iotdb-system.properties.template          |  5 ----
 .../service/AbstractThriftServiceThread.java       | 20 ++++------------
 .../iotdb/commons/service/ThriftServiceThread.java |  2 --
 7 files changed, 4 insertions(+), 78 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
index eb6ea361d24..f0366cf0087 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
@@ -81,8 +81,6 @@ public class PipeConsensusConfig {
   }
 
   public static class RPC {
-    private final int rpcSelectorThreadNum;
-    private final int rpcMinConcurrentClientNum;
     private final int rpcMaxConcurrentClientNum;
     private final int thriftServerAwaitTimeForStopService;
     private final boolean isRpcThriftCompressionEnabled;
@@ -90,15 +88,11 @@ public class PipeConsensusConfig {
     private final int thriftMaxFrameSize;
 
     public RPC(
-        int rpcSelectorThreadNum,
-        int rpcMinConcurrentClientNum,
         int rpcMaxConcurrentClientNum,
         int thriftServerAwaitTimeForStopService,
         boolean isRpcThriftCompressionEnabled,
         int connectionTimeoutInMs,
         int thriftMaxFrameSize) {
-      this.rpcSelectorThreadNum = rpcSelectorThreadNum;
-      this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
       this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
       this.thriftServerAwaitTimeForStopService = 
thriftServerAwaitTimeForStopService;
       this.isRpcThriftCompressionEnabled = isRpcThriftCompressionEnabled;
@@ -106,14 +100,6 @@ public class PipeConsensusConfig {
       this.thriftMaxFrameSize = thriftMaxFrameSize;
     }
 
-    public int getRpcSelectorThreadNum() {
-      return rpcSelectorThreadNum;
-    }
-
-    public int getRpcMinConcurrentClientNum() {
-      return rpcMinConcurrentClientNum;
-    }
-
     public int getRpcMaxConcurrentClientNum() {
       return rpcMaxConcurrentClientNum;
     }
@@ -139,24 +125,12 @@ public class PipeConsensusConfig {
     }
 
     public static class Builder {
-      private int rpcSelectorThreadNum = 1;
-      private int rpcMinConcurrentClientNum = 
Runtime.getRuntime().availableProcessors();
       private int rpcMaxConcurrentClientNum = 65535;
       private int thriftServerAwaitTimeForStopService = 60;
       private boolean isRpcThriftCompressionEnabled = false;
       private int connectionTimeoutInMs = (int) TimeUnit.SECONDS.toMillis(60);
       private int thriftMaxFrameSize = 536870912;
 
-      public RPC.Builder setRpcSelectorThreadNum(int rpcSelectorThreadNum) {
-        this.rpcSelectorThreadNum = rpcSelectorThreadNum;
-        return this;
-      }
-
-      public RPC.Builder setRpcMinConcurrentClientNum(int 
rpcMinConcurrentClientNum) {
-        this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
-        return this;
-      }
-
       public RPC.Builder setRpcMaxConcurrentClientNum(int 
rpcMaxConcurrentClientNum) {
         this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
         return this;
@@ -185,8 +159,6 @@ public class PipeConsensusConfig {
 
       public RPC build() {
         return new RPC(
-            rpcSelectorThreadNum,
-            rpcMinConcurrentClientNum,
             rpcMaxConcurrentClientNum,
             thriftServerAwaitTimeForStopService,
             isRpcThriftCompressionEnabled,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 3e27d24b783..dfceb0c6021 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -145,9 +145,6 @@ public class IoTDBConfig {
   /** Rpc Selector thread num */
   private int rpcSelectorThreadCount = 1;
 
-  /** Min concurrent client number */
-  private int rpcMinConcurrentClientNum = 
Runtime.getRuntime().availableProcessors();
-
   /** Max concurrent client number */
   private int rpcMaxConcurrentClientNum = 1000;
 
@@ -1770,14 +1767,6 @@ public class IoTDBConfig {
     this.rpcSelectorThreadCount = rpcSelectorThreadCount;
   }
 
-  public int getRpcMinConcurrentClientNum() {
-    return rpcMinConcurrentClientNum;
-  }
-
-  public void setRpcMinConcurrentClientNum(int rpcMinConcurrentClientNum) {
-    this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
-  }
-
   public int getRpcMaxConcurrentClientNum() {
     return rpcMaxConcurrentClientNum;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index c2d6f90e5ec..b4b2dbb50c8 100755
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -728,18 +728,6 @@ public class IoTDBDescriptor {
 
     conf.setRpcSelectorThreadCount(rpcSelectorThreadNum);
 
-    int minConcurrentClientNum =
-        Integer.parseInt(
-            properties.getProperty(
-                "dn_rpc_min_concurrent_client_num",
-                Integer.toString(conf.getRpcMinConcurrentClientNum())));
-
-    if (minConcurrentClientNum <= 0) {
-      minConcurrentClientNum = Runtime.getRuntime().availableProcessors();
-    }
-
-    conf.setRpcMinConcurrentClientNum(minConcurrentClientNum);
-
     int maxConcurrentClientNum =
         Integer.parseInt(
             properties.getProperty(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 38ce6a0f158..0f71a1cece8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -129,8 +129,6 @@ public class DataRegionConsensusImpl {
                   .setRpc(
                       RPC.newBuilder()
                           
.setConnectionTimeoutInMs(CONF.getConnectionTimeoutInMS())
-                          
.setRpcSelectorThreadNum(CONF.getRpcSelectorThreadCount())
-                          
.setRpcMinConcurrentClientNum(CONF.getRpcMinConcurrentClientNum())
                           
.setRpcMaxConcurrentClientNum(CONF.getRpcMaxConcurrentClientNum())
                           
.setRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnable())
                           
.setSelectorNumOfClientManager(CONF.getSelectorNumOfClientManager())
@@ -157,8 +155,6 @@ public class DataRegionConsensusImpl {
                       PipeConsensusConfig.RPC
                           .newBuilder()
                           
.setConnectionTimeoutInMs(CONF.getConnectionTimeoutInMS())
-                          
.setRpcSelectorThreadNum(CONF.getRpcSelectorThreadCount())
-                          
.setRpcMinConcurrentClientNum(CONF.getRpcMinConcurrentClientNum())
                           
.setRpcMaxConcurrentClientNum(CONF.getRpcMaxConcurrentClientNum())
                           
.setIsRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnable())
                           .setThriftServerAwaitTimeForStopService(
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 2fa2c51e3d4..8d40a28d8af 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -502,11 +502,6 @@ dn_rpc_advanced_compression_enable=false
 # Datatype: int
 dn_rpc_selector_thread_count=1
 
-# The min number of concurrent clients that can be connected to the dataNode.
-# effectiveMode: restart
-# Datatype: int
-dn_rpc_min_concurrent_client_num=1
-
 # The maximum number of concurrent clients that can be connected to the 
dataNode.
 # effectiveMode: restart
 # Datatype: int
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
index 6d0a871c9df..2afc66cfe7d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
@@ -113,7 +113,6 @@ public abstract class AbstractThriftServiceThread extends 
Thread {
       String bindAddress,
       int port,
       int selectorThreads,
-      int minWorkerThreads,
       int maxWorkerThreads,
       int timeoutSecond,
       TServerEventHandler serverEventHandler,
@@ -134,7 +133,6 @@ public abstract class AbstractThriftServiceThread extends 
Thread {
                   processor,
                   threadsName,
                   selectorThreads,
-                  minWorkerThreads,
                   maxWorkerThreads,
                   timeoutSecond,
                   maxReadBufferBytes);
@@ -143,12 +141,7 @@ public abstract class AbstractThriftServiceThread extends 
Thread {
         case HSHA:
           THsHaServer.Args poolArgs1 =
               initAsyncedHshaPoolArgs(
-                  processor,
-                  threadsName,
-                  minWorkerThreads,
-                  maxWorkerThreads,
-                  timeoutSecond,
-                  maxReadBufferBytes);
+                  processor, threadsName, maxWorkerThreads, timeoutSecond, 
maxReadBufferBytes);
           poolServer = new THsHaServer(poolArgs1);
           break;
         default:
@@ -228,10 +221,7 @@ public abstract class AbstractThriftServiceThread extends 
Thread {
   private TThreadPoolServer.Args initSyncedPoolArgs(
       TProcessor processor, String threadsName, int maxWorkerThreads, int 
timeoutSecond) {
     TThreadPoolServer.Args poolArgs = new 
TThreadPoolServer.Args(serverTransport);
-    poolArgs
-        .maxWorkerThreads(maxWorkerThreads)
-        .minWorkerThreads(Runtime.getRuntime().availableProcessors())
-        .stopTimeoutVal(timeoutSecond);
+    
poolArgs.maxWorkerThreads(maxWorkerThreads).minWorkerThreads(0).stopTimeoutVal(timeoutSecond);
     executorService = 
IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs, threadsName);
     poolArgs.executorService = executorService;
     poolArgs.processor(processor);
@@ -244,7 +234,6 @@ public abstract class AbstractThriftServiceThread extends 
Thread {
       TBaseAsyncProcessor<?> processor,
       String threadsName,
       int selectorThreads,
-      int minWorkerThreads,
       int maxWorkerThreads,
       int timeoutSecond,
       int maxReadBufferBytes) {
@@ -254,7 +243,7 @@ public abstract class AbstractThriftServiceThread extends 
Thread {
     poolArgs.selectorThreads(selectorThreads);
     executorService =
         IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(
-            minWorkerThreads, maxWorkerThreads, timeoutSecond, 
TimeUnit.SECONDS, threadsName);
+            0, maxWorkerThreads, timeoutSecond, TimeUnit.SECONDS, threadsName);
     poolArgs.executorService(executorService);
     poolArgs.processor(processor);
     poolArgs.protocolFactory(protocolFactory);
@@ -265,7 +254,6 @@ public abstract class AbstractThriftServiceThread extends 
Thread {
   private THsHaServer.Args initAsyncedHshaPoolArgs(
       TBaseAsyncProcessor<?> processor,
       String threadsName,
-      int minWorkerThreads,
       int maxWorkerThreads,
       int timeoutSecond,
       int maxReadBufferBytes) {
@@ -273,7 +261,7 @@ public abstract class AbstractThriftServiceThread extends 
Thread {
     poolArgs.maxReadBufferBytes = maxReadBufferBytes;
     executorService =
         IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(
-            minWorkerThreads, maxWorkerThreads, timeoutSecond, 
TimeUnit.SECONDS, threadsName);
+            0, maxWorkerThreads, timeoutSecond, TimeUnit.SECONDS, threadsName);
     poolArgs.executorService(executorService);
     poolArgs.processor(processor);
     poolArgs.protocolFactory(protocolFactory);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
index 77f25ff6ab0..6a3f5ef42e7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
@@ -35,7 +35,6 @@ public class ThriftServiceThread extends 
AbstractThriftServiceThread {
       String bindAddress,
       int port,
       int selectorThreads,
-      int minWorkerThreads,
       int maxWorkerThreads,
       int timeoutSecond,
       TServerEventHandler serverEventHandler,
@@ -51,7 +50,6 @@ public class ThriftServiceThread extends 
AbstractThriftServiceThread {
         bindAddress,
         port,
         selectorThreads,
-        minWorkerThreads,
         maxWorkerThreads,
         timeoutSecond,
         serverEventHandler,

Reply via email to