This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 826a0e4818 [IOTDB-3509] Read/Write Routing Policy (Routing to leader)
(#6377)
826a0e4818 is described below
commit 826a0e48188dfb1f61a7703175c05fb4e7a37110
Author: YongzaoDan <[email protected]>
AuthorDate: Thu Jun 30 13:59:55 2022 +0800
[IOTDB-3509] Read/Write Routing Policy (Routing to leader) (#6377)
---
.../resources/conf/iotdb-confignode.properties | 13 +++
.../client/handlers/DataNodeHeartbeatHandler.java | 29 ++++++-
.../iotdb/confignode/conf/ConfigNodeConfig.java | 12 +++
.../confignode/conf/ConfigNodeDescriptor.java | 2 +
.../confignode/conf/ConfigNodeStartupCheck.java | 7 ++
.../iotdb/confignode/manager/load/LoadManager.java | 51 +++++++++---
.../manager/load/balancer/RouteBalancer.java | 14 +++-
.../manager/load/balancer/router/LeaderRouter.java | 94 ++++++++++++++++++++++
.../load/heartbeat/DataNodeHeartbeatCache.java | 2 +
.../manager/load/heartbeat/IRegionGroupCache.java | 37 +++++++++
.../manager/load/heartbeat/RegionGroupCache.java | 47 +++++++++++
.../thrift/impl/DataNodeRPCServiceImpl.java | 15 ++--
thrift/src/main/thrift/datanode.thrift | 4 +-
13 files changed, 307 insertions(+), 20 deletions(-)
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 4753238c6a..0c343e7a15 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -273,3 +273,16 @@ target_config_nodes=0.0.0.0:22277
# The heartbeat interval in milliseconds, default is 1000ms
# Datatype: long
# heartbeat_interval=1000
+
+
+####################
+### Routing policy
+####################
+
+
+# The routing policy of read/write requests
+# These routing policy are currently supported:
+# 1. leader(Default, routing to leader replica)
+# 2. greedy(Routing to replica with the lowest load, might cause read
un-consistent)
+# Datatype: string
+# routing_policy=greedy
\ No newline at end of file
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
index 0da0022626..4dcb53cb3b 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
@@ -18,15 +18,20 @@
*/
package org.apache.iotdb.confignode.client.handlers;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import
org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatPackage;
+import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.RegionGroupCache;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+
public class DataNodeHeartbeatHandler implements
AsyncMethodCallback<THeartbeatResp> {
private static final Logger LOGGER =
LoggerFactory.getLogger(DataNodeHeartbeatHandler.class);
@@ -34,17 +39,35 @@ public class DataNodeHeartbeatHandler implements
AsyncMethodCallback<THeartbeatR
// Update DataNodeHeartbeatCache when success
private final TDataNodeLocation dataNodeLocation;
private final DataNodeHeartbeatCache dataNodeHeartbeatCache;
+ private final Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap;
public DataNodeHeartbeatHandler(
- TDataNodeLocation dataNodeLocation, DataNodeHeartbeatCache
dataNodeHeartbeatCache) {
+ TDataNodeLocation dataNodeLocation,
+ DataNodeHeartbeatCache dataNodeHeartbeatCache,
+ Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap) {
this.dataNodeLocation = dataNodeLocation;
this.dataNodeHeartbeatCache = dataNodeHeartbeatCache;
+ this.regionGroupCacheMap = regionGroupCacheMap;
}
@Override
- public void onComplete(THeartbeatResp tHeartbeatResp) {
+ public void onComplete(THeartbeatResp heartbeatResp) {
dataNodeHeartbeatCache.cacheHeartBeat(
- new HeartbeatPackage(tHeartbeatResp.getHeartbeatTimestamp(),
System.currentTimeMillis()));
+ new HeartbeatPackage(heartbeatResp.getHeartbeatTimestamp(),
System.currentTimeMillis()));
+
+ if (heartbeatResp.isSetJudgedLeaders()) {
+ heartbeatResp
+ .getJudgedLeaders()
+ .forEach(
+ (consensusGroupId, isLeader) -> {
+ if (isLeader) {
+ regionGroupCacheMap
+ .computeIfAbsent(consensusGroupId, empty -> new
RegionGroupCache())
+ .updateLeader(
+ heartbeatResp.getHeartbeatTimestamp(),
dataNodeLocation.getDataNodeId());
+ }
+ });
+ }
}
@Override
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 1e02b95b8d..00489b91ce 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.conf;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.rpc.RpcUtils;
@@ -145,6 +146,9 @@ public class ConfigNodeConfig {
/** The heartbeat interval in milliseconds */
private long heartbeatInterval = 1000;
+ /** The routing policy of read/write requests */
+ private String routingPolicy = RouteBalancer.greedyPolicy;
+
ConfigNodeConfig() {
// empty constructor
}
@@ -441,4 +445,12 @@ public class ConfigNodeConfig {
public void setHeartbeatInterval(long heartbeatInterval) {
this.heartbeatInterval = heartbeatInterval;
}
+
+ public String getRoutingPolicy() {
+ return routingPolicy;
+ }
+
+ public void setRoutingPolicy(String routingPolicy) {
+ this.routingPolicy = routingPolicy;
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 29b1feb008..55f8235a16 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -227,6 +227,8 @@ public class ConfigNodeDescriptor {
properties.getProperty(
"heartbeat_interval",
String.valueOf(conf.getHeartbeatInterval()))));
+ conf.setRoutingPolicy(properties.getProperty("routing_policy",
conf.getRoutingPolicy()));
+
// commons
commonDescriptor.loadCommonProps(properties);
commonDescriptor.initCommonConfigDir(conf.getSystemDir());
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index 1f6f769529..3d31c3ac4f 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.exception.ConfigurationException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.consensus.ConsensusFactory;
@@ -124,6 +125,12 @@ public class ConfigNodeStartupCheck {
String.format(
"%s or %s", ConsensusFactory.StandAloneConsensus,
ConsensusFactory.RatisConsensus));
}
+
+ if (!conf.getRoutingPolicy().equals(RouteBalancer.leaderPolicy)
+ && !conf.getRoutingPolicy().equals(RouteBalancer.greedyPolicy)) {
+ throw new ConfigurationException(
+ "routing_policy", conf.getRoutingPolicy(), "leader or greedy");
+ }
}
/**
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index bd886256b6..bcfca8114a 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -48,6 +48,7 @@ import
org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
import
org.apache.iotdb.confignode.manager.load.heartbeat.ConfigNodeHeartbeatCache;
import
org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.IHeartbeatStatistic;
+import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.slf4j.Logger;
@@ -60,6 +61,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* The LoadManager at ConfigNodeGroup-Leader is active. It proactively
implements the cluster
@@ -73,11 +75,16 @@ public class LoadManager {
private final long heartbeatInterval =
ConfigNodeDescriptor.getInstance().getConf().getHeartbeatInterval();
+
+ /** Heartbeat sample cache */
// Map<NodeId, IHeartbeatStatistic>
private final Map<Integer, IHeartbeatStatistic> heartbeatCacheMap;
+ // Map<RegionId, RegionGroupCache>
+ private final Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap;
- // Balancers
+ /** Balancers */
private final RegionBalancer regionBalancer;
+
private final PartitionBalancer partitionBalancer;
private final RouteBalancer routeBalancer;
@@ -89,15 +96,18 @@ public class LoadManager {
private final Object heartbeatMonitor = new Object();
private Future<?> currentHeartbeatFuture;
- private int balanceCount = 0;
+ private final AtomicInteger balanceCount;
public LoadManager(IManager configManager) {
this.configManager = configManager;
this.heartbeatCacheMap = new ConcurrentHashMap<>();
+ this.regionGroupCacheMap = new ConcurrentHashMap<>();
this.regionBalancer = new RegionBalancer(configManager);
this.partitionBalancer = new PartitionBalancer(configManager);
this.routeBalancer = new RouteBalancer(configManager);
+
+ this.balanceCount = new AtomicInteger(0);
}
/**
@@ -173,10 +183,26 @@ public class LoadManager {
return result;
}
+ /**
+ * Get the leadership of each RegionGroup
+ *
+ * @return Map<RegionGroupId, leader location>
+ */
+ public Map<TConsensusGroupId, Integer> getAllLeadership() {
+ Map<TConsensusGroupId, Integer> result = new ConcurrentHashMap<>();
+
+ regionGroupCacheMap.forEach(
+ (consensusGroupId, regionGroupCache) ->
+ result.put(consensusGroupId,
regionGroupCache.getLeaderDataNodeId()));
+
+ return result;
+ }
+
/** Start the heartbeat service */
public void start() {
LOGGER.debug("Start Heartbeat Service of LoadManager");
synchronized (heartbeatMonitor) {
+ balanceCount.set(0);
if (currentHeartbeatFuture == null) {
currentHeartbeatFuture =
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
@@ -208,18 +234,24 @@ public class LoadManager {
// Send heartbeat requests to all the online ConfigNodes
pingOnlineConfigNodes(getNodeManager().getOnlineConfigNodes());
// Do load balancing
- doLoadBalancing(balanceCount);
- balanceCount += 1;
+ doLoadBalancing();
+ balanceCount.getAndIncrement();
}
}
private THeartbeatReq genHeartbeatReq() {
- return new THeartbeatReq(System.currentTimeMillis());
+ THeartbeatReq heartbeatReq = new THeartbeatReq();
+ heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
+ // We update RegionGroups' leadership in every 5s
+ heartbeatReq.setNeedJudgeLeader(balanceCount.get() % 5 == 0);
+ // We sample DataNode load in every 10s
+ heartbeatReq.setNeedSamplingLoad(balanceCount.get() % 10 == 0);
+ return heartbeatReq;
}
- private void doLoadBalancing(int balanceCount) {
- if (balanceCount % 5 == 0) {
- // We update nodes' load statistic in every 5s
+ private void doLoadBalancing() {
+ if (balanceCount.get() % 10 == 0) {
+ // We update nodes' load statistic in every 10s
updateNodeLoadStatistic();
}
}
@@ -242,7 +274,8 @@ public class LoadManager {
(DataNodeHeartbeatCache)
heartbeatCacheMap.computeIfAbsent(
dataNodeInfo.getLocation().getDataNodeId(),
- empty -> new DataNodeHeartbeatCache()));
+ empty -> new DataNodeHeartbeatCache()),
+ regionGroupCacheMap);
AsyncDataNodeClientPool.getInstance()
.getDataNodeHeartBeat(
dataNodeInfo.getLocation().getInternalEndPoint(),
genHeartbeatReq(), handler);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index aebb25a43d..bfcb0996e3 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -20,9 +20,11 @@ package org.apache.iotdb.confignode.manager.load.balancer;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.load.balancer.router.IRouter;
+import org.apache.iotdb.confignode.manager.load.balancer.router.LeaderRouter;
import
org.apache.iotdb.confignode.manager.load.balancer.router.LoadScoreGreedyRouter;
import java.util.List;
@@ -34,6 +36,9 @@ import java.util.Map;
*/
public class RouteBalancer {
+ public static final String leaderPolicy = "leader";
+ public static final String greedyPolicy = "greedy";
+
private final IManager configManager;
public RouteBalancer(IManager configManager) {
@@ -46,8 +51,13 @@ public class RouteBalancer {
}
private IRouter genRouter() {
- // TODO: The Router should be configurable
- return new LoadScoreGreedyRouter(getLoadManager().getAllLoadScores());
+ String policy =
ConfigNodeDescriptor.getInstance().getConf().getRoutingPolicy();
+ if (policy.equals(leaderPolicy)) {
+ return new LeaderRouter(
+ getLoadManager().getAllLeadership(),
getLoadManager().getAllLoadScores());
+ } else {
+ return new LoadScoreGreedyRouter(getLoadManager().getAllLoadScores());
+ }
}
private LoadManager getLoadManager() {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
new file mode 100644
index 0000000000..9cc8ef8212
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.router;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The LeaderRouter always pick the leader Replica */
+public class LeaderRouter implements IRouter {
+
+ // Map<RegionGroupId, leader location>
+ private final Map<TConsensusGroupId, Integer> leaderMap;
+ // Map<DataNodeId, loadScore>
+ private final Map<Integer, Float> loadScoreMap;
+
+ public LeaderRouter(Map<TConsensusGroupId, Integer> leaderMap, Map<Integer,
Float> loadScoreMap) {
+ this.leaderMap = leaderMap;
+ this.loadScoreMap = loadScoreMap;
+ }
+
+ @Override
+ public Map<TConsensusGroupId, TRegionReplicaSet> genRealTimeRoutingPolicy(
+ List<TRegionReplicaSet> replicaSets) {
+ Map<TConsensusGroupId, TRegionReplicaSet> result = new
ConcurrentHashMap<>();
+
+ replicaSets.forEach(
+ replicaSet -> {
+ int leaderId = leaderMap.getOrDefault(replicaSet.getRegionId(), -1);
+ TRegionReplicaSet sortedReplicaSet = new TRegionReplicaSet();
+ sortedReplicaSet.setRegionId(replicaSet.getRegionId());
+
+ /* 1. Pick leader if leader exists */
+ if (leaderId != -1) {
+ for (TDataNodeLocation dataNodeLocation :
replicaSet.getDataNodeLocations()) {
+ if (dataNodeLocation.getDataNodeId() == leaderId) {
+ sortedReplicaSet.addToDataNodeLocations(dataNodeLocation);
+ }
+ }
+ }
+
+ /* 2. Sort replicaSets by loadScore and pick the rest */
+ // List<Pair<loadScore, TDataNodeLocation>> for sorting
+ List<Pair<Double, TDataNodeLocation>> sortList = new Vector<>();
+ replicaSet
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation -> {
+ // The absenteeism of loadScoreMap means ConfigNode-leader
doesn't receive any
+ // heartbeat from that DataNode.
+ // In this case we put a maximum loadScore into the
sortList.
+ sortList.add(
+ new Pair<>(
+ (double)
+ loadScoreMap.computeIfAbsent(
+ dataNodeLocation.getDataNodeId(), empty ->
Float.MAX_VALUE),
+ dataNodeLocation));
+ });
+ sortList.sort(Comparator.comparingDouble(Pair::getLeft));
+ for (Pair<Double, TDataNodeLocation> entry : sortList) {
+ if (entry.getRight().getDataNodeId() != leaderId) {
+ sortedReplicaSet.addToDataNodeLocations(entry.getRight());
+ }
+ }
+
+ result.put(sortedReplicaSet.getRegionId(), sortedReplicaSet);
+ });
+
+ return result;
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
index 3ea8803705..e02023fd24 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
@@ -25,6 +25,8 @@ import java.util.LinkedList;
/** DataNodeHeartbeatCache caches and maintains all the heartbeat data */
public class DataNodeHeartbeatCache implements IHeartbeatStatistic {
+ // TODO: This class might be split into DataNodeCache and ConfigNodeCache
+
// Cache heartbeat samples
private static final int maximumWindowSize = 100;
private final LinkedList<HeartbeatPackage> slidingWindow;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
new file mode 100644
index 0000000000..7283665dc5
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.heartbeat;
+
+public interface IRegionGroupCache {
+
+ /**
+ * Update RegionGroup's latest leader
+ *
+ * @param timestamp Judging timestamp
+ * @param dataNodeId Leader location
+ */
+ void updateLeader(long timestamp, int dataNodeId);
+
+ /**
+ * Get RegionGroup's latest leader
+ *
+ * @return The DataNodeId of latest leader
+ */
+ int getLeaderDataNodeId();
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
new file mode 100644
index 0000000000..6ec938ccbc
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.heartbeat;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class RegionGroupCache implements IRegionGroupCache {
+
+ // TODO: This class might be split into SchemaRegionGroupCache and
DataRegionGroupCache
+
+ private long timestamp;
+
+ private final AtomicInteger leaderDataNodeId;
+
+ public RegionGroupCache() {
+ this.leaderDataNodeId = new AtomicInteger(-1);
+ }
+
+ @Override
+ public synchronized void updateLeader(long timestamp, int dataNodeId) {
+ if (timestamp > this.timestamp) {
+ this.timestamp = timestamp;
+ this.leaderDataNodeId.set(dataNodeId);
+ }
+ }
+
+ @Override
+ public int getLeaderDataNodeId() {
+ return leaderDataNodeId.get();
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRPCServiceImpl.java
index 6ba2645e4c..c6a12c5c79 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRPCServiceImpl.java
@@ -101,7 +101,6 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.stream.Collectors;
public class DataNodeRPCServiceImpl implements IDataNodeRPCService.Iface {
@@ -109,7 +108,6 @@ public class DataNodeRPCServiceImpl implements
IDataNodeRPCService.Iface {
private static final Logger LOGGER =
LoggerFactory.getLogger(DataNodeRPCServiceImpl.class);
private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
- private static final double loadBalanceThreshold = 0.1;
public DataNodeRPCServiceImpl() {
super();
@@ -317,10 +315,17 @@ public class DataNodeRPCServiceImpl implements
IDataNodeRPCService.Iface {
@Override
public THeartbeatResp getDataNodeHeartBeat(THeartbeatReq req) throws
TException {
- THeartbeatResp resp = new THeartbeatResp(req.getHeartbeatTimestamp(),
getJudgedLeaders());
- Random whetherToGetMetric = new Random();
+ THeartbeatResp resp = new THeartbeatResp();
+ resp.setHeartbeatTimestamp(req.getHeartbeatTimestamp());
+
+ // Judging leader if necessary
+ if (req.isNeedJudgeLeader()) {
+ resp.setJudgedLeaders(getJudgedLeaders());
+ }
+
+ // Sampling load if necessary
if
(MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()
- && whetherToGetMetric.nextDouble() < loadBalanceThreshold) {
+ && req.isNeedSamplingLoad()) {
long cpuLoad =
MetricsService.getInstance()
.getMetricManager()
diff --git a/thrift/src/main/thrift/datanode.thrift
b/thrift/src/main/thrift/datanode.thrift
index f8da9cfc3b..8b3cf0c419 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -161,11 +161,13 @@ struct TInvalidatePermissionCacheReq {
struct THeartbeatReq {
1: required i64 heartbeatTimestamp
+ 2: required bool needJudgeLeader
+ 3: required bool needSamplingLoad
}
struct THeartbeatResp {
1: required i64 heartbeatTimestamp
- 2: required map<common.TConsensusGroupId, bool> judgedLeaders
+ 2: optional map<common.TConsensusGroupId, bool> judgedLeaders
3: optional i16 cpu
4: optional i16 memory
}