This is an automated email from the ASF dual-hosted git repository.
CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5994e1ee93f Support hot reload for cluster runtime configs (#17975)
5994e1ee93f is described below
commit 5994e1ee93f472d7f32320f7dd900d9e7ad33e87
Author: Yongzao <[email protected]>
AuthorDate: Sun Jun 21 16:48:24 2026 +0800
Support hot reload for cluster runtime configs (#17975)
---
.../it/env/cluster/config/MppCommonConfig.java | 4 +-
.../env/cluster/config/MppSharedCommonConfig.java | 4 +-
.../it/env/remote/config/RemoteCommonConfig.java | 5 +-
.../org/apache/iotdb/itbase/env/CommonConfig.java | 4 +-
.../partition/IoTDBPartitionInheritStrategyIT.java | 2 +-
.../partition/IoTDBPartitionShuffleStrategyIT.java | 2 +-
.../db/it/IoTDBSetConfigurationClusterIT.java | 122 +++++++++
.../iotdb/db/it/IoTDBSetConfigurationIT.java | 274 +++++++++++++++++++++
.../iotdb/confignode/conf/ConfigNodeConfig.java | 29 ++-
.../confignode/conf/ConfigNodeDescriptor.java | 176 +++++++++----
.../iotdb/confignode/manager/ConfigManager.java | 66 ++++-
.../confignode/manager/RetryFailedTasksThread.java | 23 +-
.../iotdb/confignode/manager/load/LoadManager.java | 6 +
.../manager/load/cache/AbstractLoadCache.java | 2 +-
.../manager/load/service/EventService.java | 23 +-
.../manager/load/service/HeartbeatService.java | 23 +-
.../manager/load/service/StatisticsService.java | 23 +-
.../manager/load/service/TopologyService.java | 2 +-
.../iotdb/confignode/manager/node/NodeManager.java | 4 +-
.../manager/partition/PartitionManager.java | 8 +-
.../manager/schema/ClusterSchemaManager.java | 10 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 7 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 36 ++-
.../plan/AbstractFragmentParallelPlanner.java | 4 +-
.../conf/iotdb-system.properties.template | 18 +-
.../iotdb/commons/conf/ConfigurationFileUtils.java | 16 +-
26 files changed, 770 insertions(+), 123 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 064cfe1e2b2..d2743e89a8a 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -466,13 +466,13 @@ public class MppCommonConfig extends MppBaseConfig
implements CommonConfig {
}
@Override
- public CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode) {
+ public CommonConfig setDataRegionPerDataNode(int dataRegionPerDataNode) {
setProperty("data_region_per_data_node",
String.valueOf(dataRegionPerDataNode));
return this;
}
@Override
- public CommonConfig setSchemaRegionPerDataNode(double
schemaRegionPerDataNode) {
+ public CommonConfig setSchemaRegionPerDataNode(int schemaRegionPerDataNode) {
setProperty("schema_region_per_data_node",
String.valueOf(schemaRegionPerDataNode));
return this;
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 777d978f26d..708e7466ab1 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -479,14 +479,14 @@ public class MppSharedCommonConfig implements
CommonConfig {
}
@Override
- public CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode) {
+ public CommonConfig setDataRegionPerDataNode(int dataRegionPerDataNode) {
dnConfig.setDataRegionPerDataNode(dataRegionPerDataNode);
cnConfig.setDataRegionPerDataNode(dataRegionPerDataNode);
return this;
}
@Override
- public CommonConfig setSchemaRegionPerDataNode(double
schemaRegionPerDataNode) {
+ public CommonConfig setSchemaRegionPerDataNode(int schemaRegionPerDataNode) {
dnConfig.setSchemaRegionPerDataNode(schemaRegionPerDataNode);
cnConfig.setSchemaRegionPerDataNode(schemaRegionPerDataNode);
return this;
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 67a74dfe02e..7d9499773f0 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -339,11 +339,12 @@ public class RemoteCommonConfig implements CommonConfig {
}
@Override
- public CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode) {
+ public CommonConfig setDataRegionPerDataNode(int dataRegionPerDataNode) {
return this;
}
- public CommonConfig setSchemaRegionPerDataNode(double
schemaRegionPerDataNode) {
+ @Override
+ public CommonConfig setSchemaRegionPerDataNode(int schemaRegionPerDataNode) {
return this;
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 09b767dd545..9e446e97d34 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -149,9 +149,9 @@ public interface CommonConfig {
CommonConfig setMaxTsBlockSizeInByte(long maxTsBlockSizeInByte);
- CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode);
+ CommonConfig setDataRegionPerDataNode(int dataRegionPerDataNode);
- CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode);
+ CommonConfig setSchemaRegionPerDataNode(int schemaRegionPerDataNode);
CommonConfig setPipeMemoryManagementEnabled(boolean
pipeMemoryManagementEnabled);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritStrategyIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritStrategyIT.java
index 91d34b2d0c9..2f651753edb 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritStrategyIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritStrategyIT.java
@@ -56,7 +56,7 @@ public class IoTDBPartitionInheritStrategyIT {
private static final int testReplicationFactor = 1;
private static final int testSeriesSlotNum = 1000;
private static final long testTimePartitionInterval = 604800000;
- private static final double testDataRegionPerDataNode = 5.0;
+ private static final int testDataRegionPerDataNode = 5;
private static final String database = "root.database";
private static final int seriesPartitionSlotBatchSize = 100;
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionShuffleStrategyIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionShuffleStrategyIT.java
index 1294c911d6b..903eded7379 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionShuffleStrategyIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionShuffleStrategyIT.java
@@ -56,7 +56,7 @@ public class IoTDBPartitionShuffleStrategyIT {
public static final String SHUFFLE = "SHUFFLE";
private static final int testSeriesSlotNum = 1000;
private static final long testTimePartitionInterval = 604800000;
- private static final double testDataRegionPerDataNode = 5.0;
+ private static final int testDataRegionPerDataNode = 5;
private static final String database = "root.database";
private static final int testTimePartitionSlotsNum = 100;
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationClusterIT.java
new file mode 100644
index 00000000000..8f55eec5a12
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationClusterIT.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBSetConfigurationClusterIT {
+
+ @Before
+ public void setUp() throws Exception {
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+ EnvFactory.getEnv().initClusterEnvironment(2, 1);
+ }
+
+ @After
+ public void tearDown() {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testHotReloadConsistentConfigOnAllConfigNodes() throws Exception
{
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ assertSetConsistentClusterConfigurationOnSpecificNodeFailed(statement);
+
+ statement.execute("set configuration
\"read_consistency_level\"=\"weak\"");
+ for (int configNodeId : getConfigNodeIds()) {
+ assertAppliedConfiguration(configNodeId, "read_consistency_level",
"weak");
+ }
+ } finally {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("set configuration
\"read_consistency_level\"=\"strong\"");
+ }
+ }
+ }
+
+ private static List<Integer> getConfigNodeIds() throws Exception {
+ List<Integer> configNodeIds = new ArrayList<>();
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ for (TConfigNodeLocation configNodeLocation :
client.showCluster().getConfigNodeList()) {
+ configNodeIds.add(configNodeLocation.getConfigNodeId());
+ }
+ }
+ return configNodeIds;
+ }
+
+ private static void assertAppliedConfiguration(int nodeId, String key,
String value)
+ throws Exception {
+ try (ITableSession tableSessionConnection =
EnvFactory.getEnv().getTableSessionConnection();
+ SessionDataSet sessionDataSet =
+ tableSessionConnection.executeQueryStatement("show configuration
on " + nodeId)) {
+ SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ if (key.equals(iterator.getString(1))) {
+ Assert.assertEquals(value, iterator.isNull(2) ? null :
iterator.getString(2));
+ return;
+ }
+ }
+ }
+ Assert.fail("Cannot find applied configuration: " + key + " on node " +
nodeId);
+ }
+
+ private static void
assertSetConsistentClusterConfigurationOnSpecificNodeFailed(
+ Statement statement) throws SQLException {
+ try {
+ statement.execute("set configuration \"read_consistency_level\"=\"weak\"
on 0");
+ } catch (SQLException e) {
+ assertTrue(
+ e.getMessage()
+ .contains(
+ "must be consistent across the entire cluster and only one
can be set at a time"));
+ return;
+ }
+ Assert.fail("Set consistent cluster configuration on a specific node
should fail.");
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java
index 4b569bf1010..e919a1c5a5f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java
@@ -20,6 +20,9 @@
package org.apache.iotdb.db.it;
import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -117,6 +120,201 @@ public class IoTDBSetConfigurationIT {
"topology_probing_timeout_ratio=0.4")));
}
+ @Test
+ public void testHotReloadHeartbeatInterval() {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("set configuration
\"heartbeat_interval_in_ms\"=\"500\"");
+ assertAppliedConfiguration(0, "heartbeat_interval_in_ms", "500");
+ assertAppliedConfiguration(
+ EnvFactory.getEnv().getConfigNodeWrapperList().size(),
"heartbeat_interval_in_ms", "500");
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ } finally {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("set configuration
\"heartbeat_interval_in_ms\"=\"1000\"");
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+ Assert.assertTrue(
+ EnvFactory.getEnv().getConfigNodeWrapperList().stream()
+ .allMatch(
+ nodeWrapper ->
+ checkConfigFileContains(nodeWrapper,
"heartbeat_interval_in_ms=1000")));
+ }
+
+ @Test
+ public void testRejectedHotReloadDoesNotUpdateAppliedConfiguration() {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("set configuration
\"heartbeat_interval_in_ms\"=\"500\"");
+ assertAppliedConfiguration(0, "heartbeat_interval_in_ms", "500");
+ assertAppliedConfiguration(
+ EnvFactory.getEnv().getConfigNodeWrapperList().size(),
"heartbeat_interval_in_ms", "500");
+
+ assertSetConfigurationFailed(
+ statement,
+ "set configuration \"heartbeat_interval_in_ms\"=\"-1\"",
+ "heartbeat_interval_in_ms should be greater than 0");
+ assertAppliedConfiguration(0, "heartbeat_interval_in_ms", "500");
+ assertAppliedConfiguration(
+ EnvFactory.getEnv().getConfigNodeWrapperList().size(),
"heartbeat_interval_in_ms", "500");
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ } finally {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("set configuration
\"heartbeat_interval_in_ms\"=\"1000\"");
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+ Assert.assertTrue(
+ EnvFactory.getEnv().getNodeWrapperList().stream()
+ .allMatch(
+ nodeWrapper ->
+ checkConfigFileContains(nodeWrapper,
"heartbeat_interval_in_ms=1000")));
+ }
+
+ @Test
+ public void testHotReloadContinuousQueryMinEveryInterval() {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("set configuration
\"continuous_query_min_every_interval_in_ms\"=\"50\"");
+ assertAppliedConfiguration(0,
"continuous_query_min_every_interval_in_ms", "50");
+ assertAppliedConfiguration(
+ EnvFactory.getEnv().getConfigNodeWrapperList().size(),
+ "continuous_query_min_every_interval_in_ms",
+ "50");
+ statement.execute(
+ "CREATE CQ hot_reload_cq\n"
+ + "RESAMPLE EVERY 50ms\n"
+ + "BEGIN \n"
+ + " SELECT count(s1) \n"
+ + " INTO root.sg_count.d(count_s1)\n"
+ + " FROM root.sg.d\n"
+ + " GROUP BY(30m)\n"
+ + "END");
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ } finally {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try {
+ statement.execute("DROP CQ hot_reload_cq");
+ } catch (SQLException ignored) {
+ // The CQ may not exist if the hot-reload assertion failed before
creation.
+ }
+ statement.execute(
+ "set configuration
\"continuous_query_min_every_interval_in_ms\"=\"1000\"");
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+ Assert.assertTrue(
+ EnvFactory.getEnv().getNodeWrapperList().stream()
+ .allMatch(
+ nodeWrapper ->
+ checkConfigFileContains(
+ nodeWrapper,
"continuous_query_min_every_interval_in_ms=1000")));
+ }
+
+ @Test
+ public void testHotReloadRegionGroupExtensionConfiguration() {
+ String database = "root.hot_reload_region";
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("set configuration
\"schema_region_group_extension_policy\"=\"CUSTOM\"");
+ statement.execute("set configuration
\"data_region_group_extension_policy\"=\"CUSTOM\"");
+ statement.execute("set configuration
\"default_schema_region_group_num_per_database\"=\"2\"");
+ statement.execute("set configuration
\"default_data_region_group_num_per_database\"=\"3\"");
+ statement.execute("set configuration
\"schema_region_per_data_node\"=\"2.0\"");
+ statement.execute("set configuration
\"data_region_per_data_node\"=\"2.0\"");
+
+ assertAppliedConfiguration(0, "schema_region_group_extension_policy",
"CUSTOM");
+ assertAppliedConfiguration(0, "data_region_group_extension_policy",
"CUSTOM");
+ assertAppliedConfiguration(0,
"default_schema_region_group_num_per_database", "2");
+ assertAppliedConfiguration(0,
"default_data_region_group_num_per_database", "3");
+ assertShowVariable(statement,
ColumnHeaderConstant.SCHEMA_REGION_PER_DATA_NODE, "2");
+ assertShowVariable(statement,
ColumnHeaderConstant.DATA_REGION_PER_DATA_NODE, "2");
+ assertSetConfigurationFailed(
+ statement,
+ "set configuration \"schema_region_per_data_node\"=\"1.9\"",
+ "schema_region_per_data_node should be an integer");
+ assertShowVariable(statement,
ColumnHeaderConstant.SCHEMA_REGION_PER_DATA_NODE, "2");
+ assertSetConfigurationFailed(
+ statement,
+ "set configuration \"data_region_per_data_node\"=\"1.9\"",
+ "data_region_per_data_node should be an integer");
+ assertShowVariable(statement,
ColumnHeaderConstant.DATA_REGION_PER_DATA_NODE, "2");
+
+ statement.execute("CREATE DATABASE " + database);
+ statement.execute("INSERT INTO " + database + ".d(timestamp, s1) VALUES
(1, 1)");
+ assertRegionGroupNum(statement, database, 2, 3);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ } finally {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try {
+ statement.execute("DELETE DATABASE " + database);
+ } catch (SQLException ignored) {
+ // The database may not exist if the hot-reload assertion failed
before creation.
+ }
+ statement.execute("set configuration
\"schema_region_group_extension_policy\"=\"AUTO\"");
+ statement.execute("set configuration
\"data_region_group_extension_policy\"=\"AUTO\"");
+ statement.execute(
+ "set configuration
\"default_schema_region_group_num_per_database\"=\"1\"");
+ statement.execute("set configuration
\"default_data_region_group_num_per_database\"=\"2\"");
+ statement.execute("set configuration
\"schema_region_per_data_node\"=\"1\"");
+ statement.execute("set configuration
\"data_region_per_data_node\"=\"0\"");
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+ Assert.assertTrue(
+ EnvFactory.getEnv().getConfigNodeWrapperList().stream()
+ .allMatch(
+ nodeWrapper ->
+ checkConfigFileContains(
+ nodeWrapper,
+ "schema_region_group_extension_policy=AUTO",
+ "data_region_group_extension_policy=AUTO",
+ "default_schema_region_group_num_per_database=1",
+ "default_data_region_group_num_per_database=2",
+ "schema_region_per_data_node=1",
+ "data_region_per_data_node=0")));
+ }
+
+ @Test
+ public void testHotReloadReadConsistencyLevel() {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ assertSetConsistentClusterConfigurationOnSpecificNodeFailed(statement);
+ statement.execute("set configuration
\"read_consistency_level\"=\"weak\"");
+ assertAppliedConfiguration(0, "read_consistency_level", "weak");
+ assertAppliedConfiguration(
+ EnvFactory.getEnv().getConfigNodeWrapperList().size(),
"read_consistency_level", "weak");
+ assertShowVariable(statement,
ColumnHeaderConstant.READ_CONSISTENCY_LEVEL, "weak");
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ } finally {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("set configuration
\"read_consistency_level\"=\"strong\"");
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+ Assert.assertTrue(
+ EnvFactory.getEnv().getNodeWrapperList().stream()
+ .allMatch(
+ nodeWrapper ->
+ checkConfigFileContains(nodeWrapper,
"read_consistency_level=strong")));
+ }
+
@Test
public void testSetClusterName() throws Exception {
// set cluster name on cn and dn
@@ -177,6 +375,82 @@ public class IoTDBSetConfigurationIT {
}
}
+ private static void assertAppliedConfiguration(int nodeId, String key,
String value)
+ throws Exception {
+ try (ITableSession tableSessionConnection =
EnvFactory.getEnv().getTableSessionConnection();
+ SessionDataSet sessionDataSet =
+ tableSessionConnection.executeQueryStatement("show configuration
on " + nodeId)) {
+ SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ if (key.equals(iterator.getString(1))) {
+ Assert.assertEquals(value, iterator.isNull(2) ? null :
iterator.getString(2));
+ return;
+ }
+ }
+ }
+ Assert.fail("Cannot find applied configuration: " + key);
+ }
+
+ private static void
assertSetConsistentClusterConfigurationOnSpecificNodeFailed(
+ Statement statement) throws SQLException {
+ try {
+ statement.execute("set configuration \"read_consistency_level\"=\"weak\"
on 0");
+ } catch (SQLException e) {
+ assertTrue(
+ e.getMessage()
+ .contains(
+ "must be consistent across the entire cluster and only one
can be set at a time"));
+ return;
+ }
+ Assert.fail("Set consistent cluster configuration on a specific node
should fail.");
+ }
+
+ private static void assertSetConfigurationFailed(
+ Statement statement, String sql, String expectedMessage) throws
SQLException {
+ try {
+ statement.execute(sql);
+ } catch (SQLException e) {
+ assertTrue(e.getMessage().contains(expectedMessage));
+ return;
+ }
+ Assert.fail("Set configuration should fail: " + sql);
+ }
+
+ private static void assertShowVariable(Statement statement, String key,
String value)
+ throws SQLException {
+ try (ResultSet resultSet = statement.executeQuery("show variables")) {
+ while (resultSet.next()) {
+ if (key.equals(resultSet.getString(1))) {
+ Assert.assertEquals(value, resultSet.getString(2));
+ return;
+ }
+ }
+ }
+ Assert.fail("Cannot find variable: " + key);
+ }
+
+ private static void assertRegionGroupNum(
+ Statement statement,
+ String database,
+ int expectedSchemaRegionGroupNum,
+ int expectedDataRegionGroupNum)
+ throws SQLException {
+ int schemaRegionGroupNum = 0;
+ int dataRegionGroupNum = 0;
+ try (ResultSet resultSet = statement.executeQuery("show regions of
database " + database)) {
+ while (resultSet.next()) {
+ String regionType = resultSet.getString(ColumnHeaderConstant.TYPE);
+ if ("SchemaRegion".equals(regionType)) {
+ schemaRegionGroupNum++;
+ } else if ("DataRegion".equals(regionType)) {
+ dataRegionGroupNum++;
+ }
+ }
+ }
+ Assert.assertEquals(expectedSchemaRegionGroupNum, schemaRegionGroupNum);
+ Assert.assertEquals(expectedDataRegionGroupNum, dataRegionGroupNum);
+ }
+
@Test
public void testSetDefaultSGLevel() throws SQLException {
try (Connection connection = EnvFactory.getEnv().getConnection();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 326d8b43ceb..d79ad04b4d6 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -95,7 +95,7 @@ public class ConfigNodeConfig {
private String dataPartitionAllocationStrategy = "INHERIT";
/** The policy of extension SchemaRegionGroup for each Database. */
- private RegionGroupExtensionPolicy schemaRegionGroupExtensionPolicy =
+ private volatile RegionGroupExtensionPolicy schemaRegionGroupExtensionPolicy
=
RegionGroupExtensionPolicy.AUTO;
/**
@@ -103,13 +103,13 @@ public class ConfigNodeConfig {
* SchemaRegionGroups for each Database. When set
schema_region_group_extension_policy=AUTO, this
* parameter is the default minimal number of SchemaRegionGroups for each
Database.
*/
- private int defaultSchemaRegionGroupNumPerDatabase = 1;
+ private volatile int defaultSchemaRegionGroupNumPerDatabase = 1;
/** The maximum number of SchemaRegions expected to be managed by each
DataNode. */
- private int schemaRegionPerDataNode = 1;
+ private volatile int schemaRegionPerDataNode = 1;
/** The policy of extension DataRegionGroup for each Database. */
- private RegionGroupExtensionPolicy dataRegionGroupExtensionPolicy =
+ private volatile RegionGroupExtensionPolicy dataRegionGroupExtensionPolicy =
RegionGroupExtensionPolicy.AUTO;
/**
@@ -117,13 +117,13 @@ public class ConfigNodeConfig {
* DataRegionGroups for each Database. When set
data_region_group_extension_policy=AUTO, this
* parameter is the default minimal number of DataRegionGroups for each
Database.
*/
- private int defaultDataRegionGroupNumPerDatabase = 2;
+ private volatile int defaultDataRegionGroupNumPerDatabase = 2;
/**
* The maximum number of DataRegions expected to be managed by each
DataNode. Set to 0 means that
* each dataNode automatically has the number of CPU cores / 2 regions.
*/
- private int dataRegionPerDataNode = 0;
+ private volatile int dataRegionPerDataNode = 0;
/** each dataNode automatically has the number of CPU cores / 2 regions. */
private final double dataRegionPerDataNodeProportion = 0.5;
@@ -202,7 +202,10 @@ public class ConfigNodeConfig {
Math.max(Runtime.getRuntime().availableProcessors() / 4, 16);
/** The heartbeat interval in milliseconds. */
- private long heartbeatIntervalInMs = 1000;
+ private volatile long heartbeatIntervalInMs = 1000;
+
+ /** Startup heartbeat interval used to initialize failure detectors. */
+ private long failureDetectorHeartbeatIntervalInMs = heartbeatIntervalInMs;
/** Failure detector implementation */
private String failureDetector = IFailureDetector.PHI_ACCRUAL_DETECTOR;
@@ -234,7 +237,7 @@ public class ConfigNodeConfig {
/** The route priority policy of cluster read/write requests. */
private String routePriorityPolicy = IPriorityBalancer.LEADER_POLICY;
- private String readConsistencyLevel = "strong";
+ private volatile String readConsistencyLevel = "strong";
/** RatisConsensus protocol, Max size for a single log append request from
leader. */
private long dataRegionRatisConsensusLogAppenderBufferSize = 16 * 1024 *
1024L;
@@ -293,7 +296,7 @@ public class ConfigNodeConfig {
/** CQ related. */
private int cqSubmitThread = 2;
- private long cqMinEveryIntervalInMs = 1_000;
+ private volatile long cqMinEveryIntervalInMs = 1_000;
/** RatisConsensus protocol, request timeout for ratis client. */
private long dataRegionRatisRequestTimeoutMs = 10000L;
@@ -732,6 +735,14 @@ public class ConfigNodeConfig {
this.heartbeatIntervalInMs = heartbeatIntervalInMs;
}
+ public long getFailureDetectorHeartbeatIntervalInMs() {
+ return failureDetectorHeartbeatIntervalInMs;
+ }
+
+ public void setFailureDetectorHeartbeatIntervalInMs(long
failureDetectorHeartbeatIntervalInMs) {
+ this.failureDetectorHeartbeatIntervalInMs =
failureDetectorHeartbeatIntervalInMs;
+ }
+
public String getLeaderDistributionPolicy() {
return leaderDistributionPolicy;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 1245bef7f3d..43996574723 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -45,6 +45,7 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.math.BigDecimal;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
@@ -214,42 +215,7 @@ public class ConfigNodeDescriptor {
properties.getProperty(
"data_replication_factor",
String.valueOf(conf.getDataReplicationFactor()))));
- conf.setSchemaRegionGroupExtensionPolicy(
- RegionGroupExtensionPolicy.parse(
- properties.getProperty(
- "schema_region_group_extension_policy",
- conf.getSchemaRegionGroupExtensionPolicy().getPolicy())));
-
- conf.setDefaultSchemaRegionGroupNumPerDatabase(
- Integer.parseInt(
- properties.getProperty(
- "default_schema_region_group_num_per_database",
-
String.valueOf(conf.getDefaultSchemaRegionGroupNumPerDatabase()))));
-
- conf.setSchemaRegionPerDataNode(
- (int)
- Double.parseDouble(
- properties.getProperty(
- "schema_region_per_data_node",
- String.valueOf(conf.getSchemaRegionPerDataNode()))));
-
- conf.setDataRegionGroupExtensionPolicy(
- RegionGroupExtensionPolicy.parse(
- properties.getProperty(
- "data_region_group_extension_policy",
- conf.getDataRegionGroupExtensionPolicy().getPolicy())));
-
- conf.setDefaultDataRegionGroupNumPerDatabase(
- Integer.parseInt(
- properties.getProperty(
- "default_data_region_group_num_per_database",
-
String.valueOf(conf.getDefaultDataRegionGroupNumPerDatabase()))));
-
- conf.setDataRegionPerDataNode(
- (int)
- Double.parseDouble(
- properties.getProperty(
- "data_region_per_data_node",
String.valueOf(conf.getDataRegionPerDataNode()))));
+ loadRegionGroupExtensionConfig(properties);
try {
conf.setRegionAllocateStrategy(
@@ -307,10 +273,8 @@ public class ConfigNodeDescriptor {
"pipe_receiver_file_dir",
conf.getSystemDir() + File.separator + "pipe" +
File.separator + "receiver")));
- conf.setHeartbeatIntervalInMs(
- Long.parseLong(
- properties.getProperty(
- "heartbeat_interval_in_ms",
String.valueOf(conf.getHeartbeatIntervalInMs()))));
+ conf.setHeartbeatIntervalInMs(loadHeartbeatIntervalInMs(properties));
+
conf.setFailureDetectorHeartbeatIntervalInMs(conf.getHeartbeatIntervalInMs());
String failureDetector = properties.getProperty("failure_detector",
conf.getFailureDetector());
if (IFailureDetector.FIXED_DETECTOR.equals(failureDetector)
@@ -397,16 +361,7 @@ public class ConfigNodeDescriptor {
ConfigNodeMessages.UNKNOWN_ROUTE_PRIORITY_POLICY_PLEASE_SET_TO,
routePriorityPolicy));
}
- String readConsistencyLevel =
- properties.getProperty("read_consistency_level",
conf.getReadConsistencyLevel());
- if (readConsistencyLevel.equals("strong") ||
readConsistencyLevel.equals("weak")) {
- conf.setReadConsistencyLevel(readConsistencyLevel);
- } else {
- throw new IOException(
- String.format(
- ConfigNodeMessages.UNKNOWN_READ_CONSISTENCY_LEVEL_PLEASE_SET_TO,
- readConsistencyLevel));
- }
+ conf.setReadConsistencyLevel(loadReadConsistencyLevel(properties));
// commons
commonDescriptor.loadCommonProps(properties);
@@ -768,7 +723,7 @@ public class ConfigNodeDescriptor {
String.valueOf(conf.getForceWalPeriodForConfigNodeSimpleInMs()))));
}
- private void loadCQConfig(TrimProperties properties) {
+ private void loadCQConfig(TrimProperties properties) throws IOException {
int cqSubmitThread =
Integer.parseInt(
properties.getProperty(
@@ -782,6 +737,27 @@ public class ConfigNodeDescriptor {
}
conf.setCqSubmitThread(cqSubmitThread);
+ conf.setCqMinEveryIntervalInMs(loadCqMinEveryIntervalInMs(properties));
+ }
+
+ private long loadHeartbeatIntervalInMs(TrimProperties properties) {
+ return Long.parseLong(
+ properties.getProperty(
+ "heartbeat_interval_in_ms",
String.valueOf(conf.getHeartbeatIntervalInMs())));
+ }
+
+ private long loadHotReloadHeartbeatIntervalInMs(TrimProperties properties)
throws IOException {
+ long heartbeatIntervalInMs = loadHeartbeatIntervalInMs(properties);
+ if (heartbeatIntervalInMs <= 0) {
+ throw new IOException(
+ "heartbeat_interval_in_ms should be greater than 0, but was "
+ + heartbeatIntervalInMs
+ + ".");
+ }
+ return heartbeatIntervalInMs;
+ }
+
+ private long loadCqMinEveryIntervalInMs(TrimProperties properties) {
long cqMinEveryIntervalInMs =
Long.parseLong(
properties.getProperty(
@@ -795,7 +771,94 @@ public class ConfigNodeDescriptor {
cqMinEveryIntervalInMs = conf.getCqMinEveryIntervalInMs();
}
- conf.setCqMinEveryIntervalInMs(cqMinEveryIntervalInMs);
+ return cqMinEveryIntervalInMs;
+ }
+
+ private long loadHotReloadCqMinEveryIntervalInMs(TrimProperties properties)
throws IOException {
+ long cqMinEveryIntervalInMs =
+ Long.parseLong(
+ properties.getProperty(
+ "continuous_query_min_every_interval_in_ms",
+ String.valueOf(conf.getCqMinEveryIntervalInMs())));
+ if (cqMinEveryIntervalInMs <= 0) {
+ throw new IOException(
+ "continuous_query_min_every_interval_in_ms should be greater than 0,
but current value is "
+ + cqMinEveryIntervalInMs
+ + ".");
+ }
+
+ return cqMinEveryIntervalInMs;
+ }
+
+ private void loadRegionGroupExtensionConfig(TrimProperties properties)
throws IOException {
+ RegionGroupExtensionPolicy schemaRegionGroupExtensionPolicy =
+ RegionGroupExtensionPolicy.parse(
+ properties.getProperty(
+ "schema_region_group_extension_policy",
+ conf.getSchemaRegionGroupExtensionPolicy().getPolicy()));
+ int defaultSchemaRegionGroupNumPerDatabase =
+ Integer.parseInt(
+ properties.getProperty(
+ "default_schema_region_group_num_per_database",
+
String.valueOf(conf.getDefaultSchemaRegionGroupNumPerDatabase())));
+ int schemaRegionPerDataNode =
+ parseIntegerCompatibleValue(
+ properties, "schema_region_per_data_node",
conf.getSchemaRegionPerDataNode());
+ RegionGroupExtensionPolicy dataRegionGroupExtensionPolicy =
+ RegionGroupExtensionPolicy.parse(
+ properties.getProperty(
+ "data_region_group_extension_policy",
+ conf.getDataRegionGroupExtensionPolicy().getPolicy()));
+ int defaultDataRegionGroupNumPerDatabase =
+ Integer.parseInt(
+ properties.getProperty(
+ "default_data_region_group_num_per_database",
+
String.valueOf(conf.getDefaultDataRegionGroupNumPerDatabase())));
+ int dataRegionPerDataNode =
+ parseIntegerCompatibleValue(
+ properties, "data_region_per_data_node",
conf.getDataRegionPerDataNode());
+
+ if (defaultSchemaRegionGroupNumPerDatabase <= 0) {
+ throw new IOException("default_schema_region_group_num_per_database
should be positive.");
+ }
+ if (schemaRegionPerDataNode <= 0) {
+ throw new IOException("schema_region_per_data_node should be positive.");
+ }
+ if (defaultDataRegionGroupNumPerDatabase <= 0) {
+ throw new IOException("default_data_region_group_num_per_database should
be positive.");
+ }
+ if (dataRegionPerDataNode < 0) {
+ throw new IOException("data_region_per_data_node should be
non-negative.");
+ }
+
+ conf.setSchemaRegionGroupExtensionPolicy(schemaRegionGroupExtensionPolicy);
+
conf.setDefaultSchemaRegionGroupNumPerDatabase(defaultSchemaRegionGroupNumPerDatabase);
+ conf.setSchemaRegionPerDataNode(schemaRegionPerDataNode);
+ conf.setDataRegionGroupExtensionPolicy(dataRegionGroupExtensionPolicy);
+
conf.setDefaultDataRegionGroupNumPerDatabase(defaultDataRegionGroupNumPerDatabase);
+ conf.setDataRegionPerDataNode(dataRegionPerDataNode);
+ }
+
+ private int parseIntegerCompatibleValue(
+ TrimProperties properties, String propertyName, int defaultValue) throws
IOException {
+ String propertyValue = properties.getProperty(propertyName,
String.valueOf(defaultValue));
+ try {
+ return new
BigDecimal(propertyValue).stripTrailingZeros().intValueExact();
+ } catch (ArithmeticException | NumberFormatException e) {
+ throw new IOException(
+ propertyName + " should be an integer, but was " + propertyValue +
".", e);
+ }
+ }
+
+ private String loadReadConsistencyLevel(TrimProperties properties) throws
IOException {
+ String readConsistencyLevel =
+ properties.getProperty("read_consistency_level",
conf.getReadConsistencyLevel());
+ if (readConsistencyLevel.equals("strong") ||
readConsistencyLevel.equals("weak")) {
+ return readConsistencyLevel;
+ }
+ throw new IOException(
+ String.format(
+ ConfigNodeMessages.UNKNOWN_READ_CONSISTENCY_LEVEL_PLEASE_SET_TO,
readConsistencyLevel));
}
/**
@@ -823,12 +886,19 @@ public class ConfigNodeDescriptor {
}
public void loadHotModifiedProps(TrimProperties properties) throws
IOException {
- ConfigurationFileUtils.updateAppliedProperties(properties, true);
Optional.ofNullable(properties.getProperty(IoTDBConstant.CLUSTER_NAME))
.ifPresent(conf::setClusterName);
+ long heartbeatIntervalInMs =
loadHotReloadHeartbeatIntervalInMs(properties);
+ long cqMinEveryIntervalInMs =
loadHotReloadCqMinEveryIntervalInMs(properties);
+ String readConsistencyLevel = loadReadConsistencyLevel(properties);
+ loadRegionGroupExtensionConfig(properties);
+ conf.setHeartbeatIntervalInMs(heartbeatIntervalInMs);
+ conf.setCqMinEveryIntervalInMs(cqMinEveryIntervalInMs);
+ conf.setReadConsistencyLevel(readConsistencyLevel);
Optional.ofNullable(properties.getProperty("enable_topology_probing"))
.ifPresent(v ->
conf.setEnableTopologyProbing(Boolean.parseBoolean(v)));
loadPipeHotModifiedProp(properties);
+ ConfigurationFileUtils.updateAppliedProperties(properties, true);
}
private void loadPipeHotModifiedProp(TrimProperties properties) throws
IOException {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 37dd66b1527..92a22fe992f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1750,18 +1750,28 @@ public class ConfigManager implements IManager {
public TSStatus setConfiguration(TSetConfigurationReq req) {
TSStatus tsStatus = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
int currentNodeId = CONF.getConfigNodeId();
- if (currentNodeId != req.getNodeId()) {
+ TSStatus consistentClusterConfigStatus =
+ checkConsistentClusterConfigSetConfigurationTarget(req);
+ if (consistentClusterConfigStatus != null) {
+ return consistentClusterConfigStatus;
+ }
+ if (currentNodeId != req.getNodeId() && req.getNodeId() !=
NodeManager.APPLY_CONFIG_LOCALLY) {
tsStatus = confirmLeader();
if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return tsStatus;
}
}
- if (currentNodeId == req.getNodeId() || req.getNodeId() < 0) {
+ if (currentNodeId == req.getNodeId()
+ || req.getNodeId() < 0
+ || req.getNodeId() == NodeManager.APPLY_CONFIG_LOCALLY) {
URL url =
ConfigNodeDescriptor.getPropsUrl(CommonConfig.SYSTEM_CONFIG_NAME);
boolean configurationFileFound = (url != null && new
File(url.getFile()).exists());
TrimProperties properties = new TrimProperties();
properties.putAll(req.getConfigs());
+ long previousHeartbeatIntervalInMs = CONF.getHeartbeatIntervalInMs();
+ int previousSchemaRegionPerDataNode = CONF.getSchemaRegionPerDataNode();
+ int previousDataRegionPerDataNode = CONF.getDataRegionPerDataNode();
boolean wasTopologyProbingEnabled = CONF.isEnableTopologyProbing();
if (configurationFileFound) {
File file = new File(url.getFile());
@@ -1786,8 +1796,14 @@ public class ConfigManager implements IManager {
}
LOGGER.warn(msg);
}
+ if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return tsStatus;
+ }
+ handleHeartbeatIntervalHotReload(previousHeartbeatIntervalInMs);
+ handleRegionPerDataNodeHotReload(
+ previousSchemaRegionPerDataNode, previousDataRegionPerDataNode);
handleTopologyProbingHotReload(wasTopologyProbingEnabled);
- if (currentNodeId == req.getNodeId()) {
+ if (currentNodeId == req.getNodeId() || req.getNodeId() ==
NodeManager.APPLY_CONFIG_LOCALLY) {
return tsStatus;
}
}
@@ -1798,6 +1814,50 @@ public class ConfigManager implements IManager {
return RpcUtils.squashResponseStatusList(statusList);
}
+ private TSStatus
checkConsistentClusterConfigSetConfigurationTarget(TSetConfigurationReq req) {
+ if (req.getNodeId() == NodeManager.APPLY_CONFIG_LOCALLY) {
+ if (getConsensusManager() != null && !getConsensusManager().isLeader()) {
+ return null;
+ }
+ return RpcUtils.getStatus(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR,
+ "The internal configuration application target is invalid.");
+ }
+ if (req.getNodeId() < 0) {
+ return null;
+ }
+ for (String configKey : req.getConfigs().keySet()) {
+ if
(ConfigurationFileUtils.parameterNeedKeepConsistentInCluster(configKey)) {
+ return RpcUtils.getStatus(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR,
+ "The parameter '"
+ + configKey
+ + "' must be consistent across the entire cluster and cannot
be set on a specific node.");
+ }
+ }
+ return null;
+ }
+
+ private void handleHeartbeatIntervalHotReload(long
previousHeartbeatIntervalInMs) {
+ if (previousHeartbeatIntervalInMs == CONF.getHeartbeatIntervalInMs()) {
+ return;
+ }
+ getLoadManager().reloadHeartbeatInterval();
+ getRetryFailedTasksThread().reloadHeartbeatInterval();
+ }
+
+ private void handleRegionPerDataNodeHotReload(
+ int previousSchemaRegionPerDataNode, int previousDataRegionPerDataNode) {
+ if (previousSchemaRegionPerDataNode == CONF.getSchemaRegionPerDataNode()
+ && previousDataRegionPerDataNode == CONF.getDataRegionPerDataNode()) {
+ return;
+ }
+ if (!getConsensusManager().isLeader()) {
+ return;
+ }
+ getClusterSchemaManager().adjustMaxRegionGroupNum();
+ }
+
private void handleTopologyProbingHotReload(boolean wasEnabled) {
boolean isEnabled = CONF.isEnableTopologyProbing();
if (wasEnabled == isEnabled) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
index 748d6378462..2f222a93ac6 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
-import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
import org.apache.iotdb.confignode.manager.load.LoadManager;
@@ -53,8 +52,6 @@ public class RetryFailedTasksThread {
private static final Logger LOGGER =
LoggerFactory.getLogger(RetryFailedTasksThread.class);
- private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
- private static final long HEARTBEAT_INTERVAL =
CONF.getHeartbeatIntervalInMs();
private final IManager configManager;
private final NodeManager nodeManager;
private final LoadManager loadManager;
@@ -82,7 +79,7 @@ public class RetryFailedTasksThread {
retryFailTasksExecutor,
this::retryFailedTasks,
0,
- HEARTBEAT_INTERVAL,
+
ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(),
TimeUnit.MILLISECONDS);
LOGGER.info(ManagerMessages.RETRYFAILMISSIONS_SERVICE_IS_STARTED_SUCCESSFULLY);
}
@@ -100,6 +97,24 @@ public class RetryFailedTasksThread {
}
}
+ /** Reload the retry interval without rebuilding the service instance. */
+ public void reloadHeartbeatInterval() {
+ synchronized (scheduleMonitor) {
+ if (currentFailedTasksRetryThreadFuture == null) {
+ return;
+ }
+ currentFailedTasksRetryThreadFuture.cancel(false);
+ currentFailedTasksRetryThreadFuture =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ retryFailTasksExecutor,
+ this::retryFailedTasks,
+ 0,
+
ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(),
+ TimeUnit.MILLISECONDS);
+
LOGGER.info(ManagerMessages.RETRYFAILMISSIONS_SERVICE_IS_STARTED_SUCCESSFULLY);
+ }
+ }
+
private void retryFailedTasks() {
// trigger
triggerDetectTask();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 63d5ff8befd..5a424ea5a5e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -186,6 +186,12 @@ public class LoadManager {
routeBalancer.clearRegionPriority();
}
+ public void reloadHeartbeatInterval() {
+ heartbeatService.reloadHeartbeatInterval();
+ statisticsService.reloadHeartbeatInterval();
+ eventService.reloadHeartbeatInterval();
+ }
+
public boolean isLoadReady() {
return loadReady.get() || tryUpdateLoadReady();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
index e5ab6445f98..4c5ee322aa1 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
@@ -56,7 +56,7 @@ public abstract class AbstractLoadCache {
new PhiAccrualDetector(
CONF.getFailureDetectorPhiThreshold(),
CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
- CONF.getHeartbeatIntervalInMs() * 200_000L,
+ CONF.getFailureDetectorHeartbeatIntervalInMs() * 200_000L,
IFailureDetector.PHI_COLD_START_THRESHOLD,
new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs()
* 1000_000L));
break;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
index 762e6b1782b..5974e5ceb58 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
@@ -57,9 +57,6 @@ public class EventService {
private static final Logger LOGGER =
LoggerFactory.getLogger(EventService.class);
- private static final long HEARTBEAT_INTERVAL =
- ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs();
-
// Event executor service
private final Object eventServiceMonitor = new Object();
@@ -101,7 +98,7 @@ public class EventService {
eventServiceExecutor,
this::broadcastChangeEventIfNecessary,
0,
- HEARTBEAT_INTERVAL,
+
ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(),
TimeUnit.MILLISECONDS);
LOGGER.info(ManagerMessages.EVENT_SERVICE_IS_STARTED_SUCCESSFULLY);
}
@@ -124,6 +121,24 @@ public class EventService {
}
}
+ /** Reload the event-check interval without rebuilding the service instance.
*/
+ public void reloadHeartbeatInterval() {
+ synchronized (eventServiceMonitor) {
+ if (currentEventServiceFuture == null) {
+ return;
+ }
+ currentEventServiceFuture.cancel(false);
+ currentEventServiceFuture =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ eventServiceExecutor,
+ this::broadcastChangeEventIfNecessary,
+ 0,
+
ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(),
+ TimeUnit.MILLISECONDS);
+ LOGGER.info(ManagerMessages.EVENT_SERVICE_IS_STARTED_SUCCESSFULLY);
+ }
+ }
+
private void broadcastChangeEventIfNecessary() {
checkAndBroadcastNodeStatisticsChangeEventIfNecessary();
checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index a491b3960c3..239a68801e2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -66,9 +66,6 @@ public class HeartbeatService {
private static final Logger LOGGER =
LoggerFactory.getLogger(HeartbeatService.class);
- private static final long HEARTBEAT_INTERVAL =
- ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs();
-
protected IManager configManager;
private final LoadCache loadCache;
@@ -101,7 +98,7 @@ public class HeartbeatService {
heartBeatExecutor,
this::heartbeatLoopBody,
0,
- HEARTBEAT_INTERVAL,
+
ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(),
TimeUnit.MILLISECONDS);
LOGGER.info(ManagerMessages.HEARTBEAT_SERVICE_IS_STARTED_SUCCESSFULLY);
}
@@ -119,6 +116,24 @@ public class HeartbeatService {
}
}
+ /** Reload the heartbeat interval without rebuilding the service instance. */
+ public void reloadHeartbeatInterval() {
+ synchronized (heartbeatScheduleMonitor) {
+ if (currentHeartbeatFuture == null) {
+ return;
+ }
+ currentHeartbeatFuture.cancel(false);
+ currentHeartbeatFuture =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ heartBeatExecutor,
+ this::heartbeatLoopBody,
+ 0,
+
ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(),
+ TimeUnit.MILLISECONDS);
+ LOGGER.info(ManagerMessages.HEARTBEAT_SERVICE_IS_STARTED_SUCCESSFULLY);
+ }
+ }
+
/** loop body of the heartbeat thread. */
private void heartbeatLoopBody() {
// The consensusManager of configManager may not be fully initialized at
this time
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
index 9fb235eed81..620276767f0 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
@@ -38,9 +38,6 @@ public class StatisticsService {
private static final Logger LOGGER =
LoggerFactory.getLogger(StatisticsService.class);
- public static final long STATISTICS_UPDATE_INTERVAL =
- ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs();
-
private final LoadCache loadCache;
public StatisticsService(LoadCache loadCache) {
@@ -64,7 +61,7 @@ public class StatisticsService {
loadStatisticsExecutor,
this::updateLoadStatistics,
0,
- STATISTICS_UPDATE_INTERVAL,
+
ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(),
TimeUnit.MILLISECONDS);
LOGGER.info(ManagerMessages.LOADSTATISTICS_SERVICE_IS_STARTED_SUCCESSFULLY);
}
@@ -82,6 +79,24 @@ public class StatisticsService {
}
}
+ /** Reload the statistics update interval without rebuilding the service
instance. */
+ public void reloadHeartbeatInterval() {
+ synchronized (statisticsScheduleMonitor) {
+ if (currentLoadStatisticsFuture == null) {
+ return;
+ }
+ currentLoadStatisticsFuture.cancel(false);
+ currentLoadStatisticsFuture =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ loadStatisticsExecutor,
+ this::updateLoadStatistics,
+ 0,
+
ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(),
+ TimeUnit.MILLISECONDS);
+
LOGGER.info(ManagerMessages.LOADSTATISTICS_SERVICE_IS_STARTED_SUCCESSFULLY);
+ }
+ }
+
private void updateLoadStatistics() {
loadCache.updateNodeStatistics(false);
loadCache.updateRegionGroupStatistics();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
index b99cbd00017..bbeb61aa631 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
@@ -115,7 +115,7 @@ public class TopologyService implements Runnable,
IClusterStatusSubscriber {
new PhiAccrualDetector(
CONF.getFailureDetectorPhiThreshold(),
CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
- CONF.getHeartbeatIntervalInMs() * 200_000L,
+ CONF.getFailureDetectorHeartbeatIntervalInMs() * 200_000L,
IFailureDetector.PHI_COLD_START_THRESHOLD,
new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs()
* 1000_000L));
break;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 4479fb77711..9a27f8f07ac 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -142,6 +142,8 @@ public class NodeManager {
private static final String CONSENSUS_WRITE_ERROR =
"Failed in the write API executing the consensus layer due to: ";
+ public static final int APPLY_CONFIG_LOCALLY = -2;
+
public NodeManager(IManager configManager, NodeInfo nodeInfo) {
this.configManager = configManager;
this.nodeInfo = nodeInfo;
@@ -1083,7 +1085,7 @@ public class NodeManager {
SyncConfigNodeClientPool.getInstance()
.sendSyncRequestToConfigNodeWithRetry(
configNode.getInternalEndPoint(),
- new TSetConfigurationReq(req.getConfigs(),
configNode.getConfigNodeId()),
+ new TSetConfigurationReq(req.getConfigs(),
APPLY_CONFIG_LOCALLY),
CnToCnNodeRequestType.SET_CONFIGURATION);
} catch (Exception e) {
status =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index dfa3448bc0f..24b2c5d946c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -131,10 +131,6 @@ public class PartitionManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(PartitionManager.class);
private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
- private static final RegionGroupExtensionPolicy
SCHEMA_REGION_GROUP_EXTENSION_POLICY =
- CONF.getSchemaRegionGroupExtensionPolicy();
- private static final RegionGroupExtensionPolicy
DATA_REGION_GROUP_EXTENSION_POLICY =
- CONF.getDataRegionGroupExtensionPolicy();
private static final CommonConfig COMMON_CONFIG =
CommonDescriptor.getInstance().getConfig();
private final IManager configManager;
@@ -582,7 +578,7 @@ public class PartitionManager {
try {
if (TConsensusGroupType.SchemaRegion.equals(consensusGroupType)) {
- switch (SCHEMA_REGION_GROUP_EXTENSION_POLICY) {
+ switch (CONF.getSchemaRegionGroupExtensionPolicy()) {
case CUSTOM:
return customExtendRegionGroupIfNecessary(
unassignedPartitionSlotsCountMap, consensusGroupType);
@@ -592,7 +588,7 @@ public class PartitionManager {
unassignedPartitionSlotsCountMap, consensusGroupType);
}
} else {
- switch (DATA_REGION_GROUP_EXTENSION_POLICY) {
+ switch (CONF.getDataRegionGroupExtensionPolicy()) {
case CUSTOM:
return customExtendRegionGroupIfNecessary(
unassignedPartitionSlotsCountMap, consensusGroupType);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
index fbf62d8f1dd..1c7ce1e74db 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
@@ -141,8 +141,6 @@ public class ClusterSchemaManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterSchemaManager.class);
private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
- private static final int SCHEMA_REGION_PER_DATA_NODE =
CONF.getSchemaRegionPerDataNode();
- private static final int DATA_REGION_PER_DATA_NODE =
CONF.getDataRegionPerDataNode();
private final IManager configManager;
private final ClusterSchemaInfo clusterSchemaInfo;
@@ -523,7 +521,7 @@ public class ClusterSchemaManager {
final int maxSchemaRegionGroupNum =
calcMaxRegionGroupNum(
databaseSchema.getMinSchemaRegionGroupNum(),
- SCHEMA_REGION_PER_DATA_NODE,
+ CONF.getSchemaRegionPerDataNode(),
dataNodeNum,
databaseNum,
databaseSchema.getSchemaReplicationFactor(),
@@ -549,10 +547,10 @@ public class ClusterSchemaManager {
final int maxDataRegionGroupNum =
calcMaxRegionGroupNum(
databaseSchema.getMinDataRegionGroupNum(),
- DATA_REGION_PER_DATA_NODE == 0
+ CONF.getDataRegionPerDataNode() == 0
? CONF.getDataRegionPerDataNodeProportion()
- : DATA_REGION_PER_DATA_NODE,
- DATA_REGION_PER_DATA_NODE == 0 ? totalCpuCoreNum : dataNodeNum,
+ : CONF.getDataRegionPerDataNode(),
+ CONF.getDataRegionPerDataNode() == 0 ? totalCpuCoreNum :
dataNodeNum,
databaseNum,
databaseSchema.getDataReplicationFactor(),
allocatedDataRegionGroupCount);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 7aadfebc917..c20122bcf50 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -727,7 +727,7 @@ public class IoTDBConfig {
* Minimum every interval to perform continuous query.
* The every interval of continuous query instances should not be lower than
this limit.
*/
- private long continuousQueryMinimumEveryInterval = 1000;
+ private volatile long continuousQueryMinimumEveryInterval = 1000;
/** How much memory may be used in ONE SELECT INTO operation (in Byte). */
private long intoOperationBufferSizeInByte = 100 * 1024 * 1024L;
@@ -1018,7 +1018,7 @@ public class IoTDBConfig {
private long detailContainerMinDegradeMemoryInBytes = 1024 * 1024L;
private int schemaThreadCount = 5;
- private ReadConsistencyLevel readConsistencyLevel =
ReadConsistencyLevel.STRONG;
+ private volatile ReadConsistencyLevel readConsistencyLevel =
ReadConsistencyLevel.STRONG;
/** Maximum size of wal buffer used in IoTConsensus. Unit: byte */
private long throttleThreshold = 200 * 1024 * 1024 * 1024L;
@@ -1055,7 +1055,7 @@ public class IoTDBConfig {
private long schemaRatisConsensusLeaderElectionTimeoutMaxMs = 4000L;
/** CQ related */
- private long cqMinEveryIntervalInMs = 1_000;
+ private volatile long cqMinEveryIntervalInMs = 1_000;
private long dataRatisConsensusRequestTimeoutMs = 10000L;
private long schemaRatisConsensusRequestTimeoutMs = 10000L;
@@ -3795,6 +3795,7 @@ public class IoTDBConfig {
public void setCqMinEveryIntervalInMs(long cqMinEveryIntervalInMs) {
this.cqMinEveryIntervalInMs = cqMinEveryIntervalInMs;
+ this.continuousQueryMinimumEveryInterval = cqMinEveryIntervalInMs;
}
public double getUsableCompactionMemoryProportion() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 13352020a15..59878f68e0d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2106,7 +2106,6 @@ public class IoTDBDescriptor {
public synchronized void loadHotModifiedProps(TrimProperties properties)
throws QueryProcessException, IOException {
- ConfigurationFileUtils.updateAppliedProperties(properties, true);
try {
// update data dirs
String dataDirs = properties.getProperty("dn_data_dirs", null);
@@ -2196,6 +2195,12 @@ public class IoTDBDescriptor {
// update load config
loadLoadTsFileHotModifiedProp(properties);
+ // update CQ semantic-check config pushed from ConfigNode
+ loadCqMinEveryIntervalInMs(properties);
+
+ // update query routing consistency config pushed from ConfigNode
+ loadReadConsistencyLevel(properties);
+
// update pipe config
loadPipeHotModifiedProp(properties);
@@ -2320,6 +2325,7 @@ public class IoTDBDescriptor {
conf.setMaxSubTaskNumForInformationTableScan(maxSubTaskNumForInformationTableScan);
}
+ ConfigurationFileUtils.updateAppliedProperties(properties, true);
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
@@ -2820,6 +2826,34 @@ public class IoTDBDescriptor {
false));
}
+ private void loadCqMinEveryIntervalInMs(TrimProperties properties) throws
IOException {
+ long cqMinEveryIntervalInMs =
+ Long.parseLong(
+ properties.getProperty(
+ "continuous_query_min_every_interval_in_ms",
+ String.valueOf(conf.getCqMinEveryIntervalInMs())));
+ if (cqMinEveryIntervalInMs <= 0) {
+ throw new IOException(
+ "continuous_query_min_every_interval_in_ms should be greater than 0,
but current value is "
+ + cqMinEveryIntervalInMs
+ + ".");
+ }
+ conf.setCqMinEveryIntervalInMs(cqMinEveryIntervalInMs);
+ }
+
+ private void loadReadConsistencyLevel(TrimProperties properties) throws
IOException {
+ String readConsistencyLevel =
+ properties.getProperty(
+ "read_consistency_level",
conf.getReadConsistencyLevel().name().toLowerCase());
+ if (!"strong".equals(readConsistencyLevel) &&
!"weak".equals(readConsistencyLevel)) {
+ throw new IOException(
+ String.format(
+ "Unknown read_consistency_level: %s, please set to \"strong\" or
\"weak\"",
+ readConsistencyLevel));
+ }
+ conf.setReadConsistencyLevel(readConsistencyLevel);
+ }
+
public void loadClusterProps(TrimProperties properties) throws IOException {
String configNodeUrls =
properties.getProperty(IoTDBConstant.DN_SEED_CONFIG_NODE);
if (configNodeUrls == null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
index 0fddb41d6e6..4fa4bbb0ac2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
@@ -54,13 +54,11 @@ public abstract class AbstractFragmentParallelPlanner
implements IFragmentParall
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractFragmentParallelPlanner.class);
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
- private final ReadConsistencyLevel readConsistencyLevel;
protected final MPPQueryContext queryContext;
protected AbstractFragmentParallelPlanner(MPPQueryContext queryContext) {
this.queryContext = queryContext;
- this.readConsistencyLevel = CONFIG.getReadConsistencyLevel();
}
protected void selectExecutorAndHost(
@@ -117,7 +115,7 @@ public abstract class AbstractFragmentParallelPlanner
implements IFragmentParall
throw new IllegalArgumentException(
String.format("regionReplicaSet is invalid: %s", regionReplicaSet));
}
- boolean selectRandomDataNode = ReadConsistencyLevel.WEAK ==
this.readConsistencyLevel;
+ boolean selectRandomDataNode = ReadConsistencyLevel.WEAK ==
CONFIG.getReadConsistencyLevel();
// When planning fragment onto specific DataNode, the DataNode whose
endPoint is in
// black list won't be considered because it may have connection issue now.
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 14f3773e9f7..2bd65bb3f98 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -634,7 +634,7 @@
series_partition_executor_class=org.apache.iotdb.commons.partition.executor.hash
# These policies are currently supported:
# 1. CUSTOM(Each Database will allocate schema_region_group_per_database
RegionGroups as soon as created)
# 2. AUTO(Each Database will automatically extend SchemaRegionGroups based on
the data it has)
-# effectiveMode: restart
+# effectiveMode: hot_reload
# Datatype: String
schema_region_group_extension_policy=AUTO
@@ -642,7 +642,7 @@ schema_region_group_extension_policy=AUTO
# this parameter is the default number of SchemaRegionGroups for each Database.
# When set schema_region_group_extension_policy=AUTO,
# this parameter is the default minimal number of SchemaRegionGroups for each
Database.
-# effectiveMode: restart
+# effectiveMode: hot_reload
# Datatype: Integer
default_schema_region_group_num_per_database=1
@@ -650,7 +650,7 @@ default_schema_region_group_num_per_database=1
# This parameter is the maximum number of SchemaRegions expected to be managed
by each DataNode.
# Notice: Since each Database requires at least one SchemaRegionGroup to
manage its schema,
# this parameter doesn't limit the upper bound of cluster SchemaRegions when
there are too many Databases.
-# effectiveMode: restart
+# effectiveMode: hot_reload
# Datatype: Integer
schema_region_per_data_node=1
@@ -658,7 +658,7 @@ schema_region_per_data_node=1
# These policies are currently supported:
# 1. CUSTOM(Each Database will allocate data_region_group_per_database
DataRegionGroups as soon as created)
# 2. AUTO(Each Database will automatically extend DataRegionGroups based on
the data it has)
-# effectiveMode: restart
+# effectiveMode: hot_reload
# Datatype: String
data_region_group_extension_policy=AUTO
@@ -666,7 +666,7 @@ data_region_group_extension_policy=AUTO
# this parameter is the default number of DataRegionGroups for each Database.
# When set data_region_group_extension_policy=AUTO,
# this parameter is the default minimal number of DataRegionGroups for each
Database.
-# effectiveMode: restart
+# effectiveMode: hot_reload
# Datatype: Integer
default_data_region_group_num_per_database=2
@@ -675,7 +675,7 @@ default_data_region_group_num_per_database=2
# Set to 0 means that each node automatically has the number of CPU cores / 2
regions
# Notice: Since each Database requires at least two DataRegionGroups to manage
its data,
# this parameter doesn't limit the upper bound of cluster DataRegions when
there are too many Databases.
-# effectiveMode: restart
+# effectiveMode: hot_reload
# Datatype: Integer
data_region_per_data_node=0
@@ -713,7 +713,7 @@ time_partition_origin=0
time_partition_interval=604800000
# The heartbeat interval in milliseconds, default is 1000ms
-# effectiveMode: restart
+# effectiveMode: hot_reload
# Datatype: long
heartbeat_interval_in_ms=1000
@@ -1064,7 +1064,7 @@ text_compressor=LZ4
# These consistency levels are currently supported:
# 1. strong(Default, read from the leader replica)
# 2. weak(Read from a random replica)
-# effectiveMode: restart
+# effectiveMode: hot_reload
# Datatype: string
read_consistency_level=strong
@@ -1911,7 +1911,7 @@ into_operation_execution_thread_count=2
continuous_query_submit_thread_count=2
# The minimum value of the continuous query execution time interval
-# effectiveMode: restart
+# effectiveMode: hot_reload
# Datatype: long(duration)
continuous_query_min_every_interval_in_ms=1000
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/ConfigurationFileUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/ConfigurationFileUtils.java
index ee777a3cdf5..bef8d685875 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/ConfigurationFileUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/ConfigurationFileUtils.java
@@ -40,6 +40,7 @@ import java.net.URL;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -58,6 +59,19 @@ public class ConfigurationFileUtils {
private static final long maxTimeMillsToAcquireLock =
TimeUnit.SECONDS.toMillis(20);
private static final long waitTimeMillsPerCheck =
TimeUnit.MILLISECONDS.toMillis(100);
private static final Logger logger =
LoggerFactory.getLogger(ConfigurationFileUtils.class);
+ private static final Set<String> PARAMETERS_NEED_KEEP_CONSISTENT_IN_CLUSTER =
+ Collections.unmodifiableSet(
+ new HashSet<>(
+ Arrays.asList(
+ "heartbeat_interval_in_ms",
+ "continuous_query_min_every_interval_in_ms",
+ "schema_region_group_extension_policy",
+ "data_region_group_extension_policy",
+ "default_schema_region_group_num_per_database",
+ "default_data_region_group_num_per_database",
+ "schema_region_per_data_node",
+ "data_region_per_data_node",
+ "read_consistency_level")));
private static final String lineSeparator = "\n";
private static final String license =
new StringJoiner(lineSeparator)
@@ -267,7 +281,7 @@ public class ConfigurationFileUtils {
}
public static boolean parameterNeedKeepConsistentInCluster(String key) {
- return false;
+ return key != null &&
PARAMETERS_NEED_KEEP_CONSISTENT_IN_CLUSTER.contains(key.trim());
}
public static void releaseDefault() {