This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ade087b330 [IOTDB-4029] Latent routing policy for MultiLeader protocol
(#6880)
ade087b330 is described below
commit ade087b330f0b19930259320db1fe4770686ad67
Author: YongzaoDan <[email protected]>
AuthorDate: Sat Aug 6 08:47:20 2022 +0800
[IOTDB-4029] Latent routing policy for MultiLeader protocol (#6880)
---
.../iotdb/confignode/manager/PartitionManager.java | 11 +-
.../iotdb/confignode/manager/load/LoadManager.java | 95 ++++++++----
.../manager/load/balancer/RouteBalancer.java | 60 +++++++-
.../load/balancer/router/LazyGreedyRouter.java | 154 +++++++++++++++++++
.../manager/load/heartbeat/IRegionGroupCache.java | 9 ++
.../manager/load/heartbeat/RegionGroupCache.java | 7 +-
.../procedure/env/DataNodeRemoveHandler.java | 2 +
.../load/balancer/router/LazyGreedyRouterTest.java | 166 +++++++++++++++++++++
.../load/balancer/router/LeaderRouterTest.java | 2 +-
9 files changed, 460 insertions(+), 46 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index 0ba379af5f..0e4adebb84 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -286,9 +286,8 @@ public class PartitionManager {
continue;
}
- // 2. The average number of partitions held by each Region is greater
than the expected
- // average
- // when the partition allocation is complete
+ // 2. The average number of partitions held by each Region will be
greater than the
+ // expected average number after the partition allocation is completed
if (allocatedRegionCount < maxRegionCount
&& slotCount / allocatedRegionCount > maxSlotCount /
maxRegionCount) {
// The delta is equal to the smallest integer solution that
satisfies the inequality:
@@ -306,8 +305,10 @@ public class PartitionManager {
}
// TODO: Use procedure to protect the following process
- // Do Region allocation and creation for StorageGroups based on the
allotment
- getLoadManager().doRegionCreation(allotmentMap, consensusGroupType);
+ if (!allotmentMap.isEmpty()) {
+ // Do Region allocation and creation for StorageGroups based on the
allotment
+ getLoadManager().doRegionCreation(allotmentMap, consensusGroupType);
+ }
result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (NotEnoughDataNodeException e) {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 6d5cc98cc1..ff9835cebf 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -55,6 +55,7 @@ import
org.apache.iotdb.confignode.manager.load.heartbeat.ConfigNodeHeartbeatCac
import
org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.INodeCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
+import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.service.metrics.MetricsService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
import org.apache.iotdb.db.service.metrics.enums.Tag;
@@ -153,8 +154,11 @@ public class LoadManager {
getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup).getTTL());
}
AsyncDataNodeClientPool.getInstance().createRegions(createRegionGroupsPlan,
ttlMap);
+
// Persist the allocation result
getConsensusManager().write(createRegionGroupsPlan);
+ // Broadcast the latest RegionRouteMap
+ broadcastLatestRegionRouteMap();
}
/**
@@ -188,6 +192,7 @@ public class LoadManager {
* sorting result have higher priority.
*/
public Map<TConsensusGroupId, TRegionReplicaSet> genLatestRegionRouteMap() {
+ // Always take the latest locations of RegionGroups as the input parameter
return
routeBalancer.genLatestRegionRouteMap(getPartitionManager().getAllReplicaSets());
}
@@ -267,17 +272,19 @@ public class LoadManager {
}
private void updateNodeLoadStatistic() {
- AtomicBoolean isNeedBroadcast = new AtomicBoolean(false);
+ AtomicBoolean existFailDownDataNode = new AtomicBoolean(false);
+ AtomicBoolean existChangeLeaderSchemaRegionGroup = new
AtomicBoolean(false);
+ AtomicBoolean existChangeLeaderDataRegionGroup = new AtomicBoolean(false);
+ boolean isNeedBroadcast = false;
nodeCacheMap
.values()
.forEach(
nodeCache -> {
boolean updateResult = nodeCache.updateLoadStatistic();
- if (CONF.getRoutingPolicy().equals(RouteBalancer.greedyPolicy)
- && nodeCache instanceof DataNodeHeartbeatCache) {
- // We need a broadcast when some DataNode fail down
- isNeedBroadcast.compareAndSet(false, updateResult);
+ if (nodeCache instanceof DataNodeHeartbeatCache) {
+ // Check if some DataNodes fail down
+ existFailDownDataNode.compareAndSet(false, updateResult);
}
});
@@ -286,13 +293,37 @@ public class LoadManager {
.forEach(
regionGroupCache -> {
boolean updateResult = regionGroupCache.updateLoadStatistic();
- if (CONF.getRoutingPolicy().equals(RouteBalancer.leaderPolicy)) {
- // We need a broadcast when the leadership changed
- isNeedBroadcast.compareAndSet(false, updateResult);
+ switch (regionGroupCache.getConsensusGroupId().getType()) {
+ // Check if some RegionGroups change their leader
+ case SchemaRegion:
+ existChangeLeaderSchemaRegionGroup.compareAndSet(false,
updateResult);
+ break;
+ case DataRegion:
+ existChangeLeaderDataRegionGroup.compareAndSet(false,
updateResult);
+ break;
}
});
- if (isNeedBroadcast.get()) {
+ if (existFailDownDataNode.get()) {
+ // The RegionRouteMap must be broadcast if some DataNodes fail down
+ isNeedBroadcast = true;
+ }
+
+ if (RouteBalancer.leaderPolicy.equals(CONF.getRoutingPolicy())) {
+ // Check the condition of leader routing policy
+ if (existChangeLeaderSchemaRegionGroup.get()) {
+ // Broadcast the RegionRouteMap if some SchemaRegionGroups change
their leader
+ isNeedBroadcast = true;
+ }
+ if
(!ConsensusFactory.MultiLeaderConsensus.equals(CONF.getDataRegionConsensusProtocolClass())
+ && existChangeLeaderDataRegionGroup.get()) {
+ // Broadcast the RegionRouteMap if some DataRegionGroups change their
leader
+ // and the consensus protocol isn't MultiLeader
+ isNeedBroadcast = true;
+ }
+ }
+
+ if (isNeedBroadcast) {
broadcastLatestRegionRouteMap();
}
if (nodeCacheMap.size() == getNodeManager().getRegisteredNodeCount()) {
@@ -300,7 +331,7 @@ public class LoadManager {
}
}
- private void broadcastLatestRegionRouteMap() {
+ public void broadcastLatestRegionRouteMap() {
Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap =
genLatestRegionRouteMap();
Map<Integer, TDataNodeLocation> dataNodeLocationMap = new
ConcurrentHashMap<>();
getOnlineDataNodes(-1)
@@ -309,7 +340,7 @@ public class LoadManager {
dataNodeLocationMap.put(
onlineDataNode.getLocation().getDataNodeId(),
onlineDataNode.getLocation()));
- LOGGER.info("Begin to broadcast RegionRouteMap:");
+ LOGGER.info("[latestRegionRouteMap] Begin to broadcast RegionRouteMap:");
long broadcastTime = System.currentTimeMillis();
printRegionRouteMap(broadcastTime, latestRegionRouteMap);
AsyncDataNodeClientPool.getInstance()
@@ -318,7 +349,7 @@ public class LoadManager {
dataNodeLocationMap,
DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
null);
- LOGGER.info("Broadcast the latest RegionRouteMap finished.");
+ LOGGER.info("[latestRegionRouteMap] Broadcast the latest RegionRouteMap
finished.");
}
/** loop body of the heartbeat thread */
@@ -412,44 +443,44 @@ public class LoadManager {
public List<TConfigNodeLocation> getOnlineConfigNodes() {
return getNodeManager().getRegisteredConfigNodes().stream()
.filter(
- registeredConfigNode ->
- nodeCacheMap
- .get(registeredConfigNode.getConfigNodeId())
- .getNodeStatus()
- .equals(NodeStatus.Running))
+ registeredConfigNode -> {
+ int configNodeId = registeredConfigNode.getConfigNodeId();
+ return nodeCacheMap.containsKey(configNodeId)
+ &&
nodeCacheMap.get(configNodeId).getNodeStatus().equals(NodeStatus.Running);
+ })
.collect(Collectors.toList());
}
public List<TDataNodeConfiguration> getOnlineDataNodes(int dataNodeId) {
return getNodeManager().getRegisteredDataNodes(dataNodeId).stream()
.filter(
- registeredDataNode ->
- nodeCacheMap
- .get(registeredDataNode.getLocation().getDataNodeId())
- .getNodeStatus()
- .equals(NodeStatus.Running))
+ registeredDataNode -> {
+ int id = registeredDataNode.getLocation().getDataNodeId();
+ return nodeCacheMap.containsKey(id)
+ &&
nodeCacheMap.get(id).getNodeStatus().equals(NodeStatus.Running);
+ })
.collect(Collectors.toList());
}
public List<TConfigNodeLocation> getUnknownConfigNodes() {
return getNodeManager().getRegisteredConfigNodes().stream()
.filter(
- registeredConfigNode ->
- nodeCacheMap
- .get(registeredConfigNode.getConfigNodeId())
- .getNodeStatus()
- .equals(NodeStatus.Unknown))
+ registeredConfigNode -> {
+ int configNodeId = registeredConfigNode.getConfigNodeId();
+ return nodeCacheMap.containsKey(configNodeId)
+ &&
nodeCacheMap.get(configNodeId).getNodeStatus().equals(NodeStatus.Unknown);
+ })
.collect(Collectors.toList());
}
public List<TDataNodeConfiguration> getUnknownDataNodes(int dataNodeId) {
return getNodeManager().getRegisteredDataNodes(dataNodeId).stream()
.filter(
- registeredDataNode ->
- nodeCacheMap
- .get(registeredDataNode.getLocation().getDataNodeId())
- .getNodeStatus()
- .equals(NodeStatus.Unknown))
+ registeredDataNode -> {
+ int id = registeredDataNode.getLocation().getDataNodeId();
+ return nodeCacheMap.containsKey(id)
+ &&
nodeCacheMap.get(id).getNodeStatus().equals(NodeStatus.Unknown);
+ })
.collect(Collectors.toList());
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 97c6d91952..a77fa10be2 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -19,14 +19,18 @@
package org.apache.iotdb.confignode.manager.load.balancer;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.load.balancer.router.IRouter;
+import
org.apache.iotdb.confignode.manager.load.balancer.router.LazyGreedyRouter;
import org.apache.iotdb.confignode.manager.load.balancer.router.LeaderRouter;
import
org.apache.iotdb.confignode.manager.load.balancer.router.LoadScoreGreedyRouter;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -41,22 +45,64 @@ public class RouteBalancer {
private final IManager configManager;
+ private final LazyGreedyRouter lazyGreedyRouter;
+
public RouteBalancer(IManager configManager) {
this.configManager = configManager;
+ this.lazyGreedyRouter = new LazyGreedyRouter();
}
public Map<TConsensusGroupId, TRegionReplicaSet> genLatestRegionRouteMap(
List<TRegionReplicaSet> regionReplicaSets) {
- return genRouter().genLatestRegionRouteMap(regionReplicaSets);
+ List<TRegionReplicaSet> schemaRegionGroups = new ArrayList<>();
+ List<TRegionReplicaSet> dataRegionGroups = new ArrayList<>();
+
+ regionReplicaSets.forEach(
+ regionReplicaSet -> {
+ switch (regionReplicaSet.getRegionId().getType()) {
+ case SchemaRegion:
+ schemaRegionGroups.add(regionReplicaSet);
+ break;
+ case DataRegion:
+ dataRegionGroups.add(regionReplicaSet);
+ break;
+ }
+ });
+
+ // Generate SchemaRegionRouteMap
+ Map<TConsensusGroupId, TRegionReplicaSet> result =
+
genRouter(TConsensusGroupType.SchemaRegion).genLatestRegionRouteMap(schemaRegionGroups);
+ // Generate DataRegionRouteMap
+ result.putAll(
+
genRouter(TConsensusGroupType.DataRegion).genLatestRegionRouteMap(dataRegionGroups));
+ return result;
}
- private IRouter genRouter() {
+ private IRouter genRouter(TConsensusGroupType groupType) {
String policy =
ConfigNodeDescriptor.getInstance().getConf().getRoutingPolicy();
- if (policy.equals(leaderPolicy)) {
- return new LeaderRouter(
- getLoadManager().getAllLeadership(),
getLoadManager().getAllLoadScores());
- } else {
- return new LoadScoreGreedyRouter(getLoadManager().getAllLoadScores());
+ switch (groupType) {
+ case SchemaRegion:
+ if (policy.equals(leaderPolicy)) {
+ return new LeaderRouter(
+ getLoadManager().getAllLeadership(),
getLoadManager().getAllLoadScores());
+ } else {
+ return new
LoadScoreGreedyRouter(getLoadManager().getAllLoadScores());
+ }
+ case DataRegion:
+ default:
+ if (ConfigNodeDescriptor.getInstance()
+ .getConf()
+ .getDataRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.MultiLeaderConsensus)) {
+ // Latent router for MultiLeader consensus protocol
+
lazyGreedyRouter.updateUnknownDataNodes(getLoadManager().getUnknownDataNodes(-1));
+ return lazyGreedyRouter;
+ } else if (policy.equals(leaderPolicy)) {
+ return new LeaderRouter(
+ getLoadManager().getAllLeadership(),
getLoadManager().getAllLoadScores());
+ } else {
+ return new
LoadScoreGreedyRouter(getLoadManager().getAllLoadScores());
+ }
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LazyGreedyRouter.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LazyGreedyRouter.java
new file mode 100644
index 0000000000..c76c79d494
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LazyGreedyRouter.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.router;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * The LazyGreedyRouter mainly applies to the MultiLeader consensus protocol.
It tries to make the
+ * number of leaders in each online DataNode as equal as possible.
+ */
+public class LazyGreedyRouter implements IRouter {
+
+ // Set<DataNodeId>
+ private final Set<Integer> unknownDataNodes;
+ private final Map<TConsensusGroupId, TRegionReplicaSet> routeMap;
+
+ public LazyGreedyRouter() {
+ this.unknownDataNodes = Collections.synchronizedSet(new HashSet<>());
+ this.routeMap = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Update the unknownDataNodes cache in LazyGreedyRouter
+ *
+ * @param unknownDataNodes DataNodes that have unknown status
+ */
+ public void updateUnknownDataNodes(List<TDataNodeConfiguration>
unknownDataNodes) {
+ synchronized (this.unknownDataNodes) {
+ this.unknownDataNodes.clear();
+ this.unknownDataNodes.addAll(
+ unknownDataNodes.stream()
+ .map(dataNodeConfiguration ->
dataNodeConfiguration.getLocation().getDataNodeId())
+ .collect(Collectors.toList()));
+ }
+ }
+
+ @Override
+ public Map<TConsensusGroupId, TRegionReplicaSet> genLatestRegionRouteMap(
+ List<TRegionReplicaSet> replicaSets) {
+ synchronized (unknownDataNodes) {
+ // Map<DataNodeId, leaderCount> Count the number of leaders in each
DataNode
+ Map<Integer, Integer> leaderCounter = new HashMap<>();
+ Map<TConsensusGroupId, TRegionReplicaSet> result = new
ConcurrentHashMap<>();
+ List<TRegionReplicaSet> updateReplicas = new ArrayList<>();
+
+ for (TRegionReplicaSet replicaSet : replicaSets) {
+ if (routeEntryNeedsUpdate(replicaSet)) {
+ // Defer this entry: the greedy algorithm runs last, once the counter is complete
+ updateReplicas.add(replicaSet);
+ } else {
+ // Update counter
+ leaderCounter.compute(
+
routeMap.get(replicaSet.getRegionId()).getDataNodeLocations().get(0).getDataNodeId(),
+ (dataNodeId, counter) -> (counter == null ? 1 : counter + 1));
+ // Record the unaltered results
+ result.put(replicaSet.getRegionId(),
routeMap.get(replicaSet.getRegionId()));
+ }
+ }
+
+ for (TRegionReplicaSet replicaSet : updateReplicas) {
+ updateRouteEntry(replicaSet, leaderCounter);
+ result.put(replicaSet.getRegionId(),
routeMap.get(replicaSet.getRegionId()));
+ }
+
+ return result;
+ }
+ }
+
+ /** Check whether the route entry of the given RegionReplicaSet needs to be
updated */
+ private boolean routeEntryNeedsUpdate(TRegionReplicaSet replicaSet) {
+ TConsensusGroupId groupId = replicaSet.getRegionId();
+ if (!routeMap.containsKey(groupId)) {
+ // The RouteEntry needs update when it is not recorded yet
+ return true;
+ }
+
+ Set<Integer> cacheReplicaSet =
+ routeMap.get(groupId).getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toSet());
+ Set<Integer> inputReplicaSet =
+ replicaSet.getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toSet());
+ if (!cacheReplicaSet.equals(inputReplicaSet)) {
+ // The RouteEntry needs update when the cached record is outdated
+ return true;
+ }
+
+ // The RouteEntry needs update when the DataNode holding the
first-priority replica
+ // is unknown
+ return unknownDataNodes.contains(
+ routeMap.get(groupId).getDataNodeLocations().get(0).getDataNodeId());
+ }
+
+ private void updateRouteEntry(TRegionReplicaSet replicaSet, Map<Integer,
Integer> leaderCounter) {
+ TRegionReplicaSet newRouteEntry = new TRegionReplicaSet(replicaSet);
+ Collections.shuffle(newRouteEntry.getDataNodeLocations(), new Random());
+
+ // Greedily select the leader replica
+ int leaderIndex = -1;
+ int locateLeaderCount = Integer.MAX_VALUE;
+ for (int i = 0; i < newRouteEntry.getDataNodeLocationsSize(); i++) {
+ int currentDataNodeId =
newRouteEntry.getDataNodeLocations().get(i).getDataNodeId();
+ if (!unknownDataNodes.contains(currentDataNodeId)
+ && leaderCounter.getOrDefault(currentDataNodeId, 0) <
locateLeaderCount) {
+ leaderIndex = i;
+ locateLeaderCount = leaderCounter.getOrDefault(currentDataNodeId, 0);
+ }
+ }
+
+ if (leaderIndex == -1) {
+ // Corner case: every DataNode of this replica set is unknown, fall back to index 0
+ leaderIndex = 0;
+ }
+
+ // Swap leader replica and update statistic
+ Collections.swap(newRouteEntry.getDataNodeLocations(), 0, leaderIndex);
+ leaderCounter.compute(
+ newRouteEntry.getDataNodeLocations().get(0).getDataNodeId(),
+ (dataNodeId, counter) -> (counter == null ? 1 : counter + 1));
+ routeMap.put(newRouteEntry.getRegionId(), newRouteEntry);
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
index 3c13b7e855..19288b84a9 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.confignode.manager.load.heartbeat;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+
public interface IRegionGroupCache {
/**
@@ -40,4 +42,11 @@ public interface IRegionGroupCache {
* @return The DataNodeId of the latest leader
*/
int getLeaderDataNodeId();
+
+ /**
+ * Get RegionGroup's ConsensusGroupId
+ *
+ * @return TConsensusGroupId
+ */
+ TConsensusGroupId getConsensusGroupId();
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
index e325f28929..0d6d937ad8 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
@@ -36,10 +36,10 @@ public class RegionGroupCache implements IRegionGroupCache {
// TODO: This class might be split into SchemaRegionGroupCache and
DataRegionGroupCache
private static final int maximumWindowSize = 100;
- // Map<DataNodeId(where a RegionReplica resides),
LinkedList<RegionHeartbeatSample>>
private final TConsensusGroupId consensusGroupId;
+ // Map<DataNodeId(where a RegionReplica resides),
LinkedList<RegionHeartbeatSample>>
private final Map<Integer, LinkedList<RegionHeartbeatSample>> slidingWindow;
// Indicates the version of the statistics
@@ -107,4 +107,9 @@ public class RegionGroupCache implements IRegionGroupCache {
public int getLeaderDataNodeId() {
return leaderDataNodeId.get();
}
+
+ @Override
+ public TConsensusGroupId getConsensusGroupId() {
+ return consensusGroupId;
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index aa5a0b39e7..67cee64c4f 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -207,6 +207,8 @@ public class DataNodeRemoveHandler {
status,
originalDataNode.getInternalEndPoint().getIp(),
destDataNode.getInternalEndPoint().getIp());
+ // Broadcast the latest RegionRouteMap when Region migration finished
+ configManager.getLoadManager().broadcastLatestRegionRouteMap();
}
/**
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LazyGreedyRouterTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LazyGreedyRouterTest.java
new file mode 100644
index 0000000000..b3adbec0dc
--- /dev/null
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LazyGreedyRouterTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.router;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class LazyGreedyRouterTest {
+
+ @Test
+ public void testGenLatestRegionRouteMap() {
+ LazyGreedyRouter lazyGreedyRouter = new LazyGreedyRouter();
+
+ /* Prepare TRegionReplicaSets */
+ List<TRegionReplicaSet> regionReplicaSetList = new ArrayList<>();
+ for (int i = 0; i < 12; i++) {
+ TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
+ regionReplicaSet.setRegionId(new
TConsensusGroupId(TConsensusGroupType.DataRegion, i));
+ for (int j = 1; j <= 3; j++) {
+ regionReplicaSet.addToDataNodeLocations(new
TDataNodeLocation().setDataNodeId(j));
+ }
+ regionReplicaSetList.add(regionReplicaSet);
+ }
+
+ /* Test1: The number of leaders in each DataNode should be approximately 4
*/
+ Map<TConsensusGroupId, TRegionReplicaSet> routeMap =
+ lazyGreedyRouter.genLatestRegionRouteMap(regionReplicaSetList);
+ Map<Integer, AtomicInteger> leaderCounter = new HashMap<>();
+ routeMap
+ .values()
+ .forEach(
+ regionReplicaSet ->
+ leaderCounter
+ .computeIfAbsent(
+
regionReplicaSet.getDataNodeLocations().get(0).getDataNodeId(),
+ empty -> new AtomicInteger(0))
+ .getAndIncrement());
+ Assert.assertEquals(3, leaderCounter.size());
+ for (int i = 1; i <= 3; i++) {
+ Assert.assertTrue(3 <= leaderCounter.get(i).get());
+ Assert.assertTrue(leaderCounter.get(i).get() <= 5);
+ }
+
+ /* Unknown DataNodes */
+ List<TDataNodeConfiguration> dataNodeConfigurations = new ArrayList<>();
+ dataNodeConfigurations.add(
+ new TDataNodeConfiguration().setLocation(new
TDataNodeLocation().setDataNodeId(2)));
+
+ /* Test2: The number of leaders in DataNode-1 and DataNode-3 should be
approximately 6 */
+ lazyGreedyRouter.updateUnknownDataNodes(dataNodeConfigurations);
+ leaderCounter.clear();
+ routeMap = lazyGreedyRouter.genLatestRegionRouteMap(regionReplicaSetList);
+ routeMap
+ .values()
+ .forEach(
+ regionReplicaSet ->
+ leaderCounter
+ .computeIfAbsent(
+
regionReplicaSet.getDataNodeLocations().get(0).getDataNodeId(),
+ empty -> new AtomicInteger(0))
+ .getAndIncrement());
+ Assert.assertEquals(2, leaderCounter.size());
+ Assert.assertTrue(4 <= leaderCounter.get(1).get());
+ Assert.assertTrue(leaderCounter.get(1).get() <= 8);
+ Assert.assertTrue(4 <= leaderCounter.get(3).get());
+ Assert.assertTrue(leaderCounter.get(3).get() <= 8);
+ }
+
+ @Test
+ public void testGenLatestRegionRouteMapWithDifferentReplicaSize() {
+ LazyGreedyRouter lazyGreedyRouter = new LazyGreedyRouter();
+
+ /* Prepare TRegionReplicaSets */
+ List<TRegionReplicaSet> regionReplicaSetList = new ArrayList<>();
+ for (int i = 0; i < 12; i++) {
+ TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
+ regionReplicaSet.setRegionId(new
TConsensusGroupId(TConsensusGroupType.DataRegion, i));
+ for (int j = 1; j <= 3; j++) {
+ regionReplicaSet.addToDataNodeLocations(new
TDataNodeLocation().setDataNodeId(j));
+ }
+ regionReplicaSetList.add(regionReplicaSet);
+ }
+ int dataNodeId = 0;
+ for (int i = 12; i < 18; i++) {
+ TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
+ regionReplicaSet.setRegionId(new
TConsensusGroupId(TConsensusGroupType.DataRegion, i));
+ for (int j = 0; j < 2; j++) {
+ regionReplicaSet.addToDataNodeLocations(
+ new TDataNodeLocation().setDataNodeId(dataNodeId + 1));
+ dataNodeId = (dataNodeId + 1) % 3;
+ }
+ regionReplicaSetList.add(regionReplicaSet);
+ }
+
+ /* Test1: The number of leaders in each DataNode should be approximately 6
*/
+ Map<TConsensusGroupId, TRegionReplicaSet> routeMap =
+ lazyGreedyRouter.genLatestRegionRouteMap(regionReplicaSetList);
+ Map<Integer, AtomicInteger> leaderCounter = new HashMap<>();
+ routeMap
+ .values()
+ .forEach(
+ regionReplicaSet ->
+ leaderCounter
+ .computeIfAbsent(
+
regionReplicaSet.getDataNodeLocations().get(0).getDataNodeId(),
+ empty -> new AtomicInteger(0))
+ .getAndIncrement());
+ Assert.assertEquals(3, leaderCounter.size());
+ for (int i = 1; i <= 3; i++) {
+ Assert.assertTrue(4 <= leaderCounter.get(i).get());
+ Assert.assertTrue(leaderCounter.get(i).get() <= 8);
+ }
+
+ /* Unknown DataNodes */
+ List<TDataNodeConfiguration> dataNodeConfigurations = new ArrayList<>();
+ dataNodeConfigurations.add(
+ new TDataNodeConfiguration().setLocation(new
TDataNodeLocation().setDataNodeId(2)));
+
+ /* Test2: The number of leaders in DataNode-1 and DataNode-3 should be
approximately 9 */
+ lazyGreedyRouter.updateUnknownDataNodes(dataNodeConfigurations);
+ leaderCounter.clear();
+ routeMap = lazyGreedyRouter.genLatestRegionRouteMap(regionReplicaSetList);
+ routeMap
+ .values()
+ .forEach(
+ regionReplicaSet ->
+ leaderCounter
+ .computeIfAbsent(
+
regionReplicaSet.getDataNodeLocations().get(0).getDataNodeId(),
+ empty -> new AtomicInteger(0))
+ .getAndIncrement());
+ Assert.assertEquals(2, leaderCounter.size());
+ Assert.assertTrue(7 <= leaderCounter.get(1).get());
+ Assert.assertTrue(leaderCounter.get(1).get() <= 11);
+ Assert.assertTrue(7 <= leaderCounter.get(3).get());
+ Assert.assertTrue(leaderCounter.get(3).get() <= 11);
+ }
+}
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
index be8dc302f2..db134efd25 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
@@ -43,7 +43,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class LeaderRouterTest {
@Test
- public void genRealTimeRoutingPolicy() {
+ public void testGenRealTimeRoutingPolicy() {
// Build TDataNodeLocations
List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
for (int i = 0; i < 6; i++) {