This is an automated email from the ASF dual-hosted git repository.
CRZbulabula pushed a commit to branch better-region-life-cycle
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/better-region-life-cycle by
this push:
new 2fab6474811 Fix region leader balance blocking activation
2fab6474811 is described below
commit 2fab64748119e1ada21e156295646a6537f8ed67
Author: Yongzao <[email protected]>
AuthorDate: Mon May 25 13:08:33 2026 +0800
Fix region leader balance blocking activation
---
.../manager/load/balancer/RouteBalancer.java | 256 ++++++++++++++-------
.../impl/region/CreateRegionGroupsProcedure.java | 26 ++-
.../procedure/state/CreateRegionGroupsState.java | 4 +
3 files changed, 187 insertions(+), 99 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index e3954a59385..3fca396edc0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -60,7 +60,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -70,8 +72,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.BiConsumer;
import java.util.stream.Collectors;
/** The RouteBalancer guides the cluster RegionGroups' leader distribution and
routing priority. */
@@ -104,13 +106,17 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
ProcedureManager.PROCEDURE_WAIT_TIME_OUT -
TimeUnit.SECONDS.toMillis(2),
TimeUnit.SECONDS.toMillis(10));
private static final long WAIT_PRIORITY_INTERVAL = 10;
+ private static final long RATIS_CHANGE_LEADER_RPC_TIMEOUT_IN_MS =
TimeUnit.SECONDS.toMillis(10);
private final IManager configManager;
// For generating optimal Region leader distribution
- private final AbstractLeaderBalancer leaderBalancer;
+ private final AbstractLeaderBalancer schemaLeaderBalancer;
+ private final AbstractLeaderBalancer dataLeaderBalancer;
// For generating optimal cluster Region routing priority
private final IPriorityBalancer priorityRouter;
+ private final ReentrantLock schemaRegionLeaderBalanceLock;
+ private final ReentrantLock dataRegionLeaderBalanceLock;
private final ReentrantReadWriteLock priorityMapLock;
// Map<RegionGroupId, Region priority>
// The client requests are preferentially routed to the Region with the
lowest index in the
@@ -121,29 +127,16 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
private static final long BALANCE_RATIS_LEADER_FAILED_INTERVAL_IN_NS = 20 *
1000L * 1000L * 1000L;
private final Map<TConsensusGroupId, Long> lastFailedTimeForLeaderBalance;
- private final Map<Integer, List<String>> lastBalancedOldLeaderId2RegionMap;
- private Map<TConsensusGroupId, Integer> lastDataRegion2OldLeaderMap;
- private Set<TConsensusGroupId> lastBalancedDataRegionSet;
-
public RouteBalancer(IManager configManager) {
this.configManager = configManager;
+ this.schemaRegionLeaderBalanceLock = new ReentrantLock();
+ this.dataRegionLeaderBalanceLock = new ReentrantLock();
this.priorityMapLock = new ReentrantReadWriteLock();
this.regionPriorityMap = new TreeMap<>();
- this.lastFailedTimeForLeaderBalance = new TreeMap<>();
- this.lastBalancedOldLeaderId2RegionMap = new ConcurrentHashMap<>();
+ this.lastFailedTimeForLeaderBalance = new ConcurrentHashMap<>();
- switch (CONF.getLeaderDistributionPolicy()) {
- case AbstractLeaderBalancer.GREEDY_POLICY:
- this.leaderBalancer = new GreedyLeaderBalancer();
- break;
- case AbstractLeaderBalancer.HASH_POLICY:
- this.leaderBalancer = new HashLeaderBalancer();
- break;
- case AbstractLeaderBalancer.CFD_POLICY:
- default:
- this.leaderBalancer = new CostFlowSelectionLeaderBalancer();
- break;
- }
+ this.schemaLeaderBalancer = createLeaderBalancer();
+ this.dataLeaderBalancer = createLeaderBalancer();
switch (CONF.getRoutePriorityPolicy()) {
case IPriorityBalancer.GREEDY_POLICY:
@@ -156,23 +149,108 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
}
}
+ private static class DataRegionLeaderBalanceResult {
+
+ private final Map<TConsensusGroupId, Integer> dataRegion2OldLeaderMap;
+ private final Set<TConsensusGroupId> balancedDataRegionSet;
+ private final Map<Integer, List<String>> balancedOldLeaderId2RegionMap;
+
+ private DataRegionLeaderBalanceResult(
+ Map<TConsensusGroupId, Integer> dataRegion2OldLeaderMap,
+ Set<TConsensusGroupId> balancedDataRegionSet,
+ Map<Integer, List<String>> balancedOldLeaderId2RegionMap) {
+ this.dataRegion2OldLeaderMap = dataRegion2OldLeaderMap;
+ this.balancedDataRegionSet = balancedDataRegionSet;
+ this.balancedOldLeaderId2RegionMap = balancedOldLeaderId2RegionMap;
+ }
+
+ private static DataRegionLeaderBalanceResult empty() {
+ return new DataRegionLeaderBalanceResult(
+ Collections.emptyMap(), Collections.emptySet(),
Collections.emptyMap());
+ }
+
+ private boolean hasBalancedDataRegion() {
+ return !balancedDataRegionSet.isEmpty();
+ }
+ }
+
+ private AbstractLeaderBalancer createLeaderBalancer() {
+ switch (CONF.getLeaderDistributionPolicy()) {
+ case AbstractLeaderBalancer.GREEDY_POLICY:
+ return new GreedyLeaderBalancer();
+ case AbstractLeaderBalancer.HASH_POLICY:
+ return new HashLeaderBalancer();
+ case AbstractLeaderBalancer.CFD_POLICY:
+ default:
+ return new CostFlowSelectionLeaderBalancer();
+ }
+ }
+
/** Balance cluster RegionGroup leader distribution through configured
algorithm. */
- private synchronized void balanceRegionLeader() {
+ private DataRegionLeaderBalanceResult balanceRegionLeader() {
+ DataRegionLeaderBalanceResult result =
DataRegionLeaderBalanceResult.empty();
if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION) {
- balanceRegionLeader(TConsensusGroupType.SchemaRegion,
SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS);
+ balanceRegionLeader(TConsensusGroupType.SchemaRegion);
}
if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION) {
- balanceRegionLeader(TConsensusGroupType.DataRegion,
DATA_REGION_CONSENSUS_PROTOCOL_CLASS);
+ result = balanceRegionLeader(TConsensusGroupType.DataRegion);
}
+ return result;
}
- private void balanceRegionLeader(
- TConsensusGroupType regionGroupType, String consensusProtocolClass) {
+ private DataRegionLeaderBalanceResult
balanceRegionLeader(Set<TConsensusGroupId> regionGroupIds) {
+ DataRegionLeaderBalanceResult result =
DataRegionLeaderBalanceResult.empty();
+ if (regionGroupIds.stream()
+ .anyMatch(
+ regionGroupId ->
TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType()))
+ && IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION) {
+ balanceRegionLeader(TConsensusGroupType.SchemaRegion);
+ }
+ if (regionGroupIds.stream()
+ .anyMatch(
+ regionGroupId ->
TConsensusGroupType.DataRegion.equals(regionGroupId.getType()))
+ && IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION) {
+ result = balanceRegionLeader(TConsensusGroupType.DataRegion);
+ }
+ return result;
+ }
+
+ private DataRegionLeaderBalanceResult
balanceRegionLeader(TConsensusGroupType regionGroupType) {
+ final ReentrantLock leaderBalanceLock;
+ final AbstractLeaderBalancer targetLeaderBalancer;
+ final String consensusProtocolClass;
+ switch (regionGroupType) {
+ case SchemaRegion:
+ leaderBalanceLock = schemaRegionLeaderBalanceLock;
+ targetLeaderBalancer = schemaLeaderBalancer;
+ consensusProtocolClass = SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS;
+ break;
+ case DataRegion:
+ leaderBalanceLock = dataRegionLeaderBalanceLock;
+ targetLeaderBalancer = dataLeaderBalancer;
+ consensusProtocolClass = DATA_REGION_CONSENSUS_PROTOCOL_CLASS;
+ break;
+ default:
+ return DataRegionLeaderBalanceResult.empty();
+ }
+
+ leaderBalanceLock.lock();
+ try {
+ return balanceRegionLeader(regionGroupType, consensusProtocolClass,
targetLeaderBalancer);
+ } finally {
+ leaderBalanceLock.unlock();
+ }
+ }
+
+ private DataRegionLeaderBalanceResult balanceRegionLeader(
+ TConsensusGroupType regionGroupType,
+ String consensusProtocolClass,
+ AbstractLeaderBalancer targetLeaderBalancer) {
// Collect the latest data and generate the optimal leader distribution
Map<TConsensusGroupId, Integer> currentLeaderMap =
getLoadManager().getLoadCache().getRegionLeaderMap(regionGroupType);
Map<TConsensusGroupId, Integer> optimalLeaderMap =
- leaderBalancer.generateOptimalLeaderDistribution(
+ targetLeaderBalancer.generateOptimalLeaderDistribution(
getLoadManager().getLoadCache().getCurrentDatabaseRegionGroupMap(regionGroupType),
getLoadManager().getLoadCache().getCurrentRegionLocationMap(regionGroupType),
currentLeaderMap,
@@ -185,6 +263,7 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
DataNodeAsyncRequestContext<TRegionLeaderChangeReq,
TRegionLeaderChangeResp> clientHandler =
new
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.CHANGE_REGION_LEADER);
Map<TConsensusGroupId, ConsensusGroupHeartbeatSample> successTransferMap =
new TreeMap<>();
+ Map<Integer, List<String>> balancedOldLeaderId2RegionMap = new HashMap<>();
optimalLeaderMap.forEach(
(regionGroupId, newLeaderId) -> {
if (ConsensusFactory.RATIS_CONSENSUS.equals(consensusProtocolClass)
@@ -214,7 +293,7 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
regionGroupId, new
ConsensusGroupHeartbeatSample(currentTime, newLeaderId));
// Prepare data for flushOldLeader
if (oldLeaderId != -1) {
- lastBalancedOldLeaderId2RegionMap.compute(
+ balancedOldLeaderId2RegionMap.compute(
oldLeaderId,
(k, v) -> {
if (v == null) {
@@ -259,18 +338,20 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
});
if (requestId.get() > 0) {
// Don't retry ChangeLeader request
-
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(clientHandler);
+ CnToDnInternalServiceAsyncRequestManager.getInstance()
+ .sendAsyncRequest(clientHandler, 1,
RATIS_CHANGE_LEADER_RPC_TIMEOUT_IN_MS);
for (int i = 0; i < requestId.get(); i++) {
- if (clientHandler.getResponseMap().get(i).getStatus().getCode()
- == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ TRegionLeaderChangeResp response =
clientHandler.getResponseMap().get(i);
+ if (response != null
+ && response.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
successTransferMap.put(
clientHandler.getRequest(i).getRegionId(),
new ConsensusGroupHeartbeatSample(
-
clientHandler.getResponseMap().get(i).getConsensusLogicalTimestamp(),
+ response.getConsensusLogicalTimestamp(),
clientHandler.getRequest(i).getNewLeaderNode().getDataNodeId()));
} else {
lastFailedTimeForLeaderBalance.put(
- clientHandler.getRequest(i).getRegionId(), currentTime);
+ clientHandler.getRequest(i).getRegionId(), System.nanoTime());
LOGGER.error(
ManagerMessages.LEADERBALANCER_FAILED_TO_CHANGE_THE_LEADER_OF_REGION_TO_DATANODE,
clientHandler.getRequest(i).getRegionId(),
@@ -283,58 +364,59 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
// Prepare data for invalidSchemaCacheOfOldLeaders
if (regionGroupType.equals(TConsensusGroupType.DataRegion)) {
- lastBalancedDataRegionSet = successTransferMap.keySet();
- lastDataRegion2OldLeaderMap = currentLeaderMap;
+ return new DataRegionLeaderBalanceResult(
+ new HashMap<>(currentLeaderMap),
+ new HashSet<>(successTransferMap.keySet()),
+ balancedOldLeaderId2RegionMap);
}
+ return DataRegionLeaderBalanceResult.empty();
}
- private void invalidateSchemaCacheOfOldLeaders() {
- BiConsumer<Map<TConsensusGroupId, Integer>, Set<TConsensusGroupId>>
consumer =
- (oldLeaderMap, successTransferSet) -> {
- final DataNodeAsyncRequestContext<String, TSStatus>
invalidateSchemaCacheRequestHandler =
- new
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.INVALIDATE_LAST_CACHE);
- final AtomicInteger requestIndex = new AtomicInteger(0);
- oldLeaderMap.entrySet().stream()
- .filter(entry -> successTransferSet.contains(entry.getKey()))
- .forEach(
- entry -> {
- // set target
- final Integer dataNodeId = entry.getValue();
- if (dataNodeId == -1) {
- return;
- }
- final TDataNodeLocation dataNodeLocation =
-
getNodeManager().getRegisteredDataNode(dataNodeId).getLocation();
- if (dataNodeLocation == null) {
-
LOGGER.warn(ManagerMessages.DATANODELOCATION_IS_NULL_DATANODEID, dataNodeId);
- return;
- }
- invalidateSchemaCacheRequestHandler.putNodeLocation(
- requestIndex.get(), dataNodeLocation);
- // set req
- final TConsensusGroupId consensusGroupId = entry.getKey();
- final String database =
-
getPartitionManager().getRegionDatabase(consensusGroupId);
-
invalidateSchemaCacheRequestHandler.putRequest(requestIndex.get(), database);
- requestIndex.incrementAndGet();
- });
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequest(invalidateSchemaCacheRequestHandler);
- };
-
- if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION) {
- consumer.accept(lastDataRegion2OldLeaderMap, lastBalancedDataRegionSet);
+ private void invalidateSchemaCacheOfOldLeaders(
+ DataRegionLeaderBalanceResult leaderBalanceResult) {
+ if (!IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION
+ || !leaderBalanceResult.hasBalancedDataRegion()) {
+ return;
}
+
+ final DataNodeAsyncRequestContext<String, TSStatus>
invalidateSchemaCacheRequestHandler =
+ new
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.INVALIDATE_LAST_CACHE);
+ final AtomicInteger requestIndex = new AtomicInteger(0);
+ leaderBalanceResult.dataRegion2OldLeaderMap.entrySet().stream()
+ .filter(entry ->
leaderBalanceResult.balancedDataRegionSet.contains(entry.getKey()))
+ .forEach(
+ entry -> {
+ // set target
+ final Integer dataNodeId = entry.getValue();
+ if (dataNodeId == -1) {
+ return;
+ }
+ final TDataNodeLocation dataNodeLocation =
+
getNodeManager().getRegisteredDataNode(dataNodeId).getLocation();
+ if (dataNodeLocation == null) {
+
LOGGER.warn(ManagerMessages.DATANODELOCATION_IS_NULL_DATANODEID, dataNodeId);
+ return;
+ }
+ invalidateSchemaCacheRequestHandler.putNodeLocation(
+ requestIndex.get(), dataNodeLocation);
+ // set req
+ final TConsensusGroupId consensusGroupId = entry.getKey();
+ final String database =
getPartitionManager().getRegionDatabase(consensusGroupId);
+
invalidateSchemaCacheRequestHandler.putRequest(requestIndex.get(), database);
+ requestIndex.incrementAndGet();
+ });
+ CnToDnInternalServiceAsyncRequestManager.getInstance()
+ .sendAsyncRequest(invalidateSchemaCacheRequestHandler);
}
- private void flushOldLeaderIfIoTV2() {
+ private void flushOldLeaderIfIoTV2(DataRegionLeaderBalanceResult
leaderBalanceResult) {
if (!IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION
- || !Objects.equals(
- DATA_REGION_CONSENSUS_PROTOCOL_CLASS,
ConsensusFactory.IOT_CONSENSUS_V2)) {
+ || !Objects.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS,
ConsensusFactory.IOT_CONSENSUS_V2)
+ || leaderBalanceResult.balancedOldLeaderId2RegionMap.isEmpty()) {
return;
}
- BiConsumer<Integer, List<String>> consumer =
+ leaderBalanceResult.balancedOldLeaderId2RegionMap.forEach(
(oldLeaderId, regionGroupIds) -> {
TDataNodeConfiguration configuration =
getNodeManager().getRegisteredDataNode(oldLeaderId);
@@ -358,21 +440,18 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
oldLeaderId,
regionGroupIds);
}
- };
- lastBalancedOldLeaderId2RegionMap.forEach(consumer);
- // after flush, clear map for next balance
- lastBalancedOldLeaderId2RegionMap.clear();
+ });
}
- private synchronized void handleBalanceAction() {
- invalidateSchemaCacheOfOldLeaders();
- flushOldLeaderIfIoTV2();
+ private void handleBalanceAction(DataRegionLeaderBalanceResult
leaderBalanceResult) {
+ invalidateSchemaCacheOfOldLeaders(leaderBalanceResult);
+ flushOldLeaderIfIoTV2(leaderBalanceResult);
}
- public synchronized void balanceRegionLeaderAndPriority() {
- balanceRegionLeader();
+ public void balanceRegionLeaderAndPriority() {
+ DataRegionLeaderBalanceResult leaderBalanceResult = balanceRegionLeader();
balanceRegionPriority();
- handleBalanceAction();
+ handleBalanceAction(leaderBalanceResult);
}
/** Balance cluster RegionGroup route priority through configured algorithm.
*/
@@ -544,18 +623,19 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
@Override
public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {
- balanceRegionLeader();
+ handleBalanceAction(balanceRegionLeader());
}
@Override
public void onRegionGroupStatisticsChanged(RegionGroupStatisticsChangeEvent
event) {
- balanceRegionLeader();
+
handleBalanceAction(balanceRegionLeader(event.getDifferentRegionGroupStatisticsMap().keySet()));
}
@Override
public void
onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEvent event) {
- balanceRegionLeader();
+ DataRegionLeaderBalanceResult leaderBalanceResult =
+
balanceRegionLeader(event.getDifferentConsensusGroupStatisticsMap().keySet());
balanceRegionPriority();
- handleBalanceAction();
+ handleBalanceAction(leaderBalanceResult);
}
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
index 472ddd019b8..2cb283d400e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
@@ -180,6 +180,21 @@ public class CreateRegionGroupsProcedure
LOGGER.warn(
ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
e);
}
+ setNextState(CreateRegionGroupsState.REBALANCE_DATA_PARTITION_POLICY);
+ break;
+ case REBALANCE_DATA_PARTITION_POLICY:
+ if (TConsensusGroupType.DataRegion.equals(consensusGroupType)) {
+ // Re-balance all corresponding DataPartitionPolicyTable before the
newly created
+ // RegionGroups become available for serving partitions.
+ persistPlan
+ .getRegionGroupMap()
+ .keySet()
+ .forEach(
+ database ->
+ env.getConfigManager()
+ .getLoadManager()
+ .reBalanceDataPartitionPolicy(database));
+ }
setNextState(CreateRegionGroupsState.ACTIVATE_REGION_GROUPS);
break;
case ACTIVATE_REGION_GROUPS:
@@ -240,17 +255,6 @@ public class CreateRegionGroupsProcedure
setNextState(CreateRegionGroupsState.CREATE_REGION_GROUPS_FINISH);
break;
case CREATE_REGION_GROUPS_FINISH:
- if (TConsensusGroupType.DataRegion.equals(consensusGroupType)) {
- // Re-balance all corresponding DataPartitionPolicyTable
- persistPlan
- .getRegionGroupMap()
- .keySet()
- .forEach(
- database ->
- env.getConfigManager()
- .getLoadManager()
- .reBalanceDataPartitionPolicy(database));
- }
return Flow.NO_MORE_STATE;
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
index 4ff90132f59..43cdc4b3f6e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
@@ -31,6 +31,10 @@ public enum CreateRegionGroupsState {
// 3. Delete redundant RegionReplicas in contrast to case 2.
SHUNT_REGION_REPLICAS,
+ // Re-balance the DataPartitionPolicyTable for the affected databases so
that the newly
+ // created DataRegionGroups are taken into account before they start serving
partitions.
+ REBALANCE_DATA_PARTITION_POLICY,
+
// Mark RegionGroupCache as available for those RegionGroups that created
successfully.
// For DataRegionGroups that use iot consensus protocol, select leader by
the way
ACTIVATE_REGION_GROUPS,