This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new ec65faa7d6b Pipe: Remove parameters from iotdb-common.properties that
users will not modify (#10851) (#10852)
ec65faa7d6b is described below
commit ec65faa7d6bf4e3c9bfb5fdb5c398b557904f699
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Aug 14 17:42:11 2023 +0800
Pipe: Remove parameters from iotdb-common.properties that users will not
modify (#10851) (#10852)
(cherry picked from commit 11af323b79d4e9bf7d5596290ebe863ad728865f)
---
.../resources/conf/iotdb-datanode.properties | 2 +-
.../thrift/sync/IoTDBThriftSyncConnector.java | 2 +-
.../resources/conf/iotdb-common.properties | 82 ++--------------------
.../iotdb/commons/client/ClientPoolFactory.java | 2 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 23 +++---
.../iotdb/commons/conf/CommonDescriptor.java | 8 +--
.../iotdb/commons/pipe/config/PipeConfig.java | 4 +-
7 files changed, 26 insertions(+), 97 deletions(-)
diff --git
a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties
b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties
index 71904a4bb28..fb3f2ef06f9 100644
--- a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties
@@ -192,7 +192,7 @@ dn_target_config_node_list=127.0.0.1:10710
# tracing dir
# Uncomment following fields to configure the tracing root directory.
-# For Window platform, the index is as follows:
+# For Windows platform, the index is as follows:
# dn_tracing_dir=datanode\\tracing
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index 65687fa2377..a025f1742ae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -113,7 +113,7 @@ public class IoTDBThriftSyncConnector extends
IoTDBThriftConnector {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs((int)
PIPE_CONFIG.getPipeConnectorTimeoutMs())
.setRpcThriftCompressionEnabled(
-
PIPE_CONFIG.isPipeAsyncConnectorRPCThriftCompressionEnabled())
+ PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
.build(),
ip,
port));
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 3524973a789..cce12379ec6 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -875,7 +875,7 @@ cluster_name=defaultCluster
# Note: If data_dir is assigned an empty string(i.e.,zero-size), it will be
handled as a relative
# path.
#
-# For Window platform
+# For Windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is
"\\\\", then the path is
# absolute. Otherwise, it is relative.
# udf_lib_dir=ext\\udf
@@ -889,7 +889,7 @@ cluster_name=defaultCluster
####################
# Uncomment the following field to configure the trigger lib directory.
-# For Window platform
+# For Windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is
"\\\\", then the path is
# absolute. Otherwise, it is relative.
# trigger_lib_dir=ext\\trigger
@@ -937,7 +937,7 @@ cluster_name=defaultCluster
####################
# Uncomment the following field to configure the pipe lib directory.
-# For Window platform
+# For Windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is
"\\\\", then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\pipe
@@ -945,65 +945,13 @@ cluster_name=defaultCluster
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# pipe_lib_dir=ext/pipe
-# The name of the directory that stores the files temporarily hold or
generated by the pipe module.
-# The directory is located in the data directory of IoTDB.
-# pipe_hardlink_base_dir_name=pipe
-
-# The name of the directory that stores the tsfiles temporarily hold or
generated by the pipe module.
-# The directory is located in the pipe_hardlink_base_dir_name directory of
IoTDB.
-# pipe_hardlink_tsfile_dir_name=tsfile
-
-# The name of the directory that stores the wal temporarily hold or generated
by the pipe module.
-# The directory is located in the pipe_hardlink_base_dir_name directory of
IoTDB.
-# pipe_hardlink_wal_dir_name=wal
-
-# Enable hardlink for wal files or not. If enabled, the wal files will be
hardlink-ed to
-# the pipe_hardlink_wal_dir_name directory.
-# pipe_hardlink_wal_enabled=false
-
-# The row size of tablets created in pipe transfer.
-# pipe_data_structure_tablet_row_size=16384
-
# The maximum number of threads that can be used to execute the pipe subtasks
in PipeSubtaskExecutor.
+# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1,
CPU core number / 2)).
# pipe_subtask_executor_max_thread_num=5
-# The number of events that need to be consumed before a checkpoint is
triggered.
-#
pipe_subtask_executor_basic_check_point_interval_by_consumed_event_count=10000
-
-# The time duration (in milliseconds) between checkpoints.
-# pipe_subtask_executor_basic_check_point_interval_by_time_duration=10000
-
-# The maximum blocking time (in milliseconds) for the pending queue.
-# pipe_subtask_executor_pending_queue_max_blocking_time_ms=1000
-
-# The default size of ring buffer in the realtime extractor's disruptor queue.
-# pipe_extractor_assigner_disruptor_ring_buffer_size=65536
-
-# The maximum number of entries the deviceToExtractorsCache can hold.
-# pipe_extractor_matcher_cache_size=1024
-
-# The capacity for the number of tablet events that can be stored in the
pending queue of the hybrid realtime extractor.
-# pipe_extractor_pending_queue_capacity=16
-
-# The limit for the number of tablet events that can be held in the pending
queue of the hybrid realtime extractor.
-# Noted that: this should be less than or equals to
realtimeExtractorPendingQueueCapacity
-# pipe_extractor_pending_queue_tablet_limit=8
-
# The connection timeout (in milliseconds) for the thrift client.
# pipe_connector_timeout_ms=900000
-# The buffer size used for reading file during file transfer.
-# pipe_connector_read_file_buffer_size=8388608
-
-# The delay period (in milliseconds) between each retry when a connection
failure occurs.
-# pipe_connector_retry_interval_ms=1000
-
-# The size of the pending queue for the PipeConnector to store the events.
-# pipe_connector_pending_queue_size=16
-
-# If the thrift RPC compression is enabled.
-# pipe_async_connector_rpc_thrift_compression_enabled=false
-
# The maximum number of selectors that can be used in the async connector.
# pipe_async_connector_selector_number=1
@@ -1013,28 +961,6 @@ cluster_name=defaultCluster
# The maximum number of clients that can be used in the async connector.
# pipe_async_connector_max_client_number=16
-# True if the pipe heartbeat is seperated from the cluster's heartbeat, false
if the pipe heartbeat is
-# merged with the cluster's heartbeat.
-# pipe_heartbeat_seperated_mode_enabled=true
-
-# The interval time between the heartbeat that collecting pipe meta (in
seconds).
-# pipe_heartbeat_interval_seconds_for_collecting_pipe_meta=100
-
-# The initial delay before starting the PipeMetaSyncer service.
-# pipe_meta_syncer_initial_sync_delay_minutes=3
-
-# The sync regular interval (in minutes) for the PipeMetaSyncer service.
-# pipe_meta_syncer_sync_interval_minutes=3
-
-# When the pipe meta synchronization round is a multiple of it, it will
restart pipes stopped automatically by critical exceptions.
-# Useless if the pipe_exception_stopped_auto_restart_enabled is set to false.
-# Eg: if pipe_meta_syncer_sync_interval_minutes=3, and
pipe_meta_syncer_auto_restart_pipe_round=5,
-# then the pipe restart process's interval will be 15 minutes, and is executed
by the pipe meta sync thread.
-# pipe_meta_syncer_auto_restart_pipe_check_interval_round=1
-
-# Whether to enable pipe auto restart after pipe meets error during runtime.
-# pipe_auto_restart_enabled=true
-
####################
### RatisConsensus Configuration
####################
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 72683e3a29d..9297136365f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -267,7 +267,7 @@ public class ClientPoolFactory {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs((int)
conf.getPipeConnectorTimeoutMs())
.setRpcThriftCompressionEnabled(
-
conf.isPipeAsyncConnectorRPCThriftCompressionEnabled())
+ conf.isPipeConnectorRPCThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(
conf.getPipeAsyncConnectorSelectorNumber())
.build(),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index f4f5905070b..97932889caa 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -150,7 +150,8 @@ public class CommonConfig {
private boolean pipeHardLinkWALEnabled = false;
/** The maximum number of threads that can be used to execute subtasks in
PipeSubtaskExecutor. */
- private int pipeSubtaskExecutorMaxThreadNum = 5;
+ private int pipeSubtaskExecutorMaxThreadNum =
+ Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount =
10_000;
private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 *
1000L;
@@ -160,14 +161,14 @@ public class CommonConfig {
private int pipeExtractorMatcherCacheSize = 1024;
private int pipeExtractorPendingQueueCapacity = 16;
private int pipeExtractorPendingQueueTabletLimit =
pipeExtractorPendingQueueCapacity / 2;
- private int pipeDataStructureTabletRowSize = 16384;
+ private int pipeDataStructureTabletRowSize = 2048;
private long pipeConnectorTimeoutMs = 15 * 60 * 1000L; // 15 minutes
private int pipeConnectorReadFileBufferSize = 8388608;
private long pipeConnectorRetryIntervalMs = 1000L;
private int pipeConnectorPendingQueueSize = 16;
+ private boolean pipeConnectorRPCThriftCompressionEnabled = false;
- private boolean pipeAsyncConnectorRPCThriftCompressionEnabled = false;
private int pipeAsyncConnectorSelectorNumber = 1;
private int pipeAsyncConnectorCoreClientNumber = 8;
private int pipeAsyncConnectorMaxClientNumber = 16;
@@ -559,14 +560,13 @@ public class CommonConfig {
this.pipeConnectorReadFileBufferSize = pipeConnectorReadFileBufferSize;
}
- public void setPipeAsyncConnectorRPCThriftCompressionEnabled(
- boolean pipeAsyncConnectorRPCThriftCompressionEnabled) {
- this.pipeAsyncConnectorRPCThriftCompressionEnabled =
- pipeAsyncConnectorRPCThriftCompressionEnabled;
+ public void setPipeConnectorRPCThriftCompressionEnabled(
+ boolean pipeConnectorRPCThriftCompressionEnabled) {
+ this.pipeConnectorRPCThriftCompressionEnabled =
pipeConnectorRPCThriftCompressionEnabled;
}
- public boolean isPipeAsyncConnectorRPCThriftCompressionEnabled() {
- return pipeAsyncConnectorRPCThriftCompressionEnabled;
+ public boolean isPipeConnectorRPCThriftCompressionEnabled() {
+ return pipeConnectorRPCThriftCompressionEnabled;
}
public int getPipeAsyncConnectorSelectorNumber() {
@@ -686,7 +686,10 @@ public class CommonConfig {
}
public void setPipeSubtaskExecutorMaxThreadNum(int
pipeSubtaskExecutorMaxThreadNum) {
- this.pipeSubtaskExecutorMaxThreadNum = pipeSubtaskExecutorMaxThreadNum;
+ this.pipeSubtaskExecutorMaxThreadNum =
+ Math.min(
+ pipeSubtaskExecutorMaxThreadNum,
+ Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
}
public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index d29c0333a59..8ca5dbbd85e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -326,12 +326,12 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_connector_pending_queue_size",
String.valueOf(config.getPipeConnectorPendingQueueSize()))));
-
- config.setPipeAsyncConnectorRPCThriftCompressionEnabled(
+ config.setPipeConnectorRPCThriftCompressionEnabled(
Boolean.parseBoolean(
properties.getProperty(
- "pipe_async_connector_rpc_thrift_compression_enable",
-
String.valueOf(config.isPipeAsyncConnectorRPCThriftCompressionEnabled()))));
+ "pipe_connector_rpc_thrift_compression_enabled",
+
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled()))));
+
config.setPipeAsyncConnectorSelectorNumber(
Integer.parseInt(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index db33599278b..355d0851207 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -107,8 +107,8 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeConnectorPendingQueueSize();
}
- public boolean isPipeAsyncConnectorRPCThriftCompressionEnabled() {
- return COMMON_CONFIG.isPipeAsyncConnectorRPCThriftCompressionEnabled();
+ public boolean isPipeConnectorRPCThriftCompressionEnabled() {
+ return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
}
public int getPipeAsyncConnectorSelectorNumber() {