This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8b52e79bb3 [IOTDB-5791] Construct Cluster-LoadPublisher-Thread and IClusterStatusSubscriber (#9645)
8b52e79bb3 is described below
commit 8b52e79bb369dcafa68d0e9f93d96a822534bf80
Author: YongzaoDan <[email protected]>
AuthorDate: Mon Apr 24 13:56:48 2023 +0800
[IOTDB-5791] Construct Cluster-LoadPublisher-Thread and IClusterStatusSubscriber (#9645)
---
.../confignode/manager/ClusterSchemaManager.java | 15 ---
.../iotdb/confignode/manager/load/LoadManager.java | 8 +-
.../manager/load/balancer/RouteBalancer.java | 21 ++--
.../load/balancer/router/RegionRouteMap.java | 5 +
.../confignode/manager/load/cache/LoadCache.java | 15 ++-
.../load/cache/region/RegionGroupCache.java | 4 +
.../manager/load/service/StatisticsService.java | 115 ++++++++++++---------
.../subscriber/IClusterStatusSubscriber.java} | 17 ++-
.../manager/load/subscriber/RouteChangeEvent.java | 74 +++++++++++++
.../subscriber/StatisticsChangeEvent.java} | 26 +++--
10 files changed, 209 insertions(+), 91 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index bd5d3209f0..7f21222f62 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -60,7 +60,6 @@ import org.apache.iotdb.confignode.consensus.response.template.TemplateSetInfoRe
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.node.NodeManager;
-import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
@@ -80,8 +79,6 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
-import com.google.common.eventbus.AllowConcurrentEvents;
-import com.google.common.eventbus.Subscribe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -803,18 +800,6 @@ public class ClusterSchemaManager {
return RpcUtils.SUCCESS_STATUS;
}
- /**
- * When some Nodes' states changed during a heartbeat loop, the eventbus in
LoadManager will post
- * the different NodeStatstics event to SyncManager and ClusterSchemaManager.
- *
- * @param nodeStatisticsEvent nodeStatistics that changed in a heartbeat loop
- */
- @Subscribe
- @AllowConcurrentEvents
- public void handleNodeStatistics(NodeStatisticsEvent nodeStatisticsEvent) {
- // TODO
- }
-
private NodeManager getNodeManager() {
return configManager.getNodeManager();
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 2aaa730a8a..b6f635a11b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -72,8 +72,8 @@ public class LoadManager {
private final HeartbeatService heartbeatService;
private final StatisticsService statisticsService;
- private final EventBus eventBus =
- new AsyncEventBus("LoadManager-EventBus",
Executors.newFixedThreadPool(5));
+ private final EventBus loadPublisher =
+ new AsyncEventBus("Cluster-LoadPublisher-Thread",
Executors.newFixedThreadPool(5));
public LoadManager(IManager configManager) {
this.configManager = configManager;
@@ -85,9 +85,9 @@ public class LoadManager {
this.loadCache = new LoadCache();
this.heartbeatService = new HeartbeatService(configManager, loadCache);
this.statisticsService =
- new StatisticsService(configManager, routeBalancer, loadCache,
eventBus);
+ new StatisticsService(configManager, routeBalancer, loadCache,
loadPublisher);
- eventBus.register(configManager.getClusterSchemaManager());
+ loadPublisher.register(statisticsService);
}
/**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index a87cb051ba..3599a442fa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.confignode.manager.load.balancer.router.leader.MinCostFl
import
org.apache.iotdb.confignode.manager.load.balancer.router.priority.GreedyPriorityBalancer;
import
org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
import
org.apache.iotdb.confignode.manager.load.balancer.router.priority.LeaderPriorityBalancer;
+import org.apache.iotdb.confignode.manager.load.subscriber.RouteChangeEvent;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.consensus.ConsensusFactory;
@@ -57,7 +58,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -165,16 +165,18 @@ public class RouteBalancer {
/**
* Invoking periodically to update the RegionRouteMap
*
- * @return True if the RegionRouteMap has changed, false otherwise
+ * @return RouteChangeEvent
*/
- public boolean updateRegionRouteMap() {
+ public RouteChangeEvent updateRegionRouteMap() {
synchronized (regionRouteMap) {
- return updateRegionLeaderMap() | updateRegionPriorityMap();
+ RegionRouteMap preRouteMap = new RegionRouteMap(regionRouteMap);
+ updateRegionLeaderMap();
+ updateRegionPriorityMap();
+ return new RouteChangeEvent(preRouteMap, regionRouteMap);
}
}
- private boolean updateRegionLeaderMap() {
- AtomicBoolean isLeaderChanged = new AtomicBoolean(false);
+ private void updateRegionLeaderMap() {
leaderCache.forEach(
(regionGroupId, leadershipSample) -> {
if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
@@ -186,13 +188,11 @@ public class RouteBalancer {
if (leadershipSample.getRight() !=
regionRouteMap.getLeader(regionGroupId)) {
// Update leader
regionRouteMap.setLeader(regionGroupId,
leadershipSample.getRight());
- isLeaderChanged.set(true);
}
});
- return isLeaderChanged.get();
}
- private boolean updateRegionPriorityMap() {
+ private void updateRegionPriorityMap() {
Map<TConsensusGroupId, Integer> regionLeaderMap =
regionRouteMap.getRegionLeaderMap();
Map<Integer, Long> dataNodeLoadScoreMap =
getLoadManager().getAllDataNodeLoadScores();
@@ -211,9 +211,6 @@ public class RouteBalancer {
if
(!latestRegionPriorityMap.equals(regionRouteMap.getRegionPriorityMap())) {
regionRouteMap.setRegionPriorityMap(latestRegionPriorityMap);
- return true;
- } else {
- return false;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java
index 58f456ab8f..cbe9003355 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java
@@ -50,6 +50,11 @@ public class RegionRouteMap {
this.regionPriorityMap = new ConcurrentHashMap<>();
}
+ public RegionRouteMap(RegionRouteMap other) {
+ this.regionLeaderMap = new ConcurrentHashMap<>(other.regionLeaderMap);
+ this.regionPriorityMap = new ConcurrentHashMap<>(other.regionPriorityMap);
+ }
+
/**
* @return DataNodeId where the specified RegionGroup's leader resides. And
return -1 if the
* leader is not recorded yet
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 0b2fd7195c..5a487de11c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -165,7 +165,7 @@ public class LoadCache {
if (nodeCache.periodicUpdate()) {
// Update and record the changed NodeStatistics
differentNodeStatisticsMap.put(
- nodeId, new Pair<>(nodeCache.getStatistics(),
preNodeStatistics));
+ nodeId, new Pair<>(preNodeStatistics,
nodeCache.getStatistics()));
}
});
return differentNodeStatisticsMap;
@@ -176,14 +176,19 @@ public class LoadCache {
*
* @return a map of changed RegionGroupStatistics
*/
- public Map<TConsensusGroupId, RegionGroupStatistics>
updateRegionGroupStatistics() {
- Map<TConsensusGroupId, RegionGroupStatistics>
differentRegionGroupStatisticsMap =
- new ConcurrentHashMap<>();
+ public Map<TConsensusGroupId, Pair<RegionGroupStatistics,
RegionGroupStatistics>>
+ updateRegionGroupStatistics() {
+ Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>>
+ differentRegionGroupStatisticsMap = new ConcurrentHashMap<>();
regionGroupCacheMap.forEach(
(regionGroupId, regionGroupCache) -> {
+ RegionGroupStatistics preRegionGroupStatistics =
+ regionGroupCache.getPreviousStatistics().deepCopy();
if (regionGroupCache.periodicUpdate()) {
// Update and record the changed RegionGroupStatistics
- differentRegionGroupStatisticsMap.put(regionGroupId,
regionGroupCache.getStatistics());
+ differentRegionGroupStatisticsMap.put(
+ regionGroupId,
+ new Pair<>(preRegionGroupStatistics,
regionGroupCache.getStatistics()));
}
});
return differentRegionGroupStatisticsMap;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
index a531e440aa..dd21c24721 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
@@ -163,4 +163,8 @@ public class RegionGroupCache {
public RegionGroupStatistics getStatistics() {
return currentStatistics.get();
}
+
+ public RegionGroupStatistics getPreviousStatistics() {
+ return previousStatistics.get();
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
index 9e51b153de..bdbb24cc8e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
@@ -32,12 +32,13 @@ import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
-import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
import
org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
-import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
+import
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
+import org.apache.iotdb.confignode.manager.load.subscriber.RouteChangeEvent;
+import
org.apache.iotdb.confignode.manager.load.subscriber.StatisticsChangeEvent;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -52,7 +53,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-public class StatisticsService {
+public class StatisticsService implements IClusterStatusSubscriber {
private static final Logger LOGGER =
LoggerFactory.getLogger(StatisticsService.class);
@@ -111,28 +112,32 @@ public class StatisticsService {
boolean isNeedBroadcast = false;
// Update NodeStatistics:
- // Pair<NodeStatistics, NodeStatistics>:left one means the current
NodeStatistics, right one
- // means the previous NodeStatistics
+ // Map<NodeId, Pair<old NodeStatistics, new NodeStatistics>>
Map<Integer, Pair<NodeStatistics, NodeStatistics>>
differentNodeStatisticsMap =
loadCache.updateNodeStatistics();
if (!differentNodeStatisticsMap.isEmpty()) {
isNeedBroadcast = true;
- recordNodeStatistics(differentNodeStatisticsMap);
- eventBus.post(new NodeStatisticsEvent(differentNodeStatisticsMap));
}
// Update RegionGroupStatistics
- Map<TConsensusGroupId, RegionGroupStatistics>
differentRegionGroupStatisticsMap =
- loadCache.updateRegionGroupStatistics();
+ // Map<RegionGroupId, Pair<old RegionGroupStatistics, new
RegionGroupStatistics>>
+ Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>>
+ differentRegionGroupStatisticsMap =
loadCache.updateRegionGroupStatistics();
if (!differentRegionGroupStatisticsMap.isEmpty()) {
isNeedBroadcast = true;
- recordRegionGroupStatistics(differentRegionGroupStatisticsMap);
+ }
+
+ if (isNeedBroadcast) {
+ StatisticsChangeEvent statisticsChangeEvent =
+ new StatisticsChangeEvent(differentNodeStatisticsMap,
differentRegionGroupStatisticsMap);
+ eventBus.post(statisticsChangeEvent);
}
// Update RegionRouteMap
- if (routeBalancer.updateRegionRouteMap()) {
+ RouteChangeEvent routeChangeEvent = routeBalancer.updateRegionRouteMap();
+ if (routeChangeEvent.isNeedBroadcast()) {
isNeedBroadcast = true;
- recordRegionRouteMap(routeBalancer.getRegionRouteMap());
+ eventBus.post(routeChangeEvent);
}
if (isNeedBroadcast) {
@@ -140,6 +145,31 @@ public class StatisticsService {
}
}
+ public void broadcastLatestRegionRouteMap() {
+ Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap =
+ routeBalancer.getLatestRegionPriorityMap();
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap = new
ConcurrentHashMap<>();
+ // Broadcast the RegionRouteMap to all DataNodes except the unknown ones
+ configManager
+ .getNodeManager()
+ .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Removing,
NodeStatus.ReadOnly)
+ .forEach(
+ onlineDataNode ->
+ dataNodeLocationMap.put(
+ onlineDataNode.getLocation().getDataNodeId(),
onlineDataNode.getLocation()));
+
+ LOGGER.info("[UpdateLoadStatistics] Begin to broadcast RegionRouteMap:");
+ long broadcastTime = System.currentTimeMillis();
+
+ AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(
+ DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
+ new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
+ dataNodeLocationMap);
+
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ LOGGER.info("[UpdateLoadStatistics] Broadcast the latest RegionRouteMap
finished.");
+ }
+
private void recordNodeStatistics(
Map<Integer, Pair<NodeStatistics, NodeStatistics>>
differentNodeStatisticsMap) {
LOGGER.info("[UpdateLoadStatistics] NodeStatisticsMap: ");
@@ -148,19 +178,20 @@ public class StatisticsService {
LOGGER.info(
"[UpdateLoadStatistics]\t {}={}",
"nodeId{" + nodeCacheEntry.getKey() + "}",
- nodeCacheEntry.getValue().left);
+ nodeCacheEntry.getValue().getRight());
}
}
private void recordRegionGroupStatistics(
- Map<TConsensusGroupId, RegionGroupStatistics>
differentRegionGroupStatisticsMap) {
+ Map<TConsensusGroupId, Pair<RegionGroupStatistics,
RegionGroupStatistics>>
+ differentRegionGroupStatisticsMap) {
LOGGER.info("[UpdateLoadStatistics] RegionGroupStatisticsMap: ");
- for (Map.Entry<TConsensusGroupId, RegionGroupStatistics>
regionGroupStatisticsEntry :
- differentRegionGroupStatisticsMap.entrySet()) {
+ for (Map.Entry<TConsensusGroupId, Pair<RegionGroupStatistics,
RegionGroupStatistics>>
+ regionGroupStatisticsEntry :
differentRegionGroupStatisticsMap.entrySet()) {
LOGGER.info("[UpdateLoadStatistics]\t RegionGroup: {}",
regionGroupStatisticsEntry.getKey());
LOGGER.info("[UpdateLoadStatistics]\t {}",
regionGroupStatisticsEntry.getValue());
for (Map.Entry<Integer, RegionStatistics> regionStatisticsEntry :
-
regionGroupStatisticsEntry.getValue().getRegionStatisticsMap().entrySet()) {
+
regionGroupStatisticsEntry.getValue().getRight().getRegionStatisticsMap().entrySet())
{
LOGGER.info(
"[UpdateLoadStatistics]\t dataNodeId{}={}",
regionStatisticsEntry.getKey(),
@@ -169,50 +200,40 @@ public class StatisticsService {
}
}
- private void recordRegionRouteMap(RegionRouteMap regionRouteMap) {
+ @Override
+ public void onClusterStatisticsChanged(StatisticsChangeEvent event) {
+ recordNodeStatistics(event.getNodeStatisticsMap());
+ recordRegionGroupStatistics(event.getRegionGroupStatisticsMap());
+ }
+
+ private void recordRegionLeaderMap(Map<TConsensusGroupId, Pair<Integer,
Integer>> leaderMap) {
LOGGER.info("[UpdateLoadStatistics] RegionLeaderMap: ");
- for (Map.Entry<TConsensusGroupId, Integer> regionLeaderEntry :
- regionRouteMap.getRegionLeaderMap().entrySet()) {
+ for (Map.Entry<TConsensusGroupId, Pair<Integer, Integer>>
regionLeaderEntry :
+ leaderMap.entrySet()) {
LOGGER.info(
"[UpdateLoadStatistics]\t {}={}",
regionLeaderEntry.getKey(),
- regionLeaderEntry.getValue());
+ regionLeaderEntry.getValue().getRight());
}
+ }
+ private void recordRegionPriorityMap(
+ Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>>
priorityMap) {
LOGGER.info("[UpdateLoadStatistics] RegionPriorityMap: ");
- for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> regionPriorityEntry :
- regionRouteMap.getRegionPriorityMap().entrySet()) {
+ for (Map.Entry<TConsensusGroupId, Pair<TRegionReplicaSet,
TRegionReplicaSet>>
+ regionPriorityEntry : priorityMap.entrySet()) {
LOGGER.info(
"[UpdateLoadStatistics]\t {}={}",
regionPriorityEntry.getKey(),
- regionPriorityEntry.getValue().getDataNodeLocations().stream()
+
regionPriorityEntry.getValue().getRight().getDataNodeLocations().stream()
.map(TDataNodeLocation::getDataNodeId)
.collect(Collectors.toList()));
}
}
- public void broadcastLatestRegionRouteMap() {
- Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap =
- routeBalancer.getLatestRegionPriorityMap();
- Map<Integer, TDataNodeLocation> dataNodeLocationMap = new
ConcurrentHashMap<>();
- // Broadcast the RegionRouteMap to all DataNodes except the unknown ones
- configManager
- .getNodeManager()
- .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Removing,
NodeStatus.ReadOnly)
- .forEach(
- onlineDataNode ->
- dataNodeLocationMap.put(
- onlineDataNode.getLocation().getDataNodeId(),
onlineDataNode.getLocation()));
-
- LOGGER.info("[UpdateLoadStatistics] Begin to broadcast RegionRouteMap:");
- long broadcastTime = System.currentTimeMillis();
-
- AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler =
- new AsyncClientHandler<>(
- DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
- new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
- dataNodeLocationMap);
-
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
- LOGGER.info("[UpdateLoadStatistics] Broadcast the latest RegionRouteMap
finished.");
+ @Override
+ public void onRegionGroupLeaderChanged(RouteChangeEvent event) {
+ recordRegionLeaderMap(event.getLeaderMap());
+ recordRegionPriorityMap(event.getPriorityMap());
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/IEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java
similarity index 66%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/IEvent.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java
index d9e8445e74..faa79fb5a5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/IEvent.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java
@@ -16,6 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.observer;
-public interface IEvent {}
+package org.apache.iotdb.confignode.manager.load.subscriber;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+
+public interface IClusterStatusSubscriber {
+
+ @Subscribe
+ @AllowConcurrentEvents
+ void onClusterStatisticsChanged(StatisticsChangeEvent event);
+
+ @Subscribe
+ @AllowConcurrentEvents
+ void onRegionGroupLeaderChanged(RouteChangeEvent event);
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/RouteChangeEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/RouteChangeEvent.java
new file mode 100644
index 0000000000..55153f3bf5
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/RouteChangeEvent.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.subscriber;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class RouteChangeEvent {
+
+ // Map<RegionGroupId, Pair<old Leader, new Leader>>
+ private final Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap;
+ // Map<RegionGroupId, Pair<old Priority, new Priority>>
+ private final Map<TConsensusGroupId, Pair<TRegionReplicaSet,
TRegionReplicaSet>> priorityMap;
+
+ public RouteChangeEvent(RegionRouteMap preRouteMap, RegionRouteMap
currentRouteMap) {
+ this.leaderMap = new ConcurrentHashMap<>();
+ this.priorityMap = new ConcurrentHashMap<>();
+
+ preRouteMap
+ .getRegionLeaderMap()
+ .forEach(
+ (regionGroupId, oldLeader) -> {
+ Integer newLeader =
currentRouteMap.getRegionLeaderMap().get(regionGroupId);
+ if (newLeader != null && !newLeader.equals(oldLeader)) {
+ leaderMap.put(regionGroupId, new Pair<>(oldLeader, newLeader));
+ }
+ });
+
+ preRouteMap
+ .getRegionPriorityMap()
+ .forEach(
+ (regionGroupId, oldPriority) -> {
+ TRegionReplicaSet newPriority =
+ currentRouteMap.getRegionPriorityMap().get(regionGroupId);
+ if (newPriority != null && !newPriority.equals(oldPriority)) {
+ priorityMap.put(regionGroupId, new Pair<>(oldPriority,
newPriority));
+ }
+ });
+ }
+
+ public boolean isNeedBroadcast() {
+ return !leaderMap.isEmpty() || !priorityMap.isEmpty();
+ }
+
+ public Map<TConsensusGroupId, Pair<Integer, Integer>> getLeaderMap() {
+ return leaderMap;
+ }
+
+ public Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>>
getPriorityMap() {
+ return priorityMap;
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/StatisticsChangeEvent.java
similarity index 51%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/StatisticsChangeEvent.java
index 50e9023040..c8ba7b6248 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/StatisticsChangeEvent.java
@@ -16,24 +16,38 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.observer;
+package org.apache.iotdb.confignode.manager.load.subscriber;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import
org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
import org.apache.iotdb.tsfile.utils.Pair;
import java.util.Map;
-public class NodeStatisticsEvent implements IEvent {
+public class StatisticsChangeEvent {
- // Pair<NodeStatistics, NodeStatistics>:left one means the current
NodeStatistics, right one means
- // the previous NodeStatistics
- private Map<Integer, Pair<NodeStatistics, NodeStatistics>> nodeStatisticsMap;
+ // Map<NodeId, Pair<old NodeStatistics, new NodeStatistics>>
+ private final Map<Integer, Pair<NodeStatistics, NodeStatistics>>
nodeStatisticsMap;
+ // Map<RegionGroupId, Pair<old RegionGroupStatistics, new
RegionGroupStatistics>>
+ private final Map<TConsensusGroupId, Pair<RegionGroupStatistics,
RegionGroupStatistics>>
+ regionGroupStatisticsMap;
- public NodeStatisticsEvent(Map<Integer, Pair<NodeStatistics,
NodeStatistics>> nodeStatisticsMap) {
+ public StatisticsChangeEvent(
+ Map<Integer, Pair<NodeStatistics, NodeStatistics>> nodeStatisticsMap,
+ Map<TConsensusGroupId, Pair<RegionGroupStatistics,
RegionGroupStatistics>>
+ regionGroupStatisticsMap) {
this.nodeStatisticsMap = nodeStatisticsMap;
+ this.regionGroupStatisticsMap = regionGroupStatisticsMap;
}
public Map<Integer, Pair<NodeStatistics, NodeStatistics>>
getNodeStatisticsMap() {
return nodeStatisticsMap;
}
+
+ public Map<TConsensusGroupId, Pair<RegionGroupStatistics,
RegionGroupStatistics>>
+ getRegionGroupStatisticsMap() {
+ return regionGroupStatisticsMap;
+ }
}