This is an automated email from the ASF dual-hosted git repository.
CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 08c55b96840 Exclude DataNodes being removed from new Region allocation
(#17934)
08c55b96840 is described below
commit 08c55b9684038f95a223c42c3486be17d6e08faf
Author: Yongzao <[email protected]>
AuthorDate: Mon Jun 15 17:39:21 2026 +0800
Exclude DataNodes being removed from new Region allocation (#17934)
---
.../java/org/apache/iotdb/itbase/env/BaseEnv.java | 10 +
.../IoTDBRemoveDataNodeRegionAllocationIT.java | 348 +++++++++++++++++++++
.../iotdb/confignode/manager/ProcedureManager.java | 23 ++
.../manager/load/balancer/RegionBalancer.java | 24 +-
4 files changed, 402 insertions(+), 3 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 8b32deb3ea8..0aed2cfdf19 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -160,6 +160,16 @@ public interface BaseEnv {
DataNodeWrapper dataNodeWrapper, String username, String password,
String sqlDialect)
throws SQLException;
+ /**
+ * Opens a connection to the given DataNode with the default user, the
+ * default password and the tree SQL dialect.
+ *
+ * <p>Both the write and the read of the returned connection are pinned to
+ * the given DataNode, so it is useful when some other DataNode has been shut
+ * down and a fan-out read would otherwise fail.
+ *
+ * @param dataNodeWrapper the DataNode the connection is pinned to
+ * @return the connection produced by the four-argument overload
+ * @throws SQLException propagated from the four-argument overload
+ */
+ default Connection getConnection(DataNodeWrapper dataNodeWrapper) throws
SQLException {
+ return getConnection(
+ dataNodeWrapper,
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ TREE_SQL_DIALECT);
+ }
+
default Connection getConnection(String username, String password) throws
SQLException {
return getConnection(username, password, TREE_SQL_DIALECT);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeRegionAllocationIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeRegionAllocationIT.java
new file mode 100644
index 00000000000..c8d55e1c8ba
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeRegionAllocationIT.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.removedatanode;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.exception.InconsistentDataException;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllRegionMap;
+import static
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMap;
+import static
org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.awaitUntilSuccess;
+import static
org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.generateRemoveString;
+import static
org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.stopDataNodes;
+import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly;
+
+/**
+ * Regression test for the bug where, after a {@code remove datanode} has been
submitted, the
+ * ConfigNode still allocated brand-new Region replicas onto the DataNode that
was being removed
+ * (typically a node that had been {@code kill -9}'d and was therefore
reported as {@code Unknown}).
+ * The stranded replica could never be created on the dead node, so the
removal hung forever and the
+ * target DataNode never disappeared from {@code show datanodes}.
+ *
+ * <p>The test kills one DataNode, submits the removal, and — while the
removal is still in progress
+ * — forces a fresh Region allocation by creating a new database and writing
to it. It then asserts
+ * that none of the <em>newly allocated</em> Regions were placed on the
DataNode being removed, and
+ * that the removal eventually completes.
+ *
+ * <p>Note: we must compare against a snapshot of the pre-existing Region ids
rather than asserting
+ * "no Region anywhere references the removing DataNode". The removing node
legitimately keeps
+ * hosting its own pre-existing Regions until each one finishes migrating away
(the new replica is
+ * added first and the old one is dropped last), so those Regions still list
the removing node
+ * during the window. Only freshly created Region groups are expected to
exclude it.
+ */
+@Category({ClusterIT.class})
+@RunWith(IoTDBTestRunner.class)
+public class IoTDBRemoveDataNodeRegionAllocationIT {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(IoTDBRemoveDataNodeRegionAllocationIT.class);
+
+ private static final String SHOW_DATANODES = "show datanodes";
+
+ private static final String DEFAULT_SCHEMA_REGION_GROUP_EXTENSION_POLICY =
"CUSTOM";
+ private static final String DEFAULT_DATA_REGION_GROUP_EXTENSION_POLICY =
"CUSTOM";
+
+ /**
+ * Configures the cluster before each test: Ratis consensus for the
+ * ConfigNode and for schema regions, and the CUSTOM region-group extension
+ * policy for both schema and data regions. The cluster itself is started
+ * by the test method, not here.
+ */
+ @Before
+ public void setUp() {
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionGroupExtensionPolicy(DEFAULT_SCHEMA_REGION_GROUP_EXTENSION_POLICY)
+
.setDataRegionGroupExtensionPolicy(DEFAULT_DATA_REGION_GROUP_EXTENSION_POLICY);
+ }
+
+ /** Cleans the cluster environment after each test. */
+ @After
+ public void tearDown() throws InterruptedException {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ /**
+ * End-to-end scenario: start 1 ConfigNode and 4 DataNodes, seed data, stop
+ * one DataNode that hosts a DataRegion, wait until it is reported Unknown,
+ * submit its removal, and - while the removal is still in progress - write
+ * to a new database to force a fresh Region allocation.
+ *
+ * <p>Asserts that no newly created Region lists the removing DataNode, that
+ * the removal eventually succeeds, and that afterwards neither the Region
+ * map nor {@code show datanodes} still mentions the removed DataNode.
+ */
+ @Test
+ public void newRegionMustNotBeAllocatedOnRemovingDataNodeTest() throws
Exception {
+ final int configNodeNum = 1;
+ final int dataNodeNum = 4;
+ final int dataReplicationFactor = 2;
+ // Schema regions use Ratis, so migrating a schema replica off the killed
node needs a majority
+ // of the *old* peers to stay alive. With schemaReplicationFactor=2,
killing one of the two
+ // replica holders breaks Ratis quorum, the schema-region migration fails,
and the whole
+ // RemoveDataNodesProcedure rolls back without ever finishing. Use 3 (as
the proven
+ // IoTDBRemoveUnknownDataNodeIT#successTest does) so one kill still leaves
a quorum and the
+ // removal can actually complete.
+ final int schemaReplicationFactor = 3;
+ // Place a few DataRegions per DataNode so the node being removed actually
owns regions
+ // that have to be migrated, which keeps the RemoveDataNodesProcedure in
progress long
+ // enough for us to race a new allocation against it.
+ final int dataRegionPerDataNode = 2;
+
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+ .setSchemaReplicationFactor(schemaReplicationFactor)
+ .setDataReplicationFactor(dataReplicationFactor)
+ .setDefaultDataRegionGroupNumPerDatabase(
+ dataRegionPerDataNode * dataNodeNum / dataReplicationFactor);
+ EnvFactory.getEnv().initClusterEnvironment(configNodeNum, dataNodeNum);
+
+ final int removeDataNodeId;
+ final List<TDataNodeLocation> removeDataNodeLocations = new ArrayList<>();
+
+ try (final Connection connection =
makeItCloseQuietly(EnvFactory.getEnv().getConnection());
+ final Statement statement =
makeItCloseQuietly(connection.createStatement());
+ final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+
+ // Seed the cluster with data so that DataRegions are spread across all
DataNodes.
+ ConfigNodeTestUtils.insertTreeModelData(statement);
+
+ final Map<Integer, Set<Integer>> dataRegionMap =
getDataRegionMap(statement);
+ Assert.assertFalse("Expected some DataRegions to exist",
dataRegionMap.isEmpty());
+
+ // Pick a DataNode that currently hosts at least one DataRegion as the
removal target.
+ removeDataNodeId =
+ dataRegionMap.values().stream()
+ .flatMap(Set::stream)
+ .findAny()
+ .orElseThrow(() -> new AssertionError("No DataNode hosts a
DataRegion"));
+ LOGGER.info("Selected DataNode {} to remove.", removeDataNodeId);
+
+ removeDataNodeLocations.addAll(
+
client.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().values().stream()
+ .map(TDataNodeConfiguration::getLocation)
+ .filter(location -> location.getDataNodeId() == removeDataNodeId)
+ .collect(Collectors.toList()));
+ Assert.assertEquals(1, removeDataNodeLocations.size());
+
+ // kill -9 the target DataNode so that it becomes Unknown (this is the
exact condition under
+ // which the failure detector overrides the Removing status back to
Unknown).
+ final List<DataNodeWrapper> removeDataNodeWrappers =
+
List.of(EnvFactory.getEnv().dataNodeIdToWrapper(removeDataNodeId).get());
+ stopDataNodes(removeDataNodeWrappers);
+ LOGGER.info("DataNode {} is stopped.", removeDataNodeId);
+ } catch (InconsistentDataException e) {
+ LOGGER.error("Unexpected error during setup:", e);
+ throw e;
+ }
+
+ // Pick a DataNode that survives the removal; all SQL after the kill must
be pinned to it.
+ // EnvFactory.getConnection() (no args) fans every read out to *all*
DataNodes, including the
+ // killed one, which would fail with "Connection Error" and surface as
InconsistentDataException
+ // rather than the behaviour we want to test.
+ final int killedDataNodePort =
+
EnvFactory.getEnv().dataNodeIdToWrapper(removeDataNodeId).get().getPort();
+ final DataNodeWrapper survivingDataNode =
+ EnvFactory.getEnv().getDataNodeWrapperList().stream()
+ .filter(wrapper -> wrapper.getPort() != killedDataNodePort)
+ .findAny()
+ .orElseThrow(() -> new AssertionError("No surviving DataNode to
connect to"));
+
+ // Re-establish a connection (pinned to the surviving DataNode) after the
DataNode was killed.
+ try (final Connection connection =
+
makeItCloseQuietly(EnvFactory.getEnv().getConnection(survivingDataNode));
+ final Statement statement =
makeItCloseQuietly(connection.createStatement());
+ final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ final AtomicReference<SyncConfigNodeIServiceClient> clientRef = new
AtomicReference<>(client);
+
+ // Wait until the killed DataNode is reported Unknown, then submit the
removal.
+ awaitDataNodeStatus(statement, removeDataNodeId, "Unknown");
+
+ // Snapshot the Region ids that already exist; region migration only
moves replicas of
+ // these existing groups (it never mints new Region ids), so any Region
id appearing after
+ // this point belongs to the allocation we are about to force.
+ final Set<Integer> preExistingRegionIds = new
HashSet<>(getAllRegionMap(statement).keySet());
+
+ final String removeDataNodeSQL =
generateRemoveString(Set.of(removeDataNodeId));
+ LOGGER.info("Submitting: {}", removeDataNodeSQL);
+ statement.execute(removeDataNodeSQL);
+ LOGGER.info("Remove DataNode {} submitted.", removeDataNodeId);
+
+ // The removal is asynchronous: the SQL returns once the procedure is
submitted, while the
+ // actual region migration off the (dead) node keeps it in progress.
Confirm it is in progress
+ // before we force a new allocation against it.
+ Assert.assertTrue(
+ "Removal completed before we could force a new allocation; cannot
exercise the bug",
+ isRemovalInProgress(clientRef, removeDataNodeLocations));
+
+ // While the removal is in progress, force a fresh Region allocation by
creating a new
+ // database and writing to it. Before the fix, the allocator could still
choose the removing
+ // (Unknown) DataNode as a replica holder for these new regions.
+ try (final Connection probeConnection =
+
makeItCloseQuietly(EnvFactory.getEnv().getConnection(survivingDataNode));
+ final Statement probeStatement =
makeItCloseQuietly(probeConnection.createStatement())) {
+ for (int i = 0; i < 64; i++) {
+ probeStatement.addBatch(
+ String.format(
+ "INSERT INTO root.alloc_probe.d%d(timestamp,speed)
values(%d, %d)", i, i, i));
+ }
+ probeStatement.executeBatch();
+ LOGGER.info("Forced new Region allocation via root.alloc_probe.");
+
+ // The core assertion: none of the newly allocated Regions may land on
the removing
+ // DataNode.
+ assertNewRegionsExcludeDataNode(probeStatement, preExistingRegionIds,
removeDataNodeId);
+ }
+
+ // The removal must still be able to complete; the original bug left it
stuck forever.
+ awaitUntilSuccess(clientRef, removeDataNodeLocations);
+ LOGGER.info("Remove DataNode {} completed.", removeDataNodeId);
+
+ // Final guard: after the node is gone, nothing references it any more.
+ assertNoRegionOnDataNode(statement, removeDataNodeId);
+ assertDataNodeAbsent(statement, removeDataNodeId);
+ } catch (InconsistentDataException e) {
+ LOGGER.error("Unexpected error:", e);
+ throw e;
+ }
+ }
+
+  /**
+   * Tells whether the removal is still in progress, i.e. whether at least one of the DataNodes
+   * being removed is still listed in the ConfigNode's DataNode configuration.
+   *
+   * <p>A failed query is raised as an {@link AssertionError} instead of being reported as {@code
+   * false}: {@code false} means "the removal has already completed", which a query failure does
+   * not prove, and the caller would otherwise fail with a misleading message.
+   *
+   * @param clientRef holder of the ConfigNode client used for the query
+   * @param removeDataNodeLocations locations of the DataNodes being removed
+   * @return {@code true} if any of {@code removeDataNodeLocations} is still registered
+   * @throws AssertionError if the DataNode configuration cannot be queried
+   */
+  private static boolean isRemovalInProgress(
+      final AtomicReference<SyncConfigNodeIServiceClient> clientRef,
+      final List<TDataNodeLocation> removeDataNodeLocations) {
+    final List<TDataNodeLocation> remaining;
+    try {
+      remaining =
+          clientRef
+              .get()
+              .getDataNodeConfiguration(-1)
+              .getDataNodeConfigurationMap()
+              .values()
+              .stream()
+              .map(TDataNodeConfiguration::getLocation)
+              .collect(Collectors.toList());
+    } catch (Exception e) {
+      // Keep the cause: the test must fail with the real reason, not with "removal completed".
+      throw new AssertionError("Failed to query DataNode configuration", e);
+    }
+    return removeDataNodeLocations.stream().anyMatch(remaining::contains);
+  }
+
+  /**
+   * Polls {@code show datanodes} for at most two minutes until the given DataNode is listed with
+   * the expected status (compared case-insensitively).
+   *
+   * @param statement statement used to run {@code show datanodes}
+   * @param dataNodeId id of the DataNode to watch
+   * @param expectedStatus status the DataNode must reach
+   */
+  private static void awaitDataNodeStatus(
+      final Statement statement, final int dataNodeId, final String expectedStatus) {
+    Awaitility.await()
+        .atMost(2, TimeUnit.MINUTES)
+        .pollDelay(1, TimeUnit.SECONDS)
+        .until(() -> dataNodeHasStatus(statement, dataNodeId, expectedStatus));
+  }
+
+  /**
+   * Runs {@code show datanodes} once and checks the first row that belongs to {@code dataNodeId}.
+   *
+   * @return whether that row carries {@code expectedStatus}; {@code false} if no row matches
+   */
+  private static boolean dataNodeHasStatus(
+      final Statement statement, final int dataNodeId, final String expectedStatus)
+      throws Exception {
+    try (final ResultSet rows = statement.executeQuery(SHOW_DATANODES)) {
+      while (rows.next()) {
+        if (rows.getInt(ColumnHeaderConstant.NODE_ID) != dataNodeId) {
+          continue;
+        }
+        return expectedStatus.equalsIgnoreCase(rows.getString(ColumnHeaderConstant.STATUS));
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Waits (up to one minute) for the forced allocation to surface at least one Region whose id is
+   * not contained in {@code preExistingRegionIds}, then fails if any of those new Regions lists
+   * {@code dataNodeId} among its replica holders.
+   *
+   * @param statement statement used to read the Region map
+   * @param preExistingRegionIds Region ids that existed before the allocation was forced
+   * @param dataNodeId id of the DataNode that is being removed
+   */
+  private static void assertNewRegionsExcludeDataNode(
+      final Statement statement, final Set<Integer> preExistingRegionIds, final int dataNodeId) {
+    // Holds the snapshot taken by the most recent poll so it can be inspected after the wait.
+    final AtomicReference<Map<Integer, Set<Integer>>> latestSnapshot = new AtomicReference<>();
+    Awaitility.await()
+        .atMost(1, TimeUnit.MINUTES)
+        .pollDelay(1, TimeUnit.SECONDS)
+        .until(
+            () -> {
+              final Map<Integer, Set<Integer>> freshRegions =
+                  getAllRegionMap(statement).entrySet().stream()
+                      .filter(region -> !preExistingRegionIds.contains(region.getKey()))
+                      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+              latestSnapshot.set(freshRegions);
+              return !freshRegions.isEmpty();
+            });
+
+    final Map<Integer, Set<Integer>> newRegions = latestSnapshot.get();
+    final Set<Integer> offendingRegions = new HashSet<>();
+    for (final Map.Entry<Integer, Set<Integer>> region : newRegions.entrySet()) {
+      if (region.getValue().contains(dataNodeId)) {
+        offendingRegions.add(region.getKey());
+      }
+    }
+
+    final String failureMessage =
+        String.format(
+            "Newly allocated Region(s) %s were placed on DataNode %d which is being removed; "
+                + "new Region map: %s",
+            offendingRegions, dataNodeId, newRegions);
+    Assert.assertTrue(failureMessage, offendingRegions.isEmpty());
+  }
+
+  /**
+   * Fails if any Region in the current Region map still lists {@code dataNodeId} among its
+   * replica holders.
+   *
+   * @param statement statement used to read the Region map
+   * @param dataNodeId id of the DataNode that should no longer host anything
+   */
+  private static void assertNoRegionOnDataNode(final Statement statement, final int dataNodeId)
+      throws Exception {
+    final Map<Integer, Set<Integer>> allRegionMap = getAllRegionMap(statement);
+    final Set<Integer> offendingRegions = new HashSet<>();
+    for (final Map.Entry<Integer, Set<Integer>> region : allRegionMap.entrySet()) {
+      if (region.getValue().contains(dataNodeId)) {
+        offendingRegions.add(region.getKey());
+      }
+    }
+
+    final String failureMessage =
+        String.format(
+            "Region(s) %s still reference removed DataNode %d; full map: %s",
+            offendingRegions, dataNodeId, allRegionMap);
+    Assert.assertTrue(failureMessage, offendingRegions.isEmpty());
+  }
+
+  /**
+   * Fails if {@code show datanodes} still returns a row whose node id equals {@code dataNodeId}.
+   *
+   * @param statement statement used to run {@code show datanodes}
+   * @param dataNodeId id of the DataNode that must no longer be listed
+   */
+  private static void assertDataNodeAbsent(final Statement statement, final int dataNodeId)
+      throws Exception {
+    final String failureMessage = "DataNode " + dataNodeId + " should have been removed";
+    try (final ResultSet rows = statement.executeQuery(SHOW_DATANODES)) {
+      while (rows.next()) {
+        final int listedDataNodeId = rows.getInt(ColumnHeaderConstant.NODE_ID);
+        Assert.assertNotEquals(failureMessage, dataNodeId, listedDataNodeId);
+      }
+    }
+  }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 22dd4631781..2beeea6b4af 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -786,6 +786,29 @@ public class ProcedureManager {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
+  /**
+   * Returns the ids of every DataNode targeted by a {@link RemoveDataNodesProcedure} that has not
+   * finished yet.
+   *
+   * <p>Such a DataNode must not be handed a newly allocated Region replica: the replica would be
+   * stranded on a node that is about to disappear and would keep the removal from ever
+   * completing. The node's {@link NodeStatus} cannot be used for this, because a DataNode that
+   * was killed (e.g. {@code kill -9}) before its removal is reported as {@link
+   * NodeStatus#Unknown} by the failure detector rather than {@link NodeStatus#Removing}. The
+   * running procedure itself is therefore the authoritative source; it holds the list of
+   * DataNodes being removed and survives leader switches via procedure persistence.
+   *
+   * @return the set of DataNode ids currently being removed (empty if no removal is in progress)
+   */
+  public Set<Integer> getRemovingDataNodeIds() {
+    return getExecutor().getProcedures().values().stream()
+        .filter(RemoveDataNodesProcedure.class::isInstance)
+        .map(RemoveDataNodesProcedure.class::cast)
+        .filter(removal -> !removal.isFinished())
+        .flatMap(removal -> removal.getRemovedDataNodes().stream())
+        .map(TDataNodeLocation::getDataNodeId)
+        .collect(Collectors.toSet());
+  }
+
// region region operation related check
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 1075fc1573d..73583151f98 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGr
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.ProcedureManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import
org.apache.iotdb.confignode.manager.load.balancer.region.GreedyCopySetRegionGroupAllocator;
import
org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator;
@@ -41,6 +42,8 @@ import
org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
/**
* The {@link RegionBalancer} provides interfaces to generate optimal Region
allocation and
@@ -80,10 +83,21 @@ public class RegionBalancer {
final Map<String, Integer> allotmentMap, final TConsensusGroupType
consensusGroupType)
throws NotEnoughDataNodeException, DatabaseNotExistsException {
- // Some new RegionGroups will have to occupy unknown DataNodes
- // if the number of online DataNodes is insufficient
+ // Some new RegionGroups will have to occupy unknown DataNodes if the
number of online
+ // DataNodes is insufficient (Unknown DataNodes are intentionally kept as
candidates).
+ // However, DataNodes that an in-progress RemoveDataNodesProcedure is
removing must be
+ // excluded: placing a new replica on a node that is about to disappear
would strand that
+ // replica and stall the removal forever. A status filter is not enough
here, because a
+ // DataNode killed (e.g. kill -9) before removal is reported as Unknown
(not Removing) by the
+ // failure detector, so we additionally drop every DataNode that is
currently being removed.
+ final Set<Integer> removingDataNodeIds =
getProcedureManager().getRemovingDataNodeIds();
final List<TDataNodeConfiguration> availableDataNodes =
- getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running,
NodeStatus.Unknown);
+ getNodeManager()
+ .filterDataNodeThroughStatus(NodeStatus.Running,
NodeStatus.Unknown)
+ .stream()
+ .filter(
+ dataNode ->
!removingDataNodeIds.contains(dataNode.getLocation().getDataNodeId()))
+ .collect(Collectors.toList());
// Make sure the number of available DataNodes is enough for allocating
new RegionGroups
for (final String database : allotmentMap.keySet()) {
@@ -157,6 +171,10 @@ public class RegionBalancer {
return configManager.getLoadManager();
}
+ /** Returns the {@link ProcedureManager} obtained from {@code configManager}. */
+ private ProcedureManager getProcedureManager() {
+ return configManager.getProcedureManager();
+ }
+
public enum RegionGroupAllocatePolicy {
GREEDY,
GCR,