This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 82e984baf2 [IOTDB-3947] LeaderPolicy can't broadcast when a DataNode
is down (#6777)
82e984baf2 is described below
commit 82e984baf25b4917221cc5fa19c21eb9484af966
Author: YongzaoDan <[email protected]>
AuthorDate: Thu Jul 28 08:53:36 2022 +0800
[IOTDB-3947] LeaderPolicy can't broadcast when a DataNode is down (#6777)
---
.../async/datanode/AsyncDataNodeClientPool.java | 1 -
.../async/handlers/DataNodeHeartbeatHandler.java | 3 +-
.../handlers/UpdateRegionRouteMapHandler.java | 7 +-
.../iotdb/confignode/manager/load/LoadManager.java | 25 +++-
.../manager/load/heartbeat/RegionGroupCache.java | 14 ++-
.../thrift/ConfigNodeRPCServiceProcessor.java | 4 +-
.../load/balancer/router/LeaderRouterTest.java | 137 +++++++++++++++++++--
7 files changed, 168 insertions(+), 23 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
index 1547e617d8..d12c0f2705 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
@@ -326,7 +326,6 @@ public class AsyncDataNodeClientPool {
*/
public void getDataNodeHeartBeat(
TEndPoint endPoint, THeartbeatReq req, DataNodeHeartbeatHandler handler)
{
- // TODO: Add a retry logic
AsyncDataNodeInternalServiceClient client;
try {
client = clientManager.borrowClient(endPoint);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
index a148c1cc14..6891c032e7 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
@@ -66,7 +66,8 @@ public class DataNodeHeartbeatHandler implements
AsyncMethodCallback<THeartbeatR
.forEach(
(consensusGroupId, isLeader) ->
regionGroupCacheMap
- .computeIfAbsent(consensusGroupId, empty -> new
RegionGroupCache())
+ .computeIfAbsent(
+ consensusGroupId, empty -> new
RegionGroupCache(consensusGroupId))
.cacheHeartbeatSample(
new RegionHeartbeatSample(
heartbeatResp.getHeartbeatTimestamp(),
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
index 4877fb70a6..30a88953c1 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
@@ -47,16 +47,17 @@ public class UpdateRegionRouteMapHandler extends
AbstractRetryHandler
public void onComplete(TSStatus status) {
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
- LOGGER.info("Successfully update the RegionRouteMap on DataNode: {}",
targetDataNode);
+ LOGGER.info(
+ "Successfully update the RegionRouteMap on DataNode: {}",
targetDataNode.getDataNodeId());
} else {
- LOGGER.error("Update RegionRouteMap on DataNode: {} failed",
targetDataNode);
+ LOGGER.error("Update RegionRouteMap on DataNode: {} failed",
targetDataNode.getDataNodeId());
}
countDownLatch.countDown();
}
@Override
public void onError(Exception e) {
- LOGGER.error("Update RegionRouteMap on DataNode: {} failed",
targetDataNode);
+ LOGGER.error("Update RegionRouteMap on DataNode: {} failed",
targetDataNode.getDataNodeId());
countDownLatch.countDown();
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 9886593212..61babc277a 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -303,10 +303,12 @@ public class LoadManager {
dataNodeLocationMap.put(
onlineDataNode.getLocation().getDataNodeId(),
onlineDataNode.getLocation()));
- LOGGER.info("Begin to broadcast RegionRouteMap: {}", latestRegionRouteMap);
+ LOGGER.info("Begin to broadcast RegionRouteMap:");
+ long broadcastTime = System.currentTimeMillis();
+ printRegionRouteMap(broadcastTime, latestRegionRouteMap);
AsyncDataNodeClientPool.getInstance()
.sendAsyncRequestToDataNodeWithRetry(
- new TRegionRouteReq(System.currentTimeMillis(),
latestRegionRouteMap),
+ new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
dataNodeLocationMap,
DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
null);
@@ -333,6 +335,9 @@ public class LoadManager {
* @param registeredDataNodes DataNodes that registered in cluster
*/
private void pingRegisteredDataNodes(List<TDataNodeConfiguration>
registeredDataNodes) {
+ // Generate heartbeat request
+ THeartbeatReq heartbeatReq = genHeartbeatReq();
+
// Send heartbeat requests
for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
DataNodeHeartbeatHandler handler =
@@ -345,7 +350,7 @@ public class LoadManager {
regionGroupCacheMap);
AsyncDataNodeClientPool.getInstance()
.getDataNodeHeartBeat(
- dataNodeInfo.getLocation().getInternalEndPoint(),
genHeartbeatReq(), handler);
+ dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq,
handler);
}
}
@@ -411,6 +416,20 @@ public class LoadManager {
.collect(Collectors.toList());
}
+ public static void printRegionRouteMap(
+ long timestamp, Map<TConsensusGroupId, TRegionReplicaSet>
regionRouteMap) {
+ LOGGER.info("[latestRegionRouteMap] timestamp:{}", timestamp);
+ LOGGER.info("[latestRegionRouteMap] RegionRouteMap:");
+ for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> entry :
regionRouteMap.entrySet()) {
+ LOGGER.info(
+ "[latestRegionRouteMap]\t {}={}",
+ entry.getKey(),
+ entry.getValue().getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toList()));
+ }
+ }
+
private ConsensusManager getConsensusManager() {
return configManager.getConsensusManager();
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
index fbca007f5c..e325f28929 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
@@ -18,6 +18,11 @@
*/
package org.apache.iotdb.confignode.manager.load.heartbeat;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -26,10 +31,15 @@ import java.util.concurrent.atomic.AtomicLong;
public class RegionGroupCache implements IRegionGroupCache {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RegionGroupCache.class);
+
// TODO: This class might be split into SchemaRegionGroupCache and
DataRegionGroupCache
private static final int maximumWindowSize = 100;
// Map<DataNodeId(where a RegionReplica resides),
LinkedList<RegionHeartbeatSample>>
+
+ private final TConsensusGroupId consensusGroupId;
+
private final Map<Integer, LinkedList<RegionHeartbeatSample>> slidingWindow;
// Indicates the version of the statistics
@@ -37,7 +47,9 @@ public class RegionGroupCache implements IRegionGroupCache {
// The DataNode where the leader resides
private final AtomicInteger leaderDataNodeId;
- public RegionGroupCache() {
+ public RegionGroupCache(TConsensusGroupId consensusGroupId) {
+ this.consensusGroupId = consensusGroupId;
+
this.slidingWindow = new ConcurrentHashMap<>();
this.versionTimestamp = new AtomicLong(0);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 03a5a02c21..0ecc43bea0 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -538,7 +538,9 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
@Override
public TRegionRouteMapResp getLatestRegionRouteMap() throws TException {
TRegionRouteMapResp resp = configManager.getLatestRegionRouteMap();
- LOGGER.info("Generate a latest RegionRouteMap: {}", resp);
+ configManager
+ .getLoadManager()
+ .printRegionRouteMap(resp.getTimestamp(), resp.getRegionRouteMap());
return resp;
}
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
index 2d0e7dc87a..be8dc302f2 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
@@ -25,7 +25,10 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import
org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.INodeCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.RegionGroupCache;
+import
org.apache.iotdb.confignode.manager.load.heartbeat.RegionHeartbeatSample;
import org.junit.Assert;
import org.junit.Test;
@@ -41,7 +44,7 @@ public class LeaderRouterTest {
@Test
public void genRealTimeRoutingPolicy() {
- /* Build TDataNodeLocations */
+ // Build TDataNodeLocations
List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
for (int i = 0; i < 6; i++) {
dataNodeLocations.add(
@@ -54,7 +57,7 @@ public class LeaderRouterTest {
new TEndPoint("0.0.0.0", 50010 + i)));
}
- /* Build nodeCacheMap */
+ // Build nodeCacheMap
long currentTimeMillis = System.currentTimeMillis();
Map<Integer, INodeCache> nodeCacheMap = new HashMap<>();
for (int i = 0; i < 6; i++) {
@@ -67,13 +70,13 @@ public class LeaderRouterTest {
}
nodeCacheMap.values().forEach(INodeCache::updateLoadStatistic);
- /* Get the loadScoreMap */
+ // Get the loadScoreMap
Map<Integer, Long> loadScoreMap = new ConcurrentHashMap<>();
nodeCacheMap.forEach(
(dataNodeId, heartbeatCache) ->
loadScoreMap.put(dataNodeId, heartbeatCache.getLoadScore()));
- /* Build TRegionReplicaSet */
+ // Build TRegionReplicaSet
TConsensusGroupId groupId1 = new
TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1);
TRegionReplicaSet regionReplicaSet1 =
new TRegionReplicaSet(
@@ -86,30 +89,138 @@ public class LeaderRouterTest {
groupId2,
Arrays.asList(
dataNodeLocations.get(5), dataNodeLocations.get(4),
dataNodeLocations.get(3)));
+ List<TRegionReplicaSet> regionReplicaSets =
Arrays.asList(regionReplicaSet1, regionReplicaSet2);
- /* Build leaderMap */
+ // Build regionGroupCacheMap
+ Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap = new
HashMap<>();
+ regionGroupCacheMap.put(groupId1, new RegionGroupCache(groupId1));
+ regionGroupCacheMap.put(groupId2, new RegionGroupCache(groupId2));
+
+ /* Simulate ratis consensus protocol(only one leader) */
+ regionGroupCacheMap
+ .get(groupId1)
+ .cacheHeartbeatSample(new RegionHeartbeatSample(10, 10, 0, false));
+ regionGroupCacheMap
+ .get(groupId1)
+ .cacheHeartbeatSample(new RegionHeartbeatSample(11, 11, 1, true));
+ regionGroupCacheMap
+ .get(groupId1)
+ .cacheHeartbeatSample(new RegionHeartbeatSample(12, 12, 2, false));
+ regionGroupCacheMap
+ .get(groupId2)
+ .cacheHeartbeatSample(new RegionHeartbeatSample(13, 13, 3, false));
+ regionGroupCacheMap
+ .get(groupId2)
+ .cacheHeartbeatSample(new RegionHeartbeatSample(14, 14, 4, true));
+ regionGroupCacheMap
+ .get(groupId2)
+ .cacheHeartbeatSample(new RegionHeartbeatSample(15, 15, 5, false));
+
+ // Get leaderMap
Map<TConsensusGroupId, Integer> leaderMap = new HashMap<>();
- leaderMap.put(groupId1, 1);
- leaderMap.put(groupId2, 4);
+ regionGroupCacheMap
+ .values()
+ .forEach(regionGroupCache ->
Assert.assertTrue(regionGroupCache.updateLoadStatistic()));
+ regionGroupCacheMap.forEach(
+ (groupId, regionGroupCache) ->
+ leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
- /* Check result */
+ // Check result
Map<TConsensusGroupId, TRegionReplicaSet> result =
- new LeaderRouter(leaderMap, loadScoreMap)
- .genLatestRegionRouteMap(Arrays.asList(regionReplicaSet1,
regionReplicaSet2));
- Assert.assertEquals(2, result.size());
-
+ new LeaderRouter(leaderMap,
loadScoreMap).genLatestRegionRouteMap(regionReplicaSets);
TRegionReplicaSet result1 = result.get(groupId1);
// Leader first
Assert.assertEquals(dataNodeLocations.get(1),
result1.getDataNodeLocations().get(0));
// The others will be sorted by loadScore
Assert.assertEquals(dataNodeLocations.get(0),
result1.getDataNodeLocations().get(1));
Assert.assertEquals(dataNodeLocations.get(2),
result1.getDataNodeLocations().get(2));
-
TRegionReplicaSet result2 = result.get(groupId2);
// Leader first
Assert.assertEquals(dataNodeLocations.get(4),
result2.getDataNodeLocations().get(0));
// The others will be sorted by loadScore
Assert.assertEquals(dataNodeLocations.get(3),
result2.getDataNodeLocations().get(1));
Assert.assertEquals(dataNodeLocations.get(5),
result2.getDataNodeLocations().get(2));
+
+ /* Simulate multiLeader consensus protocol(Each Region believes it is the
leader) */
+ for (int i = 2; i <= 1000; i++) {
+ regionGroupCacheMap
+ .get(groupId1)
+ .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10, i * 10, 0,
true));
+ regionGroupCacheMap
+ .get(groupId1)
+ .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 1, i * 10 +
1, 1, true));
+ regionGroupCacheMap
+ .get(groupId1)
+ .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 2, i * 10 +
2, 2, true));
+ regionGroupCacheMap
+ .get(groupId2)
+ .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 3, i * 10 +
3, 3, true));
+ regionGroupCacheMap
+ .get(groupId2)
+ .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 4, i * 10 +
4, 4, true));
+ regionGroupCacheMap
+ .get(groupId2)
+ .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 5, i * 10 +
5, 5, true));
+
+ // Get leaderMap
+ leaderMap.clear();
+
regionGroupCacheMap.values().forEach(IRegionGroupCache::updateLoadStatistic);
+ regionGroupCacheMap.forEach(
+ (groupId, regionGroupCache) ->
+ leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
+
+ // Check result
+ result = new LeaderRouter(leaderMap,
loadScoreMap).genLatestRegionRouteMap(regionReplicaSets);
+ result1 = result.get(groupId1);
+ // Leader first
+ Assert.assertEquals(dataNodeLocations.get(2),
result1.getDataNodeLocations().get(0));
+ // The others will be sorted by loadScore
+ Assert.assertEquals(dataNodeLocations.get(0),
result1.getDataNodeLocations().get(1));
+ Assert.assertEquals(dataNodeLocations.get(1),
result1.getDataNodeLocations().get(2));
+ result2 = result.get(groupId2);
+ // Leader first
+ Assert.assertEquals(dataNodeLocations.get(5),
result2.getDataNodeLocations().get(0));
+ // The others will be sorted by loadScore
+ Assert.assertEquals(dataNodeLocations.get(3),
result2.getDataNodeLocations().get(1));
+ Assert.assertEquals(dataNodeLocations.get(4),
result2.getDataNodeLocations().get(2));
+ }
+
+ /* Simulate multiLeader consensus protocol when a DataNode goes down */
+ regionGroupCacheMap
+ .get(groupId1)
+ .cacheHeartbeatSample(new RegionHeartbeatSample(10030, 10030, 0,
true));
+ regionGroupCacheMap
+ .get(groupId1)
+ .cacheHeartbeatSample(new RegionHeartbeatSample(10031, 10031, 1,
true));
+ regionGroupCacheMap
+ .get(groupId2)
+ .cacheHeartbeatSample(new RegionHeartbeatSample(10033, 10033, 3,
true));
+ regionGroupCacheMap
+ .get(groupId2)
+ .cacheHeartbeatSample(new RegionHeartbeatSample(10034, 10034, 4,
true));
+
+ // Get leaderMap
+ leaderMap.clear();
+ regionGroupCacheMap
+ .values()
+ .forEach(regionGroupCache ->
Assert.assertTrue(regionGroupCache.updateLoadStatistic()));
+ regionGroupCacheMap.forEach(
+ (groupId, regionGroupCache) ->
+ leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
+
+ // Check result
+ result = new LeaderRouter(leaderMap,
loadScoreMap).genLatestRegionRouteMap(regionReplicaSets);
+ result1 = result.get(groupId1);
+ // Leader first
+ Assert.assertEquals(dataNodeLocations.get(1),
result1.getDataNodeLocations().get(0));
+ // The others will be sorted by loadScore
+ Assert.assertEquals(dataNodeLocations.get(0),
result1.getDataNodeLocations().get(1));
+ Assert.assertEquals(dataNodeLocations.get(2),
result1.getDataNodeLocations().get(2));
+ result2 = result.get(groupId2);
+ // Leader first
+ Assert.assertEquals(dataNodeLocations.get(4),
result2.getDataNodeLocations().get(0));
+ // The others will be sorted by loadScore
+ Assert.assertEquals(dataNodeLocations.get(3),
result2.getDataNodeLocations().get(1));
+ Assert.assertEquals(dataNodeLocations.get(5),
result2.getDataNodeLocations().get(2));
}
}