This is an automated email from the ASF dual-hosted git repository. CRZbulabula pushed a commit to branch fix-schema-region-partition-race-remove-message in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 14dc895d209d088869bc5e83c134e85784f4031d Author: Yongzao <[email protected]> AuthorDate: Tue Jun 9 15:46:20 2026 +0800 Fix schema region visibility race and remove datanode message --- .../IoTDBRemoveDataNodeNormalIT.java | 32 ++++++++++ .../iotdb/confignode/i18n/ProcedureMessages.java | 4 ++ .../iotdb/confignode/i18n/ProcedureMessages.java | 4 ++ .../manager/partition/PartitionManager.java | 68 +++++++++++++++++++++- .../procedure/env/ConfigNodeProcedureEnv.java | 7 ++- .../procedure/env/RemoveDataNodeHandler.java | 58 +++++++++++++----- .../impl/region/CreateRegionGroupsProcedure.java | 10 +++- 7 files changed, 165 insertions(+), 18 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java index 4ce11149070..304424198b6 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java @@ -135,6 +135,38 @@ public class IoTDBRemoveDataNodeNormalIT { // ConsensusFactory.IOT_CONSENSUS_V2); // } + @Test + public void failWhenDataReplicationFactorIsOneUseSQL() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) + .setSchemaReplicationFactor(3) + .setDataReplicationFactor(1) + .setDefaultDataRegionGroupNumPerDatabase(1); + EnvFactory.getEnv().initClusterEnvironment(1, 3); + + try (final Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection()); + final Statement statement = makeItCloseQuietly(connection.createStatement()); + final ResultSet resultSet = statement.executeQuery(SHOW_DATANODES)) { + final Set<Integer> allDataNodeId = new HashSet<>(); + while (resultSet.next()) { + allDataNodeId.add(resultSet.getInt(ColumnHeaderConstant.NODE_ID)); + } + + final String removeDataNodeSQL = + generateRemoveString(selectRemoveDataNodes(allDataNodeId, 1)); + try { + statement.execute(removeDataNodeSQL); + Assert.fail("Remove DataNode should fail when data_replication_factor is 1"); + } catch (final IoTDBSQLException e) { + Assert.assertTrue(e.getMessage(), e.getMessage().contains("data_replication_factor is 1")); + Assert.assertFalse( + e.getMessage(), e.getMessage().contains("Failed to remove all requested data nodes")); + } + } + } + @Test public void fail1C3DTestIoTUseSQL() throws Exception { // Setup 1C3D with schema replication factor = 3, and remove 1D, this test should fail due to diff --git a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java index 085a803777e..ddce0ddd957 100644 --- a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java +++ b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java @@ -445,6 +445,10 @@ public final class ProcedureMessages { "Failed to push topic meta to dataNodes, details: %s"; public static final String FAILED_TO_REMOVE_DATA_NODE_BECAUSE_IT_IS_NOT_IN = "Failed to remove data node {} because it is not in running and the configuration of cluster is one replication"; + + public static final String + FAILED_TO_REMOVE_DATA_NODE_BECAUSE_DATA_REPLICATION_FACTOR_IS_ONE = + "Cannot remove DataNode because data_replication_factor is 1 or at least one DataRegion has only one replica. Removing a DataNode may cause data loss. Increase data_replication_factor and ensure each DataRegion has more than one replica before removing DataNodes."; public static final String FAILED_TO_ROLLBACK_ALTER_PIPE_DETAILS_METADATA_WILL_BE_SYNCHRONIZED = "Failed to rollback alter pipe {}, details: {}, metadata will be synchronized later."; public static final String FAILED_TO_ROLLBACK_COMMIT_SET_TEMPLATE_ON_PATH_DUE_TO = diff --git a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java index d928f4f6bae..9b21d61cc74 100644 --- a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java +++ b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java @@ -445,6 +445,10 @@ public final class ProcedureMessages { "Failed to push topic meta to dataNodes, details: %s"; public static final String FAILED_TO_REMOVE_DATA_NODE_BECAUSE_IT_IS_NOT_IN = "Failed to remove data node {} because it is not in running and the configuration of cluster is one replication"; + + public static final String + FAILED_TO_REMOVE_DATA_NODE_BECAUSE_DATA_REPLICATION_FACTOR_IS_ONE = + "不能移除 DataNode,因为 data_replication_factor 为 1,或至少存在一个 DataRegion 只有一个副本。移除 DataNode 可能造成数据丢失。请先提高 data_replication_factor,并确保每个 DataRegion 都有多个副本,再移除 DataNode。"; public static final String FAILED_TO_ROLLBACK_ALTER_PIPE_DETAILS_METADATA_WILL_BE_SYNCHRONIZED = "Failed to rollback alter pipe {}, details: {}, metadata will be synchronized later."; public static final String FAILED_TO_ROLLBACK_COMMIT_SET_TEMPLATE_ON_PATH_DUE_TO = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 5be81256b5c..e4373e773a8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -147,6 +147,9 @@ public class PartitionManager { public static final String CONSENSUS_WRITE_ERROR = "Failed in the write API executing the consensus layer due to: "; + private static final long REGION_GROUP_VISIBILITY_TIMEOUT_MS = 10_000L; + private static final long REGION_GROUP_VISIBILITY_CHECK_INTERVAL_MS = 20L; + // Monitor for leadership change private final Object scheduleMonitor = new Object(); @@ -715,12 +718,75 @@ public class PartitionManager { getLoadManager().allocateRegionGroups(allotmentMap, consensusGroupType); LOGGER.info(ManagerMessages.CREATEREGIONGROUPS_STARTING_TO_CREATE_THE_FOLLOWING_REGIONGROUPS); createRegionGroupsPlan.planLog(LOGGER); - return getProcedureManager().createRegionGroups(consensusGroupType, createRegionGroupsPlan); + final TSStatus createStatus = + getProcedureManager().createRegionGroups(consensusGroupType, createRegionGroupsPlan); + if (createStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return createStatus; + } + return waitForRegionGroupsVisible(createRegionGroupsPlan, consensusGroupType); } else { return RpcUtils.SUCCESS_STATUS; } } + private TSStatus waitForRegionGroupsVisible( + final CreateRegionGroupsPlan createRegionGroupsPlan, + final TConsensusGroupType consensusGroupType) { + final Map<String, Set<TConsensusGroupId>> expectedRegionGroups = new HashMap<>(); + createRegionGroupsPlan + .getRegionGroupMap() + .forEach( + (database, regionReplicaSets) -> { + final Set<TConsensusGroupId> regionGroupIds = + regionReplicaSets.stream() + .map(TRegionReplicaSet::getRegionId) + .filter(regionGroupId -> consensusGroupType.equals(regionGroupId.getType())) + .collect(Collectors.toSet()); + if (!regionGroupIds.isEmpty()) { + expectedRegionGroups.put(database, regionGroupIds); + } + }); + + final long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime <= REGION_GROUP_VISIBILITY_TIMEOUT_MS) { + if (areRegionGroupsVisible(expectedRegionGroups, consensusGroupType)) { + return RpcUtils.SUCCESS_STATUS; + } + try { + TimeUnit.MILLISECONDS.sleep(REGION_GROUP_VISIBILITY_CHECK_INTERVAL_MS); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + return new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode()) + .setMessage( + String.format( + "Interrupted while waiting for created %s RegionGroups %s to become visible in PartitionInfo.", + consensusGroupType, expectedRegionGroups)); + } + } + + final String message = + String.format( + "Created %s RegionGroups %s are not visible in PartitionInfo within %d ms.", + consensusGroupType, expectedRegionGroups, REGION_GROUP_VISIBILITY_TIMEOUT_MS); + LOGGER.warn(message); + return new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode()).setMessage(message); + } + + private boolean areRegionGroupsVisible( + final Map<String, Set<TConsensusGroupId>> expectedRegionGroups, + final TConsensusGroupType consensusGroupType) { + for (final Map.Entry<String, Set<TConsensusGroupId>> entry : expectedRegionGroups.entrySet()) { + final Set<TConsensusGroupId> visibleRegionGroups = + partitionInfo.getRegionGroupSlotsCounter(entry.getKey(), consensusGroupType).stream() + .map(Pair::getRight) + .collect(Collectors.toSet()); + if (!visibleRegionGroups.containsAll(entry.getValue())) { + return false; + } + } + return true; + } + /** * Only leader use this interface. Checks whether the specified DataPartition has a successor and * returns if it does. diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java index 960d0a7977f..8cd98f3286f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java @@ -504,12 +504,15 @@ public class ConfigNodeProcedureEnv { return clientHandler.getResponseList(); } - public void persistRegionGroup(CreateRegionGroupsPlan createRegionGroupsPlan) { + public TSStatus persistRegionGroup(CreateRegionGroupsPlan createRegionGroupsPlan) { // Persist the allocation result try { - getConsensusManager().write(createRegionGroupsPlan); + return getConsensusManager().write(createRegionGroupsPlan); } catch (ConsensusException e) { LOG.warn("Failed in the write API executing the consensus layer due to: ", e); + return new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode()) + .setMessage( + "Failed to persist RegionGroup allocation in the consensus layer: " + e.getMessage()); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java index 5b505ec001b..a7235747f71 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java @@ -559,6 +559,14 @@ public class RemoveDataNodeHandler { TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); List<TDataNodeLocation> removedDataNodes = removeDataNodePlan.getDataNodeLocations(); + if (hasSingleDataRegionReplica()) { + status.setCode(TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode()); + status.setMessage( + ProcedureMessages.FAILED_TO_REMOVE_DATA_NODE_BECAUSE_DATA_REPLICATION_FACTOR_IS_ONE); + LOGGER.error(status.getMessage()); + return status; + } + int availableDatanodeSize = configManager .getNodeManager() @@ -566,20 +574,26 @@ public class RemoveDataNodeHandler { .size(); // when the configuration is one replication, it will be failed if the data node is not in // running state. - if (CONF.getSchemaReplicationFactor() == 1 || CONF.getDataReplicationFactor() == 1) { - for (TDataNodeLocation dataNodeLocation : removedDataNodes) { - // check whether removed data node is in running state - if (!NodeStatus.Running.equals( - configManager.getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId()))) { - removedDataNodes.remove(dataNodeLocation); - LOGGER.error( - ProcedureMessages.FAILED_TO_REMOVE_DATA_NODE_BECAUSE_IT_IS_NOT_IN, dataNodeLocation); - } - if (removedDataNodes.isEmpty()) { - status.setCode(TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode()); - status.setMessage(ProcedureMessages.FAILED_TO_REMOVE_ALL_REQUESTED_DATA_NODES); - return status; - } + if (CONF.getSchemaReplicationFactor() == 1) { + final List<TDataNodeLocation> notRunningDataNodes = + removedDataNodes.stream() + .filter( + dataNodeLocation -> + !NodeStatus.Running.equals( + configManager + .getLoadManager() + .getNodeStatus(dataNodeLocation.getDataNodeId()))) + .collect(Collectors.toList()); + notRunningDataNodes.forEach( + dataNodeLocation -> + LOGGER.error( + ProcedureMessages.FAILED_TO_REMOVE_DATA_NODE_BECAUSE_IT_IS_NOT_IN, + dataNodeLocation)); + removedDataNodes.removeAll(notRunningDataNodes); + if (removedDataNodes.isEmpty()) { + status.setCode(TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode()); + status.setMessage(ProcedureMessages.FAILED_TO_REMOVE_ALL_REQUESTED_DATA_NODES); + return status; } } @@ -604,6 +618,22 @@ public class RemoveDataNodeHandler { return status; } + private boolean hasSingleDataRegionReplica() { + return CONF.getDataReplicationFactor() == 1 + || configManager + .getClusterSchemaManager() + .getMatchedDatabaseSchemasByName( + configManager.getClusterSchemaManager().getDatabaseNames(null), null) + .values() + .stream() + .anyMatch(databaseSchema -> databaseSchema.getDataReplicationFactor() == 1) + || configManager + .getPartitionManager() + .getAllReplicaSets(TConsensusGroupType.DataRegion) + .stream() + .anyMatch(replicaSet -> replicaSet.getDataNodeLocationsSize() <= 1); + } + /** * Checks whether all DataNodes specified for deletion exist in the cluster. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java index 2cb283d400e..e9cce807e77 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java @@ -23,7 +23,9 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.cluster.RegionStatus; +import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; @@ -36,10 +38,12 @@ import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSamp import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure; import org.apache.iotdb.confignode.procedure.state.CreateRegionGroupsState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -173,7 +177,11 @@ public class CreateRegionGroupsProcedure } })); - env.persistRegionGroup(persistPlan); + final TSStatus persistStatus = env.persistRegionGroup(persistPlan); + if (persistStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + setFailure(new ProcedureException(new IoTDBException(persistStatus))); + return Flow.NO_MORE_STATE; + } try { env.getConfigManager().getConsensusManager().write(offerPlan); } catch (final ConsensusException e) {
