This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new b3593232fbb Pipe: Default parameters adjustment for pipe threads and
pipeStuckRestartMinIntervalMs (#14819) (#14838)
b3593232fbb is described below
commit b3593232fbb11b502469de823fba815540412b49
Author: nanxiang xia <[email protected]>
AuthorDate: Fri Feb 14 17:57:02 2025 +0800
Pipe: Default parameters adjustment for pipe threads and
pipeStuckRestartMinIntervalMs (#14819) (#14838)
---
.../conf/iotdb-system.properties.template | 10 +++++----
.../apache/iotdb/commons/conf/CommonConfig.java | 15 +++++++-------
.../iotdb/commons/conf/CommonDescriptor.java | 24 +++++++++++++---------
3 files changed, 27 insertions(+), 22 deletions(-)
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 50b619ec335..13f488b0e6f 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1715,10 +1715,10 @@ continuous_query_min_every_interval_in_ms=1000
pipe_lib_dir=ext/pipe
# The maximum number of threads that can be used to execute the pipe subtasks
in PipeSubtaskExecutor.
-# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1,
CPU core number / 2)).
+# When <= 0, use max(5, CPU core number).
# effectiveMode: restart
# Datatype: int
-pipe_subtask_executor_max_thread_num=5
+pipe_subtask_executor_max_thread_num=0
# The connection timeout (in milliseconds) for the thrift client.
# effectiveMode: restart
@@ -1727,14 +1727,16 @@ pipe_sink_timeout_ms=900000
# The maximum number of selectors that can be used in the sink.
# Recommend to set this value to less than or equal to
pipe_sink_max_client_number.
+# When <= 0, use max(4, CPU core number).
# effectiveMode: restart
# Datatype: int
-pipe_sink_selector_number=4
+pipe_sink_selector_number=0
# The maximum number of clients that can be used in the sink.
+# When <= 0, use max(16, CPU core number).
# effectiveMode: restart
# Datatype: int
-pipe_sink_max_client_number=16
+pipe_sink_max_client_number=0
# Whether to enable receiving pipe data through air gap.
# The receiver can only return 0 or 1 in tcp mode to indicate whether the data
is received successfully.
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index b2eeaf25f2c..566a37e82eb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -207,7 +207,7 @@ public class CommonConfig {
/** The maximum number of threads that can be used to execute subtasks in
PipeSubtaskExecutor. */
private int pipeSubtaskExecutorMaxThreadNum =
- Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
+ Math.max(5, Runtime.getRuntime().availableProcessors());
private int pipeNonForwardingEventsProgressReportInterval = 100;
@@ -232,8 +232,10 @@ public class CommonConfig {
private long pipeConnectorRetryIntervalMs = 1000L;
private boolean pipeConnectorRPCThriftCompressionEnabled = false;
- private int pipeAsyncConnectorSelectorNumber = 4;
- private int pipeAsyncConnectorMaxClientNumber = 16;
+ private int pipeAsyncConnectorSelectorNumber =
+ Math.max(4, Runtime.getRuntime().availableProcessors());
+ private int pipeAsyncConnectorMaxClientNumber =
+ Math.max(16, Runtime.getRuntime().availableProcessors());
private double pipeAllSinksRateLimitBytesPerSecond = -1;
private int rateLimiterHotReloadCheckIntervalMs = 1000;
@@ -259,7 +261,7 @@ public class CommonConfig {
private long pipeMaxAllowedLinkedTsFileCount = 100;
private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F;
private long pipeStuckRestartIntervalSeconds = 120;
- private long pipeStuckRestartMinIntervalMs = 30 * 60 * 1000L; // 30 minutes
+ private long pipeStuckRestartMinIntervalMs = 5 * 60 * 1000L; // 5 minutes
private int pipeMetaReportMaxLogNumPerRound = 10;
private int pipeMetaReportMaxLogIntervalRounds = 36;
@@ -940,10 +942,7 @@ public class CommonConfig {
}
public void setPipeSubtaskExecutorMaxThreadNum(int
pipeSubtaskExecutorMaxThreadNum) {
- this.pipeSubtaskExecutorMaxThreadNum =
- Math.min(
- pipeSubtaskExecutorMaxThreadNum,
- Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
+ this.pipeSubtaskExecutorMaxThreadNum = pipeSubtaskExecutorMaxThreadNum;
}
public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 529d1053c42..3661feefa6e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -306,13 +306,13 @@ public class CommonDescriptor {
"pipe_realtime_queue_poll_history_threshold",
Integer.toString(config.getPipeRealTimeQueuePollHistoryThreshold()))));
- config.setPipeSubtaskExecutorMaxThreadNum(
+ int pipeSubtaskExecutorMaxThreadNum =
Integer.parseInt(
properties.getProperty(
"pipe_subtask_executor_max_thread_num",
-
Integer.toString(config.getPipeSubtaskExecutorMaxThreadNum()))));
- if (config.getPipeSubtaskExecutorMaxThreadNum() <= 0) {
- config.setPipeSubtaskExecutorMaxThreadNum(5);
+
Integer.toString(config.getPipeSubtaskExecutorMaxThreadNum())));
+ if (pipeSubtaskExecutorMaxThreadNum > 0) {
+
config.setPipeSubtaskExecutorMaxThreadNum(pipeSubtaskExecutorMaxThreadNum);
}
config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
Integer.parseInt(
@@ -405,22 +405,26 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_connector_rpc_thrift_compression_enabled",
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled())))));
-
- config.setPipeAsyncConnectorSelectorNumber(
+ int pipeAsyncConnectorSelectorNumber =
Integer.parseInt(
Optional.ofNullable(properties.getProperty("pipe_sink_selector_number"))
.orElse(
properties.getProperty(
"pipe_async_connector_selector_number",
-
String.valueOf(config.getPipeAsyncConnectorSelectorNumber())))));
- config.setPipeAsyncConnectorMaxClientNumber(
+
String.valueOf(config.getPipeAsyncConnectorSelectorNumber()))));
+ if (pipeAsyncConnectorSelectorNumber > 0) {
+
config.setPipeAsyncConnectorSelectorNumber(pipeAsyncConnectorSelectorNumber);
+ }
+ int pipeAsyncConnectorMaxClientNumber =
Integer.parseInt(
Optional.ofNullable(properties.getProperty("pipe_sink_max_client_number"))
.orElse(
properties.getProperty(
"pipe_async_connector_max_client_number",
-
String.valueOf(config.getPipeAsyncConnectorMaxClientNumber())))));
-
+
String.valueOf(config.getPipeAsyncConnectorMaxClientNumber()))));
+ if (pipeAsyncConnectorMaxClientNumber > 0) {
+
config.setPipeAsyncConnectorMaxClientNumber(pipeAsyncConnectorMaxClientNumber);
+ }
config.setPipeAllSinksRateLimitBytesPerSecond(
Double.parseDouble(
properties.getProperty(