This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new b3593232fbb Pipe: Default parameters adjustment for pipe threads and pipeStuckRestartMinIntervalMs (#14819) (#14838)
b3593232fbb is described below

commit b3593232fbb11b502469de823fba815540412b49
Author: nanxiang xia <[email protected]>
AuthorDate: Fri Feb 14 17:57:02 2025 +0800

    Pipe: Default parameters adjustment for pipe threads and pipeStuckRestartMinIntervalMs (#14819) (#14838)
---
 .../conf/iotdb-system.properties.template          | 10 +++++----
 .../apache/iotdb/commons/conf/CommonConfig.java    | 15 +++++++-------
 .../iotdb/commons/conf/CommonDescriptor.java       | 24 +++++++++++++---------
 3 files changed, 27 insertions(+), 22 deletions(-)

diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 50b619ec335..13f488b0e6f 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1715,10 +1715,10 @@ continuous_query_min_every_interval_in_ms=1000
 pipe_lib_dir=ext/pipe
 
 # The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
-# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
+# When <= 0, use max(5, CPU core number).
 # effectiveMode: restart
 # Datatype: int
-pipe_subtask_executor_max_thread_num=5
+pipe_subtask_executor_max_thread_num=0
 
 # The connection timeout (in milliseconds) for the thrift client.
 # effectiveMode: restart
@@ -1727,14 +1727,16 @@ pipe_sink_timeout_ms=900000
 
 # The maximum number of selectors that can be used in the sink.
 # Recommend to set this value to less than or equal to pipe_sink_max_client_number.
+# When <= 0, use max(4, CPU core number).
 # effectiveMode: restart
 # Datatype: int
-pipe_sink_selector_number=4
+pipe_sink_selector_number=0
 
 # The maximum number of clients that can be used in the sink.
+# When <= 0, use max(16, CPU core number).
 # effectiveMode: restart
 # Datatype: int
-pipe_sink_max_client_number=16
+pipe_sink_max_client_number=0
 
 # Whether to enable receiving pipe data through air gap.
 # The receiver can only return 0 or 1 in tcp mode to indicate whether the data is received successfully.
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index b2eeaf25f2c..566a37e82eb 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -207,7 +207,7 @@ public class CommonConfig {
 
   /** The maximum number of threads that can be used to execute subtasks in PipeSubtaskExecutor. */
   private int pipeSubtaskExecutorMaxThreadNum =
-      Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
+      Math.max(5, Runtime.getRuntime().availableProcessors());
 
   private int pipeNonForwardingEventsProgressReportInterval = 100;
 
@@ -232,8 +232,10 @@ public class CommonConfig {
   private long pipeConnectorRetryIntervalMs = 1000L;
   private boolean pipeConnectorRPCThriftCompressionEnabled = false;
 
-  private int pipeAsyncConnectorSelectorNumber = 4;
-  private int pipeAsyncConnectorMaxClientNumber = 16;
+  private int pipeAsyncConnectorSelectorNumber =
+      Math.max(4, Runtime.getRuntime().availableProcessors());
+  private int pipeAsyncConnectorMaxClientNumber =
+      Math.max(16, Runtime.getRuntime().availableProcessors());
 
   private double pipeAllSinksRateLimitBytesPerSecond = -1;
   private int rateLimiterHotReloadCheckIntervalMs = 1000;
@@ -259,7 +261,7 @@ public class CommonConfig {
   private long pipeMaxAllowedLinkedTsFileCount = 100;
   private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F;
   private long pipeStuckRestartIntervalSeconds = 120;
-  private long pipeStuckRestartMinIntervalMs = 30 * 60 * 1000L; // 30 minutes
+  private long pipeStuckRestartMinIntervalMs = 5 * 60 * 1000L; // 5 minutes
 
   private int pipeMetaReportMaxLogNumPerRound = 10;
   private int pipeMetaReportMaxLogIntervalRounds = 36;
@@ -940,10 +942,7 @@ public class CommonConfig {
   }
 
   public void setPipeSubtaskExecutorMaxThreadNum(int pipeSubtaskExecutorMaxThreadNum) {
-    this.pipeSubtaskExecutorMaxThreadNum =
-        Math.min(
-            pipeSubtaskExecutorMaxThreadNum,
-            Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
+    this.pipeSubtaskExecutorMaxThreadNum = pipeSubtaskExecutorMaxThreadNum;
   }
 
   public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 529d1053c42..3661feefa6e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -306,13 +306,13 @@ public class CommonDescriptor {
                 "pipe_realtime_queue_poll_history_threshold",
                 Integer.toString(config.getPipeRealTimeQueuePollHistoryThreshold()))));
 
-    config.setPipeSubtaskExecutorMaxThreadNum(
+    int pipeSubtaskExecutorMaxThreadNum =
         Integer.parseInt(
             properties.getProperty(
                 "pipe_subtask_executor_max_thread_num",
-                Integer.toString(config.getPipeSubtaskExecutorMaxThreadNum()))));
-    if (config.getPipeSubtaskExecutorMaxThreadNum() <= 0) {
-      config.setPipeSubtaskExecutorMaxThreadNum(5);
+                Integer.toString(config.getPipeSubtaskExecutorMaxThreadNum())));
+    if (pipeSubtaskExecutorMaxThreadNum > 0) {
+      config.setPipeSubtaskExecutorMaxThreadNum(pipeSubtaskExecutorMaxThreadNum);
     }
     config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
         Integer.parseInt(
@@ -405,22 +405,26 @@ public class CommonDescriptor {
                     properties.getProperty(
                         "pipe_connector_rpc_thrift_compression_enabled",
                         String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled())))));
-
-    config.setPipeAsyncConnectorSelectorNumber(
+    int pipeAsyncConnectorSelectorNumber =
         Integer.parseInt(
             Optional.ofNullable(properties.getProperty("pipe_sink_selector_number"))
                 .orElse(
                     properties.getProperty(
                         "pipe_async_connector_selector_number",
-                        String.valueOf(config.getPipeAsyncConnectorSelectorNumber())))));
-    config.setPipeAsyncConnectorMaxClientNumber(
+                        String.valueOf(config.getPipeAsyncConnectorSelectorNumber()))));
+    if (pipeAsyncConnectorSelectorNumber > 0) {
+      config.setPipeAsyncConnectorSelectorNumber(pipeAsyncConnectorSelectorNumber);
+    }
+    int pipeAsyncConnectorMaxClientNumber =
         Integer.parseInt(
             Optional.ofNullable(properties.getProperty("pipe_sink_max_client_number"))
                 .orElse(
                     properties.getProperty(
                         "pipe_async_connector_max_client_number",
-                        String.valueOf(config.getPipeAsyncConnectorMaxClientNumber())))));
-
+                        String.valueOf(config.getPipeAsyncConnectorMaxClientNumber()))));
+    if (pipeAsyncConnectorMaxClientNumber > 0) {
+      config.setPipeAsyncConnectorMaxClientNumber(pipeAsyncConnectorMaxClientNumber);
+    }
     config.setPipeAllSinksRateLimitBytesPerSecond(
         Double.parseDouble(
             properties.getProperty(

Reply via email to