This is an automated email from the ASF dual-hosted git repository.
CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 57a5ec6d534 Fix leader balance blocking region activation (#17755)
57a5ec6d534 is described below
commit 57a5ec6d534a9603cd6107929a2e7c0275516534
Author: Yongzao <[email protected]>
AuthorDate: Mon May 25 23:30:38 2026 +0800
Fix leader balance blocking region activation (#17755)
---
.../iotdb/confignode/i18n/ConfigNodeMessages.java | 2 +-
.../iotdb/confignode/i18n/ConfigNodeMessages.java | 2 +-
.../iotdb/confignode/conf/ConfigNodeConfig.java | 2 +-
.../confignode/conf/ConfigNodeDescriptor.java | 2 +-
.../confignode/conf/ConfigNodeStartupCheck.java | 2 +-
.../manager/load/balancer/RouteBalancer.java | 631 ++++++++++++++-------
.../router/leader/AbstractLeaderBalancer.java | 2 +-
.../impl/region/CreateRegionGroupsProcedure.java | 26 +-
.../procedure/state/CreateRegionGroupsState.java | 4 +
9 files changed, 458 insertions(+), 215 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
index 3a662932b61..c1c3e877de7 100644
---
a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
+++
b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
@@ -466,7 +466,7 @@ public final class ConfigNodeMessages {
public static final String UNKNOWN_HOST_WHEN_CHECKING_SEED_CONFIGNODE_IP =
"Unknown host when checking seed configNode IP {}";
public static final String UNKNOWN_LEADER_DISTRIBUTION_POLICY =
- "Unknown leader_distribution_policy: %s, please set to \"GREEDY\" or \"CFD\" or \"HASH\"";
+ "Unknown leader_distribution_policy: %s, please set to \"GREEDY\" or \"CFS\" or \"HASH\"";
public static final String UNKNOWN_PHYSICALPLAN_CONFIGPHYSICALPLANTYPE =
"unknown PhysicalPlan configPhysicalPlanType: ";
public static final String UNKNOWN_READ_CONSISTENCY_LEVEL_PLEASE_SET_TO =
diff --git
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
index b08332bb35c..6ca847626c8 100644
---
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
+++
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
@@ -462,7 +462,7 @@ public final class ConfigNodeMessages {
public static final String UNKNOWN_HOST_WHEN_CHECKING_SEED_CONFIGNODE_IP =
"Unknown host when checking seed configNode IP {}";
public static final String UNKNOWN_LEADER_DISTRIBUTION_POLICY =
- "未知 leader_distribution_policy:%s,请设置为 \"GREEDY\"、\"CFD\" 或 \"HASH\"";
+ "未知 leader_distribution_policy:%s,请设置为 \"GREEDY\"、\"CFS\" 或 \"HASH\"";
public static final String UNKNOWN_PHYSICALPLAN_CONFIGPHYSICALPLANTYPE =
"unknown PhysicalPlan configPhysicalPlanType: ";
public static final String UNKNOWN_READ_CONSISTENCY_LEVEL_PLEASE_SET_TO =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index aacbe20203f..1c0555affe3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -226,7 +226,7 @@ public class ConfigNodeConfig {
private double topologyProbingTimeoutRatio = 0.5;
/** The policy of cluster RegionGroups' leader distribution. */
- private String leaderDistributionPolicy = AbstractLeaderBalancer.CFD_POLICY;
+ private String leaderDistributionPolicy = AbstractLeaderBalancer.CFS_POLICY;
/** Whether to enable auto leader balance for Ratis consensus protocol. */
private boolean enableAutoLeaderBalanceForRatisConsensus = true;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index d6dc5eb4351..b6bf74edb31 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -369,7 +369,7 @@ public class ConfigNodeDescriptor {
String leaderDistributionPolicy =
properties.getProperty("leader_distribution_policy",
conf.getLeaderDistributionPolicy());
if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
- || AbstractLeaderBalancer.CFD_POLICY.equals(leaderDistributionPolicy)
+ || AbstractLeaderBalancer.CFS_POLICY.equals(leaderDistributionPolicy)
||
AbstractLeaderBalancer.HASH_POLICY.equals(leaderDistributionPolicy)) {
conf.setLeaderDistributionPolicy(leaderDistributionPolicy);
} else {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index d3f6f15f1be..111f32bcdd3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -161,7 +161,7 @@ public class ConfigNodeStartupCheck extends StartupChecks {
// The leader distribution policy is limited
if
(!AbstractLeaderBalancer.GREEDY_POLICY.equals(CONF.getLeaderDistributionPolicy())
- &&
!AbstractLeaderBalancer.CFD_POLICY.equals(CONF.getLeaderDistributionPolicy())
+ &&
!AbstractLeaderBalancer.CFS_POLICY.equals(CONF.getLeaderDistributionPolicy())
&&
!AbstractLeaderBalancer.HASH_POLICY.equals(CONF.getLeaderDistributionPolicy()))
{
throw new ConfigurationException(
ConfigNodeMessages.LEADER_DISTRIBUTION_POLICY,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index e3954a59385..5429727f77b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -60,18 +60,20 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.BiConsumer;
import java.util.stream.Collectors;
/** The RouteBalancer guides the cluster RegionGroups' leader distribution and routing priority. */
@@ -104,10 +106,12 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
ProcedureManager.PROCEDURE_WAIT_TIME_OUT -
TimeUnit.SECONDS.toMillis(2),
TimeUnit.SECONDS.toMillis(10));
private static final long WAIT_PRIORITY_INTERVAL = 10;
+ private static final long RATIS_CHANGE_LEADER_RPC_TIMEOUT_IN_MS =
TimeUnit.SECONDS.toMillis(10);
private final IManager configManager;
- // For generating optimal Region leader distribution
- private final AbstractLeaderBalancer leaderBalancer;
+ // For serializing and generating optimal Region leader distribution by RegionGroup type
+ private final LeaderBalanceContext schemaRegionLeaderBalanceContext;
+ private final LeaderBalanceContext dataRegionLeaderBalanceContext;
// For generating optimal cluster Region routing priority
private final IPriorityBalancer priorityRouter;
@@ -121,29 +125,24 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
private static final long BALANCE_RATIS_LEADER_FAILED_INTERVAL_IN_NS = 20 *
1000L * 1000L * 1000L;
private final Map<TConsensusGroupId, Long> lastFailedTimeForLeaderBalance;
- private final Map<Integer, List<String>> lastBalancedOldLeaderId2RegionMap;
- private Map<TConsensusGroupId, Integer> lastDataRegion2OldLeaderMap;
- private Set<TConsensusGroupId> lastBalancedDataRegionSet;
-
public RouteBalancer(IManager configManager) {
this.configManager = configManager;
this.priorityMapLock = new ReentrantReadWriteLock();
this.regionPriorityMap = new TreeMap<>();
- this.lastFailedTimeForLeaderBalance = new TreeMap<>();
- this.lastBalancedOldLeaderId2RegionMap = new ConcurrentHashMap<>();
-
- switch (CONF.getLeaderDistributionPolicy()) {
- case AbstractLeaderBalancer.GREEDY_POLICY:
- this.leaderBalancer = new GreedyLeaderBalancer();
- break;
- case AbstractLeaderBalancer.HASH_POLICY:
- this.leaderBalancer = new HashLeaderBalancer();
- break;
- case AbstractLeaderBalancer.CFD_POLICY:
- default:
- this.leaderBalancer = new CostFlowSelectionLeaderBalancer();
- break;
- }
+ this.lastFailedTimeForLeaderBalance = new ConcurrentHashMap<>();
+
+ this.schemaRegionLeaderBalanceContext =
+ new LeaderBalanceContext(
+ TConsensusGroupType.SchemaRegion,
+ new ReentrantLock(),
+ createLeaderBalancer(),
+ SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS);
+ this.dataRegionLeaderBalanceContext =
+ new LeaderBalanceContext(
+ TConsensusGroupType.DataRegion,
+ new ReentrantLock(),
+ createLeaderBalancer(),
+ DATA_REGION_CONSENSUS_PROTOCOL_CLASS);
switch (CONF.getRoutePriorityPolicy()) {
case IPriorityBalancer.GREEDY_POLICY:
@@ -156,185 +155,404 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
}
}
- /** Balance cluster RegionGroup leader distribution through configured
algorithm. */
- private synchronized void balanceRegionLeader() {
- if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION) {
- balanceRegionLeader(TConsensusGroupType.SchemaRegion,
SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS);
+ /** Result of DataRegion leader balance used by the required post-balance actions. */
+ private static class DataRegionLeaderBalanceResult {
+
+ private final Map<TConsensusGroupId, Integer> dataRegion2OldLeaderMap;
+ private final Set<TConsensusGroupId> balancedDataRegionSet;
+ private final Map<Integer, List<String>> balancedOldLeaderId2RegionMap;
+
+ private DataRegionLeaderBalanceResult(
+ Map<TConsensusGroupId, Integer> dataRegion2OldLeaderMap,
+ Set<TConsensusGroupId> balancedDataRegionSet,
+ Map<Integer, List<String>> balancedOldLeaderId2RegionMap) {
+ this.dataRegion2OldLeaderMap = dataRegion2OldLeaderMap;
+ this.balancedDataRegionSet = balancedDataRegionSet;
+ this.balancedOldLeaderId2RegionMap = balancedOldLeaderId2RegionMap;
+ }
+
+ private static DataRegionLeaderBalanceResult empty() {
+ return new DataRegionLeaderBalanceResult(
+ Collections.emptyMap(), Collections.emptySet(),
Collections.emptyMap());
+ }
+
+ private boolean hasBalancedDataRegion() {
+ return !balancedDataRegionSet.isEmpty();
+ }
+ }
+
+ /** Immutable state for one RegionGroup type's leader balance round. */
+ private static class LeaderBalanceContext {
+
+ private final TConsensusGroupType regionGroupType;
+ private final ReentrantLock leaderBalanceLock;
+ private final AbstractLeaderBalancer leaderBalancer;
+ private final String consensusProtocolClass;
+
+ private LeaderBalanceContext(
+ TConsensusGroupType regionGroupType,
+ ReentrantLock leaderBalanceLock,
+ AbstractLeaderBalancer leaderBalancer,
+ String consensusProtocolClass) {
+ this.regionGroupType = regionGroupType;
+ this.leaderBalanceLock = leaderBalanceLock;
+ this.leaderBalancer = leaderBalancer;
+ this.consensusProtocolClass = consensusProtocolClass;
+ }
+ }
+
+ /** Mutable accumulator for one leader balance round's transfer requests and cache updates. */
+ private static class LeaderTransferContext {
+
+ private final long currentTime;
+ private int requestId;
+ private final DataNodeAsyncRequestContext<TRegionLeaderChangeReq,
TRegionLeaderChangeResp>
+ clientHandler;
+ private final Map<TConsensusGroupId, ConsensusGroupHeartbeatSample>
successTransferMap;
+ private final Map<Integer, List<String>> balancedOldLeaderId2RegionMap;
+
+ private LeaderTransferContext(long currentTime) {
+ this.currentTime = currentTime;
+ this.requestId = 0;
+ this.clientHandler =
+ new
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.CHANGE_REGION_LEADER);
+ this.successTransferMap = new TreeMap<>();
+ this.balancedOldLeaderId2RegionMap = new HashMap<>();
+ }
+
+ private void putSuccessTransfer(TConsensusGroupId regionGroupId, long
timestamp, int leaderId) {
+ successTransferMap.put(regionGroupId, new
ConsensusGroupHeartbeatSample(timestamp, leaderId));
+ }
+
+ private void putRatisTransferRequest(
+ TConsensusGroupId regionGroupId, TDataNodeLocation newLeader) {
+ TRegionLeaderChangeReq regionLeaderChangeReq =
+ new TRegionLeaderChangeReq(regionGroupId, newLeader);
+ clientHandler.putRequest(requestId, regionLeaderChangeReq);
+ clientHandler.putNodeLocation(requestId, newLeader);
+ requestId++;
+ }
+
+ private boolean hasRatisTransferRequest() {
+ return requestId > 0;
+ }
+ }
+
+ /** Create a leader balancer instance according to the configured leader distribution policy. */
+ private AbstractLeaderBalancer createLeaderBalancer() {
+ switch (CONF.getLeaderDistributionPolicy()) {
+ case AbstractLeaderBalancer.GREEDY_POLICY:
+ return new GreedyLeaderBalancer();
+ case AbstractLeaderBalancer.HASH_POLICY:
+ return new HashLeaderBalancer();
+ case AbstractLeaderBalancer.CFS_POLICY:
+ default:
+ return new CostFlowSelectionLeaderBalancer();
+ }
+ }
+
+ /**
+ * Balance leaders for all enabled RegionGroup types.
+ *
+ * <p>If both SchemaRegion and DataRegion leader balance are enabled, the two types are balanced
+ * concurrently and serialized only by their own type-specific locks.
+ */
+ private DataRegionLeaderBalanceResult balanceAllEnabledRegionLeaders() {
+ return balanceSelectedRegionLeaders(
+ IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION,
+ IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION);
+ }
+
+ /**
+ * Balance leaders only for the RegionGroup types included in the given change event.
+ *
+ * <p>This avoids letting an unrelated SchemaRegion change delay DataRegion leader selection, and
+ * vice versa.
+ */
+ private DataRegionLeaderBalanceResult balanceRegionLeadersForChangedGroups(
+ Set<TConsensusGroupId> regionGroupIds) {
+ final boolean shouldBalanceSchemaRegion =
+ IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION
+ && containsRegionType(regionGroupIds,
TConsensusGroupType.SchemaRegion);
+ final boolean shouldBalanceDataRegion =
+ IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION
+ && containsRegionType(regionGroupIds,
TConsensusGroupType.DataRegion);
+ return balanceSelectedRegionLeaders(shouldBalanceSchemaRegion,
shouldBalanceDataRegion);
+ }
+
+ /** Return whether the given RegionGroup set contains a RegionGroup of the
specified type. */
+ private boolean containsRegionType(
+ Set<TConsensusGroupId> regionGroupIds, TConsensusGroupType
regionGroupType) {
+ return regionGroupIds.stream()
+ .anyMatch(regionGroupId ->
regionGroupType.equals(regionGroupId.getType()));
+ }
+
+ /**
+ * Balance the selected RegionGroup types.
+ *
+ * <p>When both types are selected, DataRegion balance runs in the current thread while
+ * SchemaRegion balance runs asynchronously. The method still waits for both rounds to finish
+ * before returning.
+ */
+ private DataRegionLeaderBalanceResult balanceSelectedRegionLeaders(
+ boolean shouldBalanceSchemaRegion, boolean shouldBalanceDataRegion) {
+ if (shouldBalanceSchemaRegion && shouldBalanceDataRegion) {
+ return balanceSchemaAndDataRegionLeaders();
+ }
+ if (shouldBalanceSchemaRegion) {
+ balanceRegionLeaders(schemaRegionLeaderBalanceContext);
+ }
+ return shouldBalanceDataRegion
+ ? balanceRegionLeaders(dataRegionLeaderBalanceContext)
+ : DataRegionLeaderBalanceResult.empty();
+ }
+
+ /**
+ * Balance SchemaRegion and DataRegion leaders in parallel while preserving serialization inside
+ * each RegionGroup type.
+ */
+ private DataRegionLeaderBalanceResult balanceSchemaAndDataRegionLeaders() {
+ CompletableFuture<Void> schemaRegionLeaderBalanceFuture =
+ CompletableFuture.runAsync(() ->
balanceRegionLeaders(schemaRegionLeaderBalanceContext));
+ try {
+ return balanceRegionLeaders(dataRegionLeaderBalanceContext);
+ } finally {
+ schemaRegionLeaderBalanceFuture.join();
}
- if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION) {
- balanceRegionLeader(TConsensusGroupType.DataRegion,
DATA_REGION_CONSENSUS_PROTOCOL_CLASS);
+ }
+
+ /** Balance leaders of one RegionGroup type under its dedicated serialization lock. */
+ private DataRegionLeaderBalanceResult balanceRegionLeaders(
+ LeaderBalanceContext leaderBalanceContext) {
+ leaderBalanceContext.leaderBalanceLock.lock();
+ try {
+ return balanceRegionLeadersInLock(leaderBalanceContext);
+ } finally {
+ leaderBalanceContext.leaderBalanceLock.unlock();
}
}
- private void balanceRegionLeader(
- TConsensusGroupType regionGroupType, String consensusProtocolClass) {
+ /**
+ * Generate the optimal leader distribution of one RegionGroup type and apply the required leader
+ * transfers.
+ */
+ private DataRegionLeaderBalanceResult balanceRegionLeadersInLock(
+ LeaderBalanceContext leaderBalanceContext) {
// Collect the latest data and generate the optimal leader distribution
Map<TConsensusGroupId, Integer> currentLeaderMap =
- getLoadManager().getLoadCache().getRegionLeaderMap(regionGroupType);
+
getLoadManager().getLoadCache().getRegionLeaderMap(leaderBalanceContext.regionGroupType);
Map<TConsensusGroupId, Integer> optimalLeaderMap =
- leaderBalancer.generateOptimalLeaderDistribution(
-
getLoadManager().getLoadCache().getCurrentDatabaseRegionGroupMap(regionGroupType),
-
getLoadManager().getLoadCache().getCurrentRegionLocationMap(regionGroupType),
+ leaderBalanceContext.leaderBalancer.generateOptimalLeaderDistribution(
+ getLoadManager()
+ .getLoadCache()
+
.getCurrentDatabaseRegionGroupMap(leaderBalanceContext.regionGroupType),
+ getLoadManager()
+ .getLoadCache()
+
.getCurrentRegionLocationMap(leaderBalanceContext.regionGroupType),
currentLeaderMap,
getLoadManager().getLoadCache().getCurrentDataNodeStatisticsMap(),
-
getLoadManager().getLoadCache().getCurrentRegionStatisticsMap(regionGroupType));
-
- // Transfer leader to the optimal distribution
- long currentTime = System.nanoTime();
- AtomicInteger requestId = new AtomicInteger(0);
- DataNodeAsyncRequestContext<TRegionLeaderChangeReq,
TRegionLeaderChangeResp> clientHandler =
- new
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.CHANGE_REGION_LEADER);
- Map<TConsensusGroupId, ConsensusGroupHeartbeatSample> successTransferMap =
new TreeMap<>();
- optimalLeaderMap.forEach(
- (regionGroupId, newLeaderId) -> {
- if (ConsensusFactory.RATIS_CONSENSUS.equals(consensusProtocolClass)
- && currentTime -
lastFailedTimeForLeaderBalance.getOrDefault(regionGroupId, 0L)
- <= BALANCE_RATIS_LEADER_FAILED_INTERVAL_IN_NS) {
- return;
- }
+ getLoadManager()
+ .getLoadCache()
+
.getCurrentRegionStatisticsMap(leaderBalanceContext.regionGroupType));
- int oldLeaderId = currentLeaderMap.get(regionGroupId);
- if (newLeaderId != -1 && !newLeaderId.equals(oldLeaderId)) {
- LOGGER.info(
-
ManagerMessages.LEADERBALANCER_TRY_TO_CHANGE_THE_LEADER_OF_REGION_TO_DATANODE,
+ LeaderTransferContext leaderTransferContext = new
LeaderTransferContext(System.nanoTime());
+ optimalLeaderMap.forEach(
+ (regionGroupId, newLeaderId) ->
+ collectLeaderTransfer(
+ leaderBalanceContext.regionGroupType,
+ leaderBalanceContext.consensusProtocolClass,
+ currentLeaderMap,
+ leaderTransferContext,
regionGroupId,
- newLeaderId);
- switch (consensusProtocolClass) {
- case ConsensusFactory.IOT_CONSENSUS:
- case ConsensusFactory.SIMPLE_CONSENSUS:
- // For IoTConsensus or SimpleConsensus protocol, change
- // RegionRouteMap is enough
- successTransferMap.put(
- regionGroupId, new
ConsensusGroupHeartbeatSample(currentTime, newLeaderId));
- break;
- case ConsensusFactory.IOT_CONSENSUS_V2:
- // For IoTConsensusV2 protocol, change RegionRouteMap and
execute flush on old
- // region leader
- successTransferMap.put(
- regionGroupId, new
ConsensusGroupHeartbeatSample(currentTime, newLeaderId));
- // Prepare data for flushOldLeader
- if (oldLeaderId != -1) {
- lastBalancedOldLeaderId2RegionMap.compute(
- oldLeaderId,
- (k, v) -> {
- if (v == null) {
- List<String> value = new ArrayList<>();
- value.add(String.valueOf(regionGroupId.getId()));
- return value;
- }
- v.add(String.valueOf(regionGroupId.getId()));
- return v;
- });
- }
- break;
- case ConsensusFactory.RATIS_CONSENSUS:
- default:
- // For ratis protocol, the ConfigNode-leader will send a
changeLeaderRequest to the
- // new
- // leader.
- // And the RegionRouteMap will be updated by
Cluster-Heartbeat-Service later if
- // change
- // leader success.
- // Force update region leader for ratis consensus when
replication factor is 1.
- if (TConsensusGroupType.SchemaRegion.equals(regionGroupType)
- && CONF.getSchemaReplicationFactor() == 1) {
- successTransferMap.put(
- regionGroupId, new ConsensusGroupHeartbeatSample(0,
newLeaderId));
- } else if
(TConsensusGroupType.DataRegion.equals(regionGroupType)
- && CONF.getDataReplicationFactor() == 1) {
- successTransferMap.put(
- regionGroupId, new ConsensusGroupHeartbeatSample(0,
newLeaderId));
- } else {
- TDataNodeLocation newLeader =
-
getNodeManager().getRegisteredDataNode(newLeaderId).getLocation();
- TRegionLeaderChangeReq regionLeaderChangeReq =
- new TRegionLeaderChangeReq(regionGroupId, newLeader);
- int requestIndex = requestId.getAndIncrement();
- clientHandler.putRequest(requestIndex,
regionLeaderChangeReq);
- clientHandler.putNodeLocation(requestIndex, newLeader);
- }
- break;
- }
- }
- });
- if (requestId.get() > 0) {
- // Don't retry ChangeLeader request
-
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(clientHandler);
- for (int i = 0; i < requestId.get(); i++) {
- if (clientHandler.getResponseMap().get(i).getStatus().getCode()
- == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- successTransferMap.put(
- clientHandler.getRequest(i).getRegionId(),
- new ConsensusGroupHeartbeatSample(
-
clientHandler.getResponseMap().get(i).getConsensusLogicalTimestamp(),
-
clientHandler.getRequest(i).getNewLeaderNode().getDataNodeId()));
- } else {
- lastFailedTimeForLeaderBalance.put(
- clientHandler.getRequest(i).getRegionId(), currentTime);
- LOGGER.error(
-
ManagerMessages.LEADERBALANCER_FAILED_TO_CHANGE_THE_LEADER_OF_REGION_TO_DATANODE,
- clientHandler.getRequest(i).getRegionId(),
- clientHandler.getRequest(i).getNewLeaderNode().getDataNodeId());
- }
+ newLeaderId));
+
+ sendRatisLeaderTransferRequests(leaderTransferContext);
+
getLoadManager().forceUpdateConsensusGroupCache(leaderTransferContext.successTransferMap);
+
+ return
TConsensusGroupType.DataRegion.equals(leaderBalanceContext.regionGroupType)
+ ? new DataRegionLeaderBalanceResult(
+ new HashMap<>(currentLeaderMap),
+ new HashSet<>(leaderTransferContext.successTransferMap.keySet()),
+ leaderTransferContext.balancedOldLeaderId2RegionMap)
+ : DataRegionLeaderBalanceResult.empty();
+ }
+
+ /** Collect one leader transfer into either an immediate cache update or a Ratis transfer RPC. */
+ private void collectLeaderTransfer(
+ TConsensusGroupType regionGroupType,
+ String consensusProtocolClass,
+ Map<TConsensusGroupId, Integer> currentLeaderMap,
+ LeaderTransferContext leaderTransferContext,
+ TConsensusGroupId regionGroupId,
+ Integer newLeaderId) {
+ if (shouldSkipRatisLeaderTransferAfterFailure(
+ consensusProtocolClass, regionGroupId,
leaderTransferContext.currentTime)) {
+ return;
+ }
+
+ int oldLeaderId = currentLeaderMap.get(regionGroupId);
+ if (newLeaderId == -1 || newLeaderId.equals(oldLeaderId)) {
+ return;
+ }
+
+ LOGGER.info(
+
ManagerMessages.LEADERBALANCER_TRY_TO_CHANGE_THE_LEADER_OF_REGION_TO_DATANODE,
+ regionGroupId,
+ newLeaderId);
+ switch (consensusProtocolClass) {
+ case ConsensusFactory.IOT_CONSENSUS:
+ case ConsensusFactory.SIMPLE_CONSENSUS:
+ // For IoTConsensus or SimpleConsensus protocol, changing
RegionRouteMap is enough.
+ leaderTransferContext.putSuccessTransfer(
+ regionGroupId, leaderTransferContext.currentTime, newLeaderId);
+ break;
+ case ConsensusFactory.IOT_CONSENSUS_V2:
+ leaderTransferContext.putSuccessTransfer(
+ regionGroupId, leaderTransferContext.currentTime, newLeaderId);
+ recordOldLeaderForFlush(
+ leaderTransferContext.balancedOldLeaderId2RegionMap,
regionGroupId, oldLeaderId);
+ break;
+ case ConsensusFactory.RATIS_CONSENSUS:
+ default:
+ collectRatisLeaderTransfer(
+ regionGroupType, leaderTransferContext, regionGroupId,
newLeaderId);
+ break;
+ }
+ }
+
+ /** Return whether a Ratis leader transfer should be skipped because it recently failed. */
+ private boolean shouldSkipRatisLeaderTransferAfterFailure(
+ String consensusProtocolClass, TConsensusGroupId regionGroupId, long
currentTime) {
+ return ConsensusFactory.RATIS_CONSENSUS.equals(consensusProtocolClass)
+ && currentTime -
lastFailedTimeForLeaderBalance.getOrDefault(regionGroupId, 0L)
+ <= BALANCE_RATIS_LEADER_FAILED_INTERVAL_IN_NS;
+ }
+
+ /** Prepare a Ratis leader transfer, or force-update cache directly for single-replica groups. */
+ private void collectRatisLeaderTransfer(
+ TConsensusGroupType regionGroupType,
+ LeaderTransferContext leaderTransferContext,
+ TConsensusGroupId regionGroupId,
+ int newLeaderId) {
+ // The RegionRouteMap will be updated later by heartbeat if the transfer succeeds.
+ if (isSingleReplicaRegionGroup(regionGroupType)) {
+ leaderTransferContext.putSuccessTransfer(regionGroupId, 0, newLeaderId);
+ return;
+ }
+
+ TDataNodeLocation newLeader =
getNodeManager().getRegisteredDataNode(newLeaderId).getLocation();
+ leaderTransferContext.putRatisTransferRequest(regionGroupId, newLeader);
+ }
+
+ /** Return whether the RegionGroup type has only one replica and therefore no transfer RPC. */
+ private boolean isSingleReplicaRegionGroup(TConsensusGroupType
regionGroupType) {
+ return (TConsensusGroupType.SchemaRegion.equals(regionGroupType)
+ && CONF.getSchemaReplicationFactor() == 1)
+ || (TConsensusGroupType.DataRegion.equals(regionGroupType)
+ && CONF.getDataReplicationFactor() == 1);
+ }
+
+ /**
+ * Record the old DataRegion leader so IoTConsensusV2 can flush it after a successful transfer.
+ */
+ private void recordOldLeaderForFlush(
+ Map<Integer, List<String>> balancedOldLeaderId2RegionMap,
+ TConsensusGroupId regionGroupId,
+ int oldLeaderId) {
+ if (oldLeaderId != -1) {
+ balancedOldLeaderId2RegionMap
+ .computeIfAbsent(oldLeaderId, ignored -> new ArrayList<>())
+ .add(String.valueOf(regionGroupId.getId()));
+ }
+ }
+
+ /** Send collected Ratis leader transfer RPCs and record successful transfers in cache samples. */
+ private void sendRatisLeaderTransferRequests(LeaderTransferContext
leaderTransferContext) {
+ if (!leaderTransferContext.hasRatisTransferRequest()) {
+ return;
+ }
+
+ // Don't retry ChangeLeader request.
+ CnToDnInternalServiceAsyncRequestManager.getInstance()
+ .sendAsyncRequest(
+ leaderTransferContext.clientHandler, 1,
RATIS_CHANGE_LEADER_RPC_TIMEOUT_IN_MS);
+ for (int i = 0; i < leaderTransferContext.requestId; i++) {
+ TRegionLeaderChangeReq request =
leaderTransferContext.clientHandler.getRequest(i);
+ TRegionLeaderChangeResp response =
+ leaderTransferContext.clientHandler.getResponseMap().get(i);
+ if (response != null
+ && response.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ leaderTransferContext.putSuccessTransfer(
+ request.getRegionId(),
+ response.getConsensusLogicalTimestamp(),
+ request.getNewLeaderNode().getDataNodeId());
+ } else {
+ lastFailedTimeForLeaderBalance.put(request.getRegionId(),
System.nanoTime());
+ LOGGER.error(
+
ManagerMessages.LEADERBALANCER_FAILED_TO_CHANGE_THE_LEADER_OF_REGION_TO_DATANODE,
+ request.getRegionId(),
+ request.getNewLeaderNode().getDataNodeId());
+ }
+ }
+ }
+
+ /** Invalidate schema cache on old DataRegion leaders after successful leader transfers. */
+ private void invalidateSchemaCacheOfOldLeaders(
+ DataRegionLeaderBalanceResult leaderBalanceResult) {
+ if (!IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION
+ || !leaderBalanceResult.hasBalancedDataRegion()) {
+ return;
+ }
+
+ final DataNodeAsyncRequestContext<String, TSStatus>
invalidateSchemaCacheRequestHandler =
+ new
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.INVALIDATE_LAST_CACHE);
+ int requestIndex = 0;
+ for (Map.Entry<TConsensusGroupId, Integer> entry :
+ leaderBalanceResult.dataRegion2OldLeaderMap.entrySet()) {
+ if (!leaderBalanceResult.balancedDataRegionSet.contains(entry.getKey()))
{
+ continue;
}
+ requestIndex =
+ addInvalidateSchemaCacheRequest(
+ invalidateSchemaCacheRequestHandler, requestIndex,
entry.getKey(), entry.getValue());
+ }
+ CnToDnInternalServiceAsyncRequestManager.getInstance()
+ .sendAsyncRequest(invalidateSchemaCacheRequestHandler);
+ }
+
+ /** Add one invalidate-schema-cache request for the old leader of a balanced DataRegion. */
+ private int addInvalidateSchemaCacheRequest(
+ DataNodeAsyncRequestContext<String, TSStatus>
invalidateSchemaCacheRequestHandler,
+ int requestIndex,
+ TConsensusGroupId consensusGroupId,
+ Integer dataNodeId) {
+ if (dataNodeId == null || dataNodeId == -1) {
+ return requestIndex;
}
- getLoadManager().forceUpdateConsensusGroupCache(successTransferMap);
-
- // Prepare data for invalidSchemaCacheOfOldLeaders
- if (regionGroupType.equals(TConsensusGroupType.DataRegion)) {
- lastBalancedDataRegionSet = successTransferMap.keySet();
- lastDataRegion2OldLeaderMap = currentLeaderMap;
- }
- }
-
- private void invalidateSchemaCacheOfOldLeaders() {
- BiConsumer<Map<TConsensusGroupId, Integer>, Set<TConsensusGroupId>>
consumer =
- (oldLeaderMap, successTransferSet) -> {
- final DataNodeAsyncRequestContext<String, TSStatus>
invalidateSchemaCacheRequestHandler =
- new
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.INVALIDATE_LAST_CACHE);
- final AtomicInteger requestIndex = new AtomicInteger(0);
- oldLeaderMap.entrySet().stream()
- .filter(entry -> successTransferSet.contains(entry.getKey()))
- .forEach(
- entry -> {
- // set target
- final Integer dataNodeId = entry.getValue();
- if (dataNodeId == -1) {
- return;
- }
- final TDataNodeLocation dataNodeLocation =
-
getNodeManager().getRegisteredDataNode(dataNodeId).getLocation();
- if (dataNodeLocation == null) {
-
LOGGER.warn(ManagerMessages.DATANODELOCATION_IS_NULL_DATANODEID, dataNodeId);
- return;
- }
- invalidateSchemaCacheRequestHandler.putNodeLocation(
- requestIndex.get(), dataNodeLocation);
- // set req
- final TConsensusGroupId consensusGroupId = entry.getKey();
- final String database =
-
getPartitionManager().getRegionDatabase(consensusGroupId);
-
invalidateSchemaCacheRequestHandler.putRequest(requestIndex.get(), database);
- requestIndex.incrementAndGet();
- });
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequest(invalidateSchemaCacheRequestHandler);
- };
-
- if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION) {
- consumer.accept(lastDataRegion2OldLeaderMap, lastBalancedDataRegionSet);
- }
- }
-
- private void flushOldLeaderIfIoTV2() {
+ TDataNodeConfiguration dataNodeConfiguration =
+ getNodeManager().getRegisteredDataNode(dataNodeId);
+ if (dataNodeConfiguration == null || dataNodeConfiguration.getLocation()
== null) {
+ LOGGER.warn(ManagerMessages.DATANODELOCATION_IS_NULL_DATANODEID,
dataNodeId);
+ return requestIndex;
+ }
+
+ invalidateSchemaCacheRequestHandler.putNodeLocation(
+ requestIndex, dataNodeConfiguration.getLocation());
+ invalidateSchemaCacheRequestHandler.putRequest(
+ requestIndex,
getPartitionManager().getRegionDatabase(consensusGroupId));
+ return requestIndex + 1;
+ }
+
+ /** Flush old DataRegion leaders after IoTConsensusV2 leader transfers. */
+ private void flushOldLeaderIfIoTV2(DataRegionLeaderBalanceResult
leaderBalanceResult) {
if (!IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION
- || !Objects.equals(
- DATA_REGION_CONSENSUS_PROTOCOL_CLASS,
ConsensusFactory.IOT_CONSENSUS_V2)) {
+ || !Objects.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS,
ConsensusFactory.IOT_CONSENSUS_V2)
+ || leaderBalanceResult.balancedOldLeaderId2RegionMap.isEmpty()) {
return;
}
- BiConsumer<Integer, List<String>> consumer =
+ leaderBalanceResult.balancedOldLeaderId2RegionMap.forEach(
(oldLeaderId, regionGroupIds) -> {
TDataNodeConfiguration configuration =
getNodeManager().getRegisteredDataNode(oldLeaderId);
@@ -358,21 +576,20 @@ public class RouteBalancer implements IClusterStatusSubscriber {
oldLeaderId,
regionGroupIds);
}
- };
- lastBalancedOldLeaderId2RegionMap.forEach(consumer);
- // after flush, clear map for next balance
- lastBalancedOldLeaderId2RegionMap.clear();
+ });
}
- private synchronized void handleBalanceAction() {
- invalidateSchemaCacheOfOldLeaders();
- flushOldLeaderIfIoTV2();
+ /** Execute follow-up actions required after DataRegion leader balance. */
+ private void handleBalanceAction(DataRegionLeaderBalanceResult leaderBalanceResult) {
+ invalidateSchemaCacheOfOldLeaders(leaderBalanceResult);
+ flushOldLeaderIfIoTV2(leaderBalanceResult);
}
- public synchronized void balanceRegionLeaderAndPriority() {
- balanceRegionLeader();
+ /** Balance leaders and routing priority immediately, then run DataRegion follow-up actions. */
+ public void balanceRegionLeaderAndPriority() {
+ DataRegionLeaderBalanceResult leaderBalanceResult = balanceAllEnabledRegionLeaders();
balanceRegionPriority();
- handleBalanceAction();
+ handleBalanceAction(leaderBalanceResult);
}
/** Balance cluster RegionGroup route priority through configured algorithm. */
@@ -415,6 +632,7 @@ public class RouteBalancer implements IClusterStatusSubscriber {
}
}
+ /** Broadcast the latest Region route priority map to all available DataNodes. */
private void broadcastLatestRegionPriorityMap() {
// Broadcast the RegionRouteMap to all DataNodes except the unknown ones
Map<Integer, TDataNodeLocation> dataNodeLocationMap =
@@ -435,6 +653,7 @@ public class RouteBalancer implements IClusterStatusSubscriber {
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
}
+ /** Record Region route priority changes for later diagnosis. */
private void recordRegionPriorityMap(
Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> differentPriorityMap) {
LOGGER.info(ManagerMessages.REGIONPRIORITY_REGIONPRIORITYMAP);
@@ -458,6 +677,8 @@ public class RouteBalancer implements IClusterStatusSubscriber {
}
/**
+ * Return a snapshot of the current Region route priority map.
+ *
* @return Map<RegionGroupId, RegionPriority>
*/
public Map<TConsensusGroupId, TRegionReplicaSet> getRegionPriorityMap() {
@@ -469,6 +690,7 @@ public class RouteBalancer implements IClusterStatusSubscriber {
}
}
+ /** Remove one RegionGroup's route priority from the local cache. */
public void removeRegionPriority(TConsensusGroupId regionGroupId) {
priorityMapLock.writeLock().lock();
try {
@@ -478,6 +700,7 @@ public class RouteBalancer implements IClusterStatusSubscriber {
}
}
+ /** Clear all cached Region route priorities. */
public void clearRegionPriority() {
priorityMapLock.writeLock().lock();
try {
@@ -530,32 +753,44 @@ public class RouteBalancer implements IClusterStatusSubscriber {
regionGroupIds);
}
+ /** Return the NodeManager facade used by route balance operations. */
private NodeManager getNodeManager() {
return configManager.getNodeManager();
}
+ /** Return the PartitionManager facade used by route balance operations. */
private PartitionManager getPartitionManager() {
return configManager.getPartitionManager();
}
+ /** Return the LoadManager facade used by route balance operations. */
private LoadManager getLoadManager() {
return configManager.getLoadManager();
}
+ /** Trigger leader balance when DataNode-level load statistics change. */
@Override
public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {
- balanceRegionLeader();
+ handleBalanceAction(balanceAllEnabledRegionLeaders());
}
+ /**
+ * Trigger leader balance only for RegionGroup types present in RegionGroup statistics changes.
+ */
@Override
public void onRegionGroupStatisticsChanged(RegionGroupStatisticsChangeEvent event) {
- balanceRegionLeader();
+ handleBalanceAction(
+ balanceRegionLeadersForChangedGroups(
+ event.getDifferentRegionGroupStatisticsMap().keySet()));
}
+ /** Trigger leader balance and route priority update after consensus leader statistics change. */
@Override
public void onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEvent event) {
- balanceRegionLeader();
+ DataRegionLeaderBalanceResult leaderBalanceResult =
+ balanceRegionLeadersForChangedGroups(
+ event.getDifferentConsensusGroupStatisticsMap().keySet());
balanceRegionPriority();
- handleBalanceAction();
+ handleBalanceAction(leaderBalanceResult);
}
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
index b4086e2dbbd..595cf9a0691 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
@@ -40,7 +40,7 @@ public abstract class AbstractLeaderBalancer {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLeaderBalancer.class);
public static final String GREEDY_POLICY = "GREEDY";
- public static final String CFD_POLICY = "CFD";
+ public static final String CFS_POLICY = "CFS";
public static final String HASH_POLICY = "HASH";
// Set<RegionGroupId>
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
index 472ddd019b8..2cb283d400e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
@@ -180,6 +180,21 @@ public class CreateRegionGroupsProcedure
LOGGER.warn(
ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e);
}
+ setNextState(CreateRegionGroupsState.REBALANCE_DATA_PARTITION_POLICY);
+ break;
+ case REBALANCE_DATA_PARTITION_POLICY:
+ if (TConsensusGroupType.DataRegion.equals(consensusGroupType)) {
+ // Re-balance all corresponding DataPartitionPolicyTable before the newly created
+ // RegionGroups become available for serving partitions.
+ persistPlan
+ .getRegionGroupMap()
+ .keySet()
+ .forEach(
+ database ->
+ env.getConfigManager()
+ .getLoadManager()
+ .reBalanceDataPartitionPolicy(database));
+ }
setNextState(CreateRegionGroupsState.ACTIVATE_REGION_GROUPS);
break;
case ACTIVATE_REGION_GROUPS:
@@ -240,17 +255,6 @@ public class CreateRegionGroupsProcedure
setNextState(CreateRegionGroupsState.CREATE_REGION_GROUPS_FINISH);
break;
case CREATE_REGION_GROUPS_FINISH:
- if (TConsensusGroupType.DataRegion.equals(consensusGroupType)) {
- // Re-balance all corresponding DataPartitionPolicyTable
- persistPlan
- .getRegionGroupMap()
- .keySet()
- .forEach(
- database ->
- env.getConfigManager()
- .getLoadManager()
- .reBalanceDataPartitionPolicy(database));
- }
return Flow.NO_MORE_STATE;
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
index 4ff90132f59..43cdc4b3f6e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
@@ -31,6 +31,10 @@ public enum CreateRegionGroupsState {
// 3. Delete redundant RegionReplicas in contrast to case 2.
SHUNT_REGION_REPLICAS,
+ // Re-balance the DataPartitionPolicyTable for the affected databases so that the newly
+ // created DataRegionGroups are taken into account before they start serving partitions.
+ REBALANCE_DATA_PARTITION_POLICY,
+
// Mark RegionGroupCache as available for those RegionGroups that created successfully.
// For DataRegionGroups that use iot consensus protocol, select leader by the way
ACTIVATE_REGION_GROUPS,