This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-reduce-parameters-1.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 548d1a8ff697f99deba1d5bd1f50b3ab958e093e Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Aug 14 17:28:45 2023 +0800 Pipe: Remove parameters from iotdb-common.properties that users will not modify (#10851) (cherry picked from commit 11af323b79d4e9bf7d5596290ebe863ad728865f) --- .../resources/conf/iotdb-datanode.properties | 2 +- .../thrift/sync/IoTDBThriftSyncConnector.java | 2 +- .../resources/conf/iotdb-common.properties | 82 ++-------------------- .../iotdb/commons/client/ClientPoolFactory.java | 2 +- .../apache/iotdb/commons/conf/CommonConfig.java | 23 +++--- .../iotdb/commons/conf/CommonDescriptor.java | 8 +-- .../iotdb/commons/pipe/config/PipeConfig.java | 4 +- 7 files changed, 26 insertions(+), 97 deletions(-) diff --git a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties index 71904a4bb28..fb3f2ef06f9 100644 --- a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties +++ b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties @@ -192,7 +192,7 @@ dn_target_config_node_list=127.0.0.1:10710 # tracing dir # Uncomment following fields to configure the tracing root directory. -# For Window platform, the index is as follows: +# For Windows platform, the index is as follows: # dn_tracing_dir=datanode\\tracing # For Linux platform # If its prefix is "/", then the path is absolute. Otherwise, it is relative. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java index 65687fa2377..a025f1742ae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java @@ -113,7 +113,7 @@ public class IoTDBThriftSyncConnector extends IoTDBThriftConnector { new ThriftClientProperty.Builder() .setConnectionTimeoutMs((int) PIPE_CONFIG.getPipeConnectorTimeoutMs()) .setRpcThriftCompressionEnabled( - PIPE_CONFIG.isPipeAsyncConnectorRPCThriftCompressionEnabled()) + PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled()) .build(), ip, port)); diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties index 3524973a789..cce12379ec6 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties @@ -875,7 +875,7 @@ cluster_name=defaultCluster # Note: If data_dir is assigned an empty string(i.e.,zero-size), it will be handled as a relative # path. # -# For Window platform +# For Windows platform # If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is # absolute. Otherwise, it is relative. # udf_lib_dir=ext\\udf @@ -889,7 +889,7 @@ cluster_name=defaultCluster #################### # Uncomment the following field to configure the trigger lib directory. -# For Window platform +# For Windows platform # If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is # absolute. Otherwise, it is relative. # trigger_lib_dir=ext\\trigger @@ -937,7 +937,7 @@ cluster_name=defaultCluster #################### # Uncomment the following field to configure the pipe lib directory. -# For Window platform +# For Windows platform # If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is # absolute. Otherwise, it is relative. # pipe_lib_dir=ext\\pipe @@ -945,65 +945,13 @@ cluster_name=defaultCluster # If its prefix is "/", then the path is absolute. Otherwise, it is relative. # pipe_lib_dir=ext/pipe -# The name of the directory that stores the files temporarily hold or generated by the pipe module. -# The directory is located in the data directory of IoTDB. -# pipe_hardlink_base_dir_name=pipe - -# The name of the directory that stores the tsfiles temporarily hold or generated by the pipe module. -# The directory is located in the pipe_hardlink_base_dir_name directory of IoTDB. -# pipe_hardlink_tsfile_dir_name=tsfile - -# The name of the directory that stores the wal temporarily hold or generated by the pipe module. -# The directory is located in the pipe_hardlink_base_dir_name directory of IoTDB. -# pipe_hardlink_wal_dir_name=wal - -# Enable hardlink for wal files or not. If enabled, the wal files will be hardlink-ed to -# the pipe_hardlink_wal_dir_name directory. -# pipe_hardlink_wal_enabled=false - -# The row size of tablets created in pipe transfer. -# pipe_data_structure_tablet_row_size=16384 - # The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor. +# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)). # pipe_subtask_executor_max_thread_num=5 -# The number of events that need to be consumed before a checkpoint is triggered. -# pipe_subtask_executor_basic_check_point_interval_by_consumed_event_count=10000 - -# The time duration (in milliseconds) between checkpoints. -# pipe_subtask_executor_basic_check_point_interval_by_time_duration=10000 - -# The maximum blocking time (in milliseconds) for the pending queue. -# pipe_subtask_executor_pending_queue_max_blocking_time_ms=1000 - -# The default size of ring buffer in the realtime extractor's disruptor queue. -# pipe_extractor_assigner_disruptor_ring_buffer_size=65536 - -# The maximum number of entries the deviceToExtractorsCache can hold. -# pipe_extractor_matcher_cache_size=1024 - -# The capacity for the number of tablet events that can be stored in the pending queue of the hybrid realtime extractor. -# pipe_extractor_pending_queue_capacity=16 - -# The limit for the number of tablet events that can be held in the pending queue of the hybrid realtime extractor. -# Noted that: this should be less than or equals to realtimeExtractorPendingQueueCapacity -# pipe_extractor_pending_queue_tablet_limit=8 - # The connection timeout (in milliseconds) for the thrift client. # pipe_connector_timeout_ms=900000 -# The buffer size used for reading file during file transfer. -# pipe_connector_read_file_buffer_size=8388608 - -# The delay period (in milliseconds) between each retry when a connection failure occurs. -# pipe_connector_retry_interval_ms=1000 - -# The size of the pending queue for the PipeConnector to store the events. -# pipe_connector_pending_queue_size=16 - -# If the thrift RPC compression is enabled. -# pipe_async_connector_rpc_thrift_compression_enabled=false - # The maximum number of selectors that can be used in the async connector. # pipe_async_connector_selector_number=1 @@ -1013,28 +961,6 @@ cluster_name=defaultCluster # The maximum number of clients that can be used in the async connector. # pipe_async_connector_max_client_number=16 -# True if the pipe heartbeat is seperated from the cluster's heartbeat, false if the pipe heartbeat is -# merged with the cluster's heartbeat. -# pipe_heartbeat_seperated_mode_enabled=true - -# The interval time between the heartbeat that collecting pipe meta (in seconds). -# pipe_heartbeat_interval_seconds_for_collecting_pipe_meta=100 - -# The initial delay before starting the PipeMetaSyncer service. -# pipe_meta_syncer_initial_sync_delay_minutes=3 - -# The sync regular interval (in minutes) for the PipeMetaSyncer service. -# pipe_meta_syncer_sync_interval_minutes=3 - -# When the pipe meta synchronization round is a multiple of it, it will restart pipes stopped automatically by critical exceptions. -# Useless if the pipe_exception_stopped_auto_restart_enabled is set to false. -# Eg: if pipe_meta_syncer_sync_interval_minutes=3, and pipe_meta_syncer_auto_restart_pipe_round=5, -# then the pipe restart process's interval will be 15 minutes, and is executed by the pipe meta sync thread. -# pipe_meta_syncer_auto_restart_pipe_check_interval_round=1 - -# Whether to enable pipe auto restart after pipe meets error during runtime. -# pipe_auto_restart_enabled=true - #################### ### RatisConsensus Configuration #################### diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java index 72683e3a29d..9297136365f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java @@ -267,7 +267,7 @@ public class ClientPoolFactory { new ThriftClientProperty.Builder() .setConnectionTimeoutMs((int) conf.getPipeConnectorTimeoutMs()) .setRpcThriftCompressionEnabled( - conf.isPipeAsyncConnectorRPCThriftCompressionEnabled()) + conf.isPipeConnectorRPCThriftCompressionEnabled()) .setSelectorNumOfAsyncClientManager( conf.getPipeAsyncConnectorSelectorNumber()) .build(), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index f4f5905070b..97932889caa 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -150,7 +150,8 @@ public class CommonConfig { private boolean pipeHardLinkWALEnabled = false; /** The maximum number of threads that can be used to execute subtasks in PipeSubtaskExecutor. */ - private int pipeSubtaskExecutorMaxThreadNum = 5; + private int pipeSubtaskExecutorMaxThreadNum = + Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2)); private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount = 10_000; private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 1000L; @@ -160,14 +161,14 @@ public class CommonConfig { private int pipeExtractorMatcherCacheSize = 1024; private int pipeExtractorPendingQueueCapacity = 16; private int pipeExtractorPendingQueueTabletLimit = pipeExtractorPendingQueueCapacity / 2; - private int pipeDataStructureTabletRowSize = 16384; + private int pipeDataStructureTabletRowSize = 2048; private long pipeConnectorTimeoutMs = 15 * 60 * 1000L; // 15 minutes private int pipeConnectorReadFileBufferSize = 8388608; private long pipeConnectorRetryIntervalMs = 1000L; private int pipeConnectorPendingQueueSize = 16; + private boolean pipeConnectorRPCThriftCompressionEnabled = false; - private boolean pipeAsyncConnectorRPCThriftCompressionEnabled = false; private int pipeAsyncConnectorSelectorNumber = 1; private int pipeAsyncConnectorCoreClientNumber = 8; private int pipeAsyncConnectorMaxClientNumber = 16; @@ -559,14 +560,13 @@ public class CommonConfig { this.pipeConnectorReadFileBufferSize = pipeConnectorReadFileBufferSize; } - public void setPipeAsyncConnectorRPCThriftCompressionEnabled( - boolean pipeAsyncConnectorRPCThriftCompressionEnabled) { - this.pipeAsyncConnectorRPCThriftCompressionEnabled = - pipeAsyncConnectorRPCThriftCompressionEnabled; + public void setPipeConnectorRPCThriftCompressionEnabled( + boolean pipeConnectorRPCThriftCompressionEnabled) { + this.pipeConnectorRPCThriftCompressionEnabled = pipeConnectorRPCThriftCompressionEnabled; } - public boolean isPipeAsyncConnectorRPCThriftCompressionEnabled() { - return pipeAsyncConnectorRPCThriftCompressionEnabled; + public boolean isPipeConnectorRPCThriftCompressionEnabled() { + return pipeConnectorRPCThriftCompressionEnabled; } public int getPipeAsyncConnectorSelectorNumber() { @@ -686,7 +686,10 @@ public class CommonConfig { } public void setPipeSubtaskExecutorMaxThreadNum(int pipeSubtaskExecutorMaxThreadNum) { - this.pipeSubtaskExecutorMaxThreadNum = pipeSubtaskExecutorMaxThreadNum; + this.pipeSubtaskExecutorMaxThreadNum = + Math.min( + pipeSubtaskExecutorMaxThreadNum, + Math.max(1, Runtime.getRuntime().availableProcessors() / 2)); } public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index d29c0333a59..8ca5dbbd85e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -326,12 +326,12 @@ public class CommonDescriptor { properties.getProperty( "pipe_connector_pending_queue_size", String.valueOf(config.getPipeConnectorPendingQueueSize())))); - - config.setPipeAsyncConnectorRPCThriftCompressionEnabled( + config.setPipeConnectorRPCThriftCompressionEnabled( Boolean.parseBoolean( properties.getProperty( - "pipe_async_connector_rpc_thrift_compression_enable", - String.valueOf(config.isPipeAsyncConnectorRPCThriftCompressionEnabled())))); + "pipe_connector_rpc_thrift_compression_enabled", + String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled())))); + config.setPipeAsyncConnectorSelectorNumber( Integer.parseInt( properties.getProperty( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index db33599278b..355d0851207 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -107,8 +107,8 @@ public class PipeConfig { return COMMON_CONFIG.getPipeConnectorPendingQueueSize(); } - public boolean isPipeAsyncConnectorRPCThriftCompressionEnabled() { - return COMMON_CONFIG.isPipeAsyncConnectorRPCThriftCompressionEnabled(); + public boolean isPipeConnectorRPCThriftCompressionEnabled() { + return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled(); } public int getPipeAsyncConnectorSelectorNumber() {
