This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch Move-heartbeat-thread-and-statistics-thread-to-LoadManager in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a9841c8d1a02c729e8e9b6c125f588b4d84138ca Author: YongzaoDan <[email protected]> AuthorDate: Wed Apr 12 22:53:17 2023 +0800 temporary relationship --- .../heartbeat/ConfigNodeHeartbeatHandler.java | 7 +- .../heartbeat/DataNodeHeartbeatHandler.java | 39 ++-- .../statemachine/ConfigRegionStateMachine.java | 9 +- .../iotdb/confignode/manager/ConfigManager.java | 2 +- .../iotdb/confignode/manager/ProcedureManager.java | 2 +- .../confignode/manager/RetryFailedTasksThread.java | 2 +- .../iotdb/confignode/manager/load/LoadCache.java | 186 +++++++++++++++++ .../iotdb/confignode/manager/load/LoadManager.java | 229 ++------------------- .../manager/load/balancer/RouteBalancer.java | 28 +++ .../load/heartbeat/HeartbeatSampleCache.java | 123 ----------- .../manager/load/heartbeat/HeartbeatService.java | 95 ++++----- .../load/heartbeat/region/RegionGroupCache.java | 2 +- .../load/statistics/RegionGroupStatistics.java | 9 +- .../manager/load/statistics/StatisticsService.java | 191 ++++++++++++++++- .../iotdb/confignode/manager/node/NodeManager.java | 31 --- .../manager/partition/PartitionManager.java | 12 -- .../procedure/env/ConfigNodeProcedureEnv.java | 4 +- .../statistics/RegionGroupStatisticsTest.java | 2 +- 18 files changed, 513 insertions(+), 460 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java index 92881210e7..86435e7755 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java @@ -18,8 +18,7 @@ */ package org.apache.iotdb.confignode.client.async.handlers.heartbeat; -import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatSampleCache; -import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache; +import org.apache.iotdb.confignode.manager.load.LoadCache; import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample; import org.apache.thrift.async.AsyncMethodCallback; @@ -27,9 +26,9 @@ import org.apache.thrift.async.AsyncMethodCallback; public class ConfigNodeHeartbeatHandler implements AsyncMethodCallback<Long> { private final int nodeId; - private final HeartbeatSampleCache cache; + private final LoadCache cache; - public ConfigNodeHeartbeatHandler(int nodeId, HeartbeatSampleCache cache) { + public ConfigNodeHeartbeatHandler(int nodeId, LoadCache cache) { this.nodeId = nodeId; this.cache = cache; } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java index bea4455313..83b0bc1c23 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java @@ -18,13 +18,10 @@ */ package org.apache.iotdb.confignode.client.async.handlers.heartbeat; -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; -import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.commons.cluster.RegionStatus; -import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatSampleCache; -import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache; +import org.apache.iotdb.confignode.manager.load.LoadCache; +import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer; import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample; -import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache; import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample; import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp; import org.apache.iotdb.tsfile.utils.Pair; @@ -37,7 +34,8 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR private final int nodeId; - private final HeartbeatSampleCache heartbeatSampleCache; + private final LoadCache loadCache; + private final RouteBalancer routeBalancer; private final Map<Integer, Long> deviceNum; private final Map<Integer, Long> timeSeriesNum; @@ -45,13 +43,15 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR public DataNodeHeartbeatHandler( int nodeId, - HeartbeatSampleCache heartbeatSampleCache, + LoadCache loadCache, + RouteBalancer routeBalancer, Map<Integer, Long> deviceNum, Map<Integer, Long> timeSeriesNum, Map<Integer, Long> regionDisk) { this.nodeId = nodeId; - this.heartbeatSampleCache = heartbeatSampleCache; + this.loadCache = loadCache; + this.routeBalancer = routeBalancer; this.deviceNum = deviceNum; this.timeSeriesNum = timeSeriesNum; this.regionDisk = regionDisk; @@ -62,26 +62,27 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR long receiveTime = System.currentTimeMillis(); // Update NodeCache - heartbeatSampleCache.cacheDataNodeHeartbeatSample(nodeId, new NodeHeartbeatSample(heartbeatResp, receiveTime)); + loadCache.cacheDataNodeHeartbeatSample( + nodeId, new NodeHeartbeatSample(heartbeatResp, receiveTime)); heartbeatResp .getJudgedLeaders() .forEach( (regionGroupId, isLeader) -> { // Update RegionGroupCache - heartbeatSampleCache.cacheRegionHeartbeatSample(regionGroupId, nodeId, - new RegionHeartbeatSample( - heartbeatResp.getHeartbeatTimestamp(), - receiveTime, - // Region will inherit DataNode's status - RegionStatus.parse(heartbeatResp.getStatus()))); + loadCache.cacheRegionHeartbeatSample( + regionGroupId, + nodeId, + new RegionHeartbeatSample( + heartbeatResp.getHeartbeatTimestamp(), + receiveTime, + // Region will inherit DataNode's status + RegionStatus.parse(heartbeatResp.getStatus()))); if (isLeader) { // Update leaderCache - heartbeatSampleCache.cacheLeaderSample( - regionGroupId, - new Pair<>( - heartbeatResp.getHeartbeatTimestamp(), nodeId)); + routeBalancer.cacheLeaderSample( + regionGroupId, new Pair<>(heartbeatResp.getHeartbeatTimestamp(), nodeId)); } }); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index 78282c4fa3..f6b3f25e14 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -204,13 +204,11 @@ public class ConfigRegionStateMachine newLeaderId, currentNodeTEndPoint); - // Always initiate all kinds of HeartbeatCache first - configManager.getLoadManager().initHeartbeatCache(); + // Always start load services first + configManager.getLoadManager().startLoadServices(); // Start leader scheduling services configManager.getProcedureManager().shiftExecutor(true); - configManager.getLoadManager().startHeartbeatService(); - configManager.getLoadManager().startLoadStatisticsService(); configManager.getLoadManager().getRouteBalancer().startRouteBalancingService(); configManager.getRetryFailedTasksThread().startRetryFailedTasksService(); configManager.getPartitionManager().startRegionCleaner(); @@ -229,9 +227,8 @@ public class ConfigRegionStateMachine newLeaderId); // Stop leader scheduling services + configManager.getLoadManager().stopLoadServices(); configManager.getProcedureManager().shiftExecutor(false); - configManager.getLoadManager().stopHeartbeatService(); - configManager.getLoadManager().stopLoadStatisticsService(); configManager.getLoadManager().getRouteBalancer().stopRouteBalancingService(); configManager.getRetryFailedTasksThread().stopRetryFailedTasksService(); configManager.getPartitionManager().stopRegionCleaner(); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index cf038e6dba..dab5101d1c 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -85,10 +85,10 @@ import org.apache.iotdb.confignode.consensus.statemachine.ConfigRegionStateMachi import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.cq.CQManager; import org.apache.iotdb.confignode.manager.load.LoadManager; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample; import org.apache.iotdb.confignode.manager.node.ClusterNodeStartUtils; import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.confignode.manager.node.NodeMetrics; -import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample; import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.manager.partition.PartitionMetrics; import org.apache.iotdb.confignode.manager.pipe.PipeManager; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index f9ac0230fc..5df812d4fb 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -40,8 +40,8 @@ import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConf import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan; import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; -import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache; +import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.persistence.ProcedureInfo; import org.apache.iotdb.confignode.procedure.Procedure; import org.apache.iotdb.confignode.procedure.ProcedureExecutor; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java index 3dab05219d..865b8b59c2 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java @@ -28,8 +28,8 @@ import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool; import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; -import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache; +import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq; import org.apache.iotdb.rpc.TSStatusCode; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadCache.java new file mode 100644 index 0000000000..03f002899c --- /dev/null +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadCache.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.load; + +import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.confignode.manager.IManager; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample; +import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample; +import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics; +import org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.tsfile.utils.Pair; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** Maintain all kinds of heartbeat samples */ +public class LoadCache { + + private static final boolean IS_DATA_REGION_IOT_CONSENSUS = + ConsensusFactory.IOT_CONSENSUS.equals( + ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass()); + + // Map<NodeId, INodeCache> + private final Map<Integer, BaseNodeCache> nodeCacheMap; + // Map<RegionGroupId, RegionGroupCache> + private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap; + + public LoadCache() { + this.nodeCacheMap = new ConcurrentHashMap<>(); + this.regionGroupCacheMap = new ConcurrentHashMap<>(); + } + + /** + * Cache the latest heartbeat sample of a ConfigNode. + * + * @param nodeId the id of the ConfigNode + * @param sample the latest heartbeat sample + */ + public void cacheConfigNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) { + nodeCacheMap + .computeIfAbsent(nodeId, empty -> new ConfigNodeHeartbeatCache(nodeId)) + .cacheHeartbeatSample(sample); + } + + /** + * Cache the latest heartbeat sample of a DataNode. + * + * @param nodeId the id of the DataNode + * @param sample the latest heartbeat sample + */ + public void cacheDataNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) { + nodeCacheMap + .computeIfAbsent(nodeId, empty -> new DataNodeHeartbeatCache()) + .cacheHeartbeatSample(sample); + } + + /** + * Cache the latest heartbeat sample of a RegionGroup. + * + * @param regionGroupId the id of the RegionGroup + * @param nodeId the id of the DataNode where specified Region resides + * @param sample the latest heartbeat sample + */ + public void cacheRegionHeartbeatSample( + TConsensusGroupId regionGroupId, int nodeId, RegionHeartbeatSample sample) { + regionGroupCacheMap + .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId)) + .cacheHeartbeatSample(nodeId, sample); + } + + /** + * Periodic invoke to update the NodeStatistics of all Nodes. + * + * @return a map of changed NodeStatistics + */ + public Map<Integer, Pair<NodeStatistics, NodeStatistics>> updateNodeStatistics() { + Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap = + new ConcurrentHashMap<>(); + nodeCacheMap.forEach( + (nodeId, nodeCache) -> { + NodeStatistics preNodeStatistics = nodeCache.getPreviousStatistics().deepCopy(); + if (nodeCache.periodicUpdate()) { + // Update and record the changed NodeStatistics + differentNodeStatisticsMap.put( + nodeId, new Pair<>(nodeCache.getStatistics(), preNodeStatistics)); + } + }); + return differentNodeStatisticsMap; + } + + /** + * Periodic invoke to update the RegionGroupStatistics of all RegionGroups. + * + * @return a map of changed RegionGroupStatistics + */ + public Map<TConsensusGroupId, RegionGroupStatistics> updateRegionGroupStatistics() { + Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap = + new ConcurrentHashMap<>(); + regionGroupCacheMap.forEach( + (regionGroupId, regionGroupCache) -> { + if (regionGroupCache.periodicUpdate()) { + // Update and record the changed RegionGroupStatistics + differentRegionGroupStatisticsMap.put(regionGroupId, regionGroupCache.getStatistics()); + } + }); + return differentRegionGroupStatisticsMap; + } + + public void initHeartbeatCache(IManager configManager) { + initNodeHeartbeatCache( + configManager.getNodeManager().getRegisteredConfigNodes(), + configManager.getNodeManager().getRegisteredDataNodes()); + initRegionGroupHeartbeatCache(configManager.getPartitionManager().getAllReplicaSets()); + } + + /** Initialize the nodeCacheMap when the ConfigNode-Leader is switched */ + private void initNodeHeartbeatCache( + List<TConfigNodeLocation> registeredConfigNodes, + List<TDataNodeConfiguration> registeredDataNodes) { + final int CURRENT_NODE_ID = ConfigNodeHeartbeatCache.CURRENT_NODE_ID; + nodeCacheMap.clear(); + + // Init ConfigNodeHeartbeatCache + registeredConfigNodes.forEach( + configNodeLocation -> { + if (configNodeLocation.getConfigNodeId() != CURRENT_NODE_ID) { + nodeCacheMap.put( + configNodeLocation.getConfigNodeId(), + new ConfigNodeHeartbeatCache(configNodeLocation.getConfigNodeId())); + } + }); + // Force set itself and never update + nodeCacheMap.put( + ConfigNodeHeartbeatCache.CURRENT_NODE_ID, + new ConfigNodeHeartbeatCache( + CURRENT_NODE_ID, ConfigNodeHeartbeatCache.CURRENT_NODE_STATISTICS)); + + // Init DataNodeHeartbeatCache + registeredDataNodes.forEach( + dataNodeConfiguration -> + nodeCacheMap.put( + dataNodeConfiguration.getLocation().getDataNodeId(), new DataNodeHeartbeatCache())); + } + + /** Initialize the regionGroupCacheMap when the ConfigNode-Leader is switched. */ + private void initRegionGroupHeartbeatCache(List<TRegionReplicaSet> regionReplicaSets) { + regionGroupCacheMap.clear(); + regionReplicaSets.forEach( + regionReplicaSet -> + regionGroupCacheMap.put( + regionReplicaSet.getRegionId(), + new RegionGroupCache(regionReplicaSet.getRegionId()))); + } + + public void clearHeartbeatCache() { + nodeCacheMap.clear(); + regionGroupCacheMap.clear(); + } +} diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index 4b18d37a0a..15dc22c0f5 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -21,39 +21,21 @@ package org.apache.iotdb.confignode.manager.load; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; -import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; -import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; -import org.apache.iotdb.commons.cluster.NodeStatus; -import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; -import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; -import org.apache.iotdb.confignode.client.DataNodeRequestType; -import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool; -import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException; import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException; import org.apache.iotdb.confignode.manager.IManager; -import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer; import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer; import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer; -import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap; import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatService; import org.apache.iotdb.confignode.manager.load.statistics.StatisticsService; -import org.apache.iotdb.confignode.manager.node.NodeManager; -import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics; -import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent; -import org.apache.iotdb.confignode.manager.partition.PartitionManager; -import org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics; -import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; -import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; -import org.apache.iotdb.tsfile.utils.Pair; import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus; @@ -62,13 +44,8 @@ import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; /** * The LoadManager at ConfigNodeGroup-Leader is active. It proactively implements the cluster @@ -87,16 +64,11 @@ public class LoadManager { private final RouteBalancer routeBalancer; /** Cluster load services */ + private final LoadCache loadCache; + private final HeartbeatService heartbeatService; private final StatisticsService statisticsService; - - /** Load statistics executor service */ - private final Object statisticsScheduleMonitor = new Object(); - private Future<?> currentLoadStatisticsFuture; - private final ScheduledExecutorService loadStatisticsExecutor = - IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-LoadStatistics-Service"); - private final EventBus eventBus = new AsyncEventBus("LoadManager-EventBus", Executors.newFixedThreadPool(5)); @@ -107,8 +79,9 @@ public class LoadManager { this.partitionBalancer = new PartitionBalancer(configManager); this.routeBalancer = new RouteBalancer(configManager); - this.heartbeatService = new HeartbeatService(configManager); - this.statisticsService = new StatisticsService(configManager); + this.loadCache = new LoadCache(); + this.heartbeatService = new HeartbeatService(configManager, loadCache); + this.statisticsService = new StatisticsService(configManager, loadCache, eventBus); eventBus.register(configManager.getClusterSchemaManager()); eventBus.register(configManager.getSyncManager()); @@ -124,8 +97,8 @@ public class LoadManager { * @throws DatabaseNotExistsException If some specific StorageGroups don't exist */ public CreateRegionGroupsPlan allocateRegionGroups( - Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType) - throws NotEnoughDataNodeException, DatabaseNotExistsException { + Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType) + throws NotEnoughDataNodeException, DatabaseNotExistsException { return regionBalancer.genRegionGroupsAllocationPlan(allotmentMap, consensusGroupType); } @@ -136,8 +109,8 @@ public class LoadManager { * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result */ public Map<String, SchemaPartitionTable> allocateSchemaPartition( - Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap) - throws NoAvailableRegionGroupException { + Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap) + throws NoAvailableRegionGroupException { return partitionBalancer.allocateSchemaPartition(unassignedSchemaPartitionSlotsMap); } @@ -148,8 +121,8 @@ public class LoadManager { * @return Map<StorageGroupName, DataPartitionTable>, the allocating result */ public Map<String, DataPartitionTable> allocateDataPartition( - Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap) - throws NoAvailableRegionGroupException { + Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap) + throws NoAvailableRegionGroupException { return partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap); } @@ -189,182 +162,20 @@ public class LoadManager { return routeBalancer.getLatestRegionPriorityMap(); } - /** Start the load statistics service */ - public void startLoadStatisticsService() { - synchronized (heartbeatScheduleMonitor) { - if (currentLoadStatisticsFuture == null) { - currentLoadStatisticsFuture = - ScheduledExecutorUtil.safelyScheduleWithFixedDelay( - loadStatisticsExecutor, - this::updateLoadStatistics, - 0, - HEARTBEAT_INTERVAL, - TimeUnit.MILLISECONDS); - LOGGER.info("LoadStatistics service is started successfully."); - } - } - } - - /** Stop the load statistics service */ - public void stopLoadStatisticsService() { - synchronized (heartbeatScheduleMonitor) { - if (currentLoadStatisticsFuture != null) { - currentLoadStatisticsFuture.cancel(false); - currentLoadStatisticsFuture = null; - LOGGER.info("LoadStatistics service is stopped successfully."); - } - } - } - - private void updateLoadStatistics() { - // Broadcast the RegionRouteMap if some LoadStatistics has changed - boolean isNeedBroadcast = false; - - // Update NodeStatistics: - // Pair<NodeStatistics, NodeStatistics>:left one means the current NodeStatistics, right one - // means the previous NodeStatistics - Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap = - new ConcurrentHashMap<>(); - getNodeManager() - .getNodeCacheMap() - .forEach( - (nodeId, nodeCache) -> { - NodeStatistics preNodeStatistics = nodeCache.getPreviousStatistics().deepCopy(); - if (nodeCache.periodicUpdate()) { - // Update and record the changed NodeStatistics - differentNodeStatisticsMap.put( - nodeId, new Pair<>(nodeCache.getStatistics(), preNodeStatistics)); - } - }); - if (!differentNodeStatisticsMap.isEmpty()) { - isNeedBroadcast = true; - recordNodeStatistics(differentNodeStatisticsMap); - eventBus.post(new NodeStatisticsEvent(differentNodeStatisticsMap)); - } - - // Update RegionGroupStatistics - Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap = - new ConcurrentHashMap<>(); - getPartitionManager() - .getRegionGroupCacheMap() - .forEach( - (regionGroupId, regionGroupCache) -> { - if (regionGroupCache.periodicUpdate()) { - // Update and record the changed RegionGroupStatistics - differentRegionGroupStatisticsMap.put( - regionGroupId, regionGroupCache.getStatistics()); - } - }); - if (!differentRegionGroupStatisticsMap.isEmpty()) { - isNeedBroadcast = true; - recordRegionGroupStatistics(differentRegionGroupStatisticsMap); - } - - // Update RegionRouteMap - if (routeBalancer.updateRegionRouteMap()) { - isNeedBroadcast = true; - recordRegionRouteMap(routeBalancer.getRegionRouteMap()); - } - - if (isNeedBroadcast) { - broadcastLatestRegionRouteMap(); - } - } - - private void recordNodeStatistics( - Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap) { - LOGGER.info("[UpdateLoadStatistics] NodeStatisticsMap: "); - for (Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> nodeCacheEntry : - differentNodeStatisticsMap.entrySet()) { - LOGGER.info( - "[UpdateLoadStatistics]\t {}={}", - "nodeId{" + nodeCacheEntry.getKey() + "}", - nodeCacheEntry.getValue().left); - } - } - - private void recordRegionGroupStatistics( - Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap) { - LOGGER.info("[UpdateLoadStatistics] RegionGroupStatisticsMap: "); - for (Map.Entry<TConsensusGroupId, RegionGroupStatistics> regionGroupStatisticsEntry : - differentRegionGroupStatisticsMap.entrySet()) { - LOGGER.info("[UpdateLoadStatistics]\t RegionGroup: {}", regionGroupStatisticsEntry.getKey()); - LOGGER.info("[UpdateLoadStatistics]\t {}", regionGroupStatisticsEntry.getValue()); - for (Map.Entry<Integer, RegionStatistics> regionStatisticsEntry : - regionGroupStatisticsEntry.getValue().getRegionStatisticsMap().entrySet()) { - LOGGER.info( - "[UpdateLoadStatistics]\t dataNodeId{}={}", - regionStatisticsEntry.getKey(), - regionStatisticsEntry.getValue()); - } - } - } - - private void recordRegionRouteMap(RegionRouteMap regionRouteMap) { - LOGGER.info("[UpdateLoadStatistics] RegionLeaderMap: "); - for (Map.Entry<TConsensusGroupId, Integer> regionLeaderEntry : - regionRouteMap.getRegionLeaderMap().entrySet()) { - LOGGER.info( - "[UpdateLoadStatistics]\t {}={}", - regionLeaderEntry.getKey(), - regionLeaderEntry.getValue()); - } - - LOGGER.info("[UpdateLoadStatistics] RegionPriorityMap: "); - for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> regionPriorityEntry : - regionRouteMap.getRegionPriorityMap().entrySet()) { - LOGGER.info( - "[UpdateLoadStatistics]\t {}={}", - regionPriorityEntry.getKey(), - regionPriorityEntry.getValue().getDataNodeLocations().stream() - .map(TDataNodeLocation::getDataNodeId) - .collect(Collectors.toList())); - } - } - - public void broadcastLatestRegionRouteMap() { - Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = getLatestRegionRouteMap(); - Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>(); - // Broadcast the RegionRouteMap to all DataNodes except the unknown ones - getNodeManager() - .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Removing, NodeStatus.ReadOnly) - .forEach( - onlineDataNode -> - dataNodeLocationMap.put( - onlineDataNode.getLocation().getDataNodeId(), onlineDataNode.getLocation())); - - LOGGER.info("[UpdateLoadStatistics] Begin to broadcast RegionRouteMap:"); - long broadcastTime = System.currentTimeMillis(); - - AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler = - new AsyncClientHandler<>( - DataNodeRequestType.UPDATE_REGION_ROUTE_MAP, - new TRegionRouteReq(broadcastTime, latestRegionRouteMap), - dataNodeLocationMap); - AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler); - LOGGER.info("[UpdateLoadStatistics] Broadcast the latest RegionRouteMap finished."); + public void startLoadServices() { + loadCache.initHeartbeatCache(configManager); + routeBalancer.initRegionRouteMap(); + heartbeatService.startHeartbeatService(); + statisticsService.startLoadStatisticsService(); } - /** Initialize all kinds of the HeartbeatCache when the ConfigNode-Leader is switched */ - public void initHeartbeatCache() { - getNodeManager().initNodeHeartbeatCache(); - getPartitionManager().initRegionGroupHeartbeatCache(); - routeBalancer.initRegionRouteMap(); + public void stopLoadServices() { + heartbeatService.stopHeartbeatService(); + statisticsService.stopLoadStatisticsService(); + loadCache.clearHeartbeatCache(); } public RouteBalancer getRouteBalancer() { return routeBalancer; } - - private NodeManager getNodeManager() { - return configManager.getNodeManager(); - } - - private PartitionManager getPartitionManager() { - return configManager.getPartitionManager(); - } - - public EventBus getEventBus() { - return eventBus; - } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java index 025d29e6fe..4a27368459 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java @@ -91,6 +91,12 @@ public class RouteBalancer { private static final boolean IS_DATA_REGION_IOT_CONSENSUS = ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS); + // Key: RegionGroupId + // Value: Pair<Timestamp, LeaderDataNodeId>, where + // the left value stands for sampling timestamp + // and the right value stands for the index of DataNode that leader resides. + private final Map<TConsensusGroupId, Pair<Long, Integer>> leaderCache; + private final IManager configManager; /** RegionRouteMap */ @@ -111,6 +117,7 @@ public class RouteBalancer { public RouteBalancer(IManager configManager) { this.configManager = configManager; this.regionRouteMap = new RegionRouteMap(); + this.leaderCache = new ConcurrentHashMap<>(); switch (CONF.getLeaderDistributionPolicy()) { case ILeaderBalancer.GREEDY_POLICY: @@ -133,6 +140,27 @@ public class RouteBalancer { } } + /** + * Cache the latest leader of a RegionGroup. + * + * @param regionGroupId the id of the RegionGroup + * @param leaderSample the latest leader of a RegionGroup + */ + public void cacheLeaderSample(TConsensusGroupId regionGroupId, Pair<Long, Integer> leaderSample) { + if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) + && IS_DATA_REGION_IOT_CONSENSUS) { + // The leadership of IoTConsensus protocol is decided by ConfigNode-leader + return; + } + + leaderCache.putIfAbsent(regionGroupId, leaderSample); + synchronized (leaderCache.get(regionGroupId)) { + if (leaderCache.get(regionGroupId).getLeft() < leaderSample.getLeft()) { + leaderCache.replace(regionGroupId, leaderSample); + } + } + } + /** * Invoking periodically to update the RegionRouteMap * diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatSampleCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatSampleCache.java deleted file mode 100644 index 4fcb27d00d..0000000000 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatSampleCache.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.confignode.manager.load.heartbeat; - -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; -import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; -import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache; -import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache; -import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache; -import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample; -import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache; -import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample; -import org.apache.iotdb.consensus.ConsensusFactory; -import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp; -import org.apache.iotdb.tsfile.utils.Pair; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** Maintain all kinds of heartbeat samples */ -public class HeartbeatSampleCache { - - private static final boolean IS_DATA_REGION_IOT_CONSENSUS = - ConsensusFactory.IOT_CONSENSUS.equals(ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass()); - - // Map<NodeId, INodeCache> - private final Map<Integer, BaseNodeCache> nodeCacheMap; - // Map<RegionGroupId, RegionGroupCache> - private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap; - // Key: RegionGroupId - // Value: Pair<Timestamp, LeaderDataNodeId>, where - // the left value stands for sampling timestamp - // and the right value stands for the index of DataNode that leader resides. - private final Map<TConsensusGroupId, Pair<Long, Integer>> leaderCache; - - public HeartbeatSampleCache() { - this.nodeCacheMap = new ConcurrentHashMap<>(); - this.regionGroupCacheMap = new ConcurrentHashMap<>(); - this.leaderCache = new ConcurrentHashMap<>(); - } - - /** - * Cache the latest heartbeat sample of a ConfigNode. - * - * @param nodeId the id of the ConfigNode - * @param sample the latest heartbeat sample - */ - public void cacheConfigNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) { - nodeCacheMap.computeIfAbsent(nodeId, - empty -> new ConfigNodeHeartbeatCache(nodeId)) - .cacheHeartbeatSample(sample); - } - - /** - * Cache the latest heartbeat sample of a DataNode. - * - * @param nodeId the id of the DataNode - * @param sample the latest heartbeat sample - */ - public void cacheDataNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) { - nodeCacheMap.computeIfAbsent(nodeId, - empty -> new DataNodeHeartbeatCache()) - .cacheHeartbeatSample(sample); - } - - /** - * Cache the latest heartbeat sample of a RegionGroup. - * - * @param regionGroupId the id of the RegionGroup - * @param nodeId the id of the DataNode where specified Region resides - * @param sample the latest heartbeat sample - */ - public void cacheRegionHeartbeatSample(TConsensusGroupId regionGroupId, int nodeId, RegionHeartbeatSample sample) { - regionGroupCacheMap.computeIfAbsent(regionGroupId, - empty -> new RegionGroupCache(regionGroupId)) - .cacheHeartbeatSample(nodeId, sample); - } - - /** - * Cache the latest leader of a RegionGroup. - * - * @param regionGroupId the id of the RegionGroup - * @param leaderSample the latest leader of a RegionGroup - */ - public void cacheLeaderSample(TConsensusGroupId regionGroupId, Pair<Long, Integer> leaderSample) { - if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) - && IS_DATA_REGION_IOT_CONSENSUS) { - // The leadership of IoTConsensus protocol is decided by ConfigNode-leader - return; - } - - leaderCache.putIfAbsent(regionGroupId, leaderSample); - synchronized (leaderCache.get(regionGroupId)) { - if (leaderCache.get(regionGroupId).getLeft() < leaderSample.getLeft()) { - leaderCache.replace(regionGroupId, leaderSample); - } - } - } - - public void clear() { - nodeCacheMap.clear(); - regionGroupCacheMap.clear(); - leaderCache.clear(); - } -} diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java index 056ecfd76c..b2aaaa15aa 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java @@ -30,10 +30,11 @@ import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeart import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; +import org.apache.iotdb.confignode.manager.load.LoadCache; import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache; -import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache; import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,23 +50,24 @@ public class HeartbeatService { private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatService.class); - private static final long HEARTBEAT_INTERVAL = ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(); + private static final long HEARTBEAT_INTERVAL = + ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(); private final IManager configManager; + private final LoadCache loadCache; /** Heartbeat executor service */ // Monitor for leadership change private final Object heartbeatScheduleMonitor = new Object(); + private Future<?> currentHeartbeatFuture; private final ScheduledExecutorService heartBeatExecutor = - IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-Heartbeat-Service"); + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-Heartbeat-Service"); private final AtomicInteger heartbeatCounter = new AtomicInteger(0); - private final HeartbeatSampleCache heartbeatSampleCache; - - public HeartbeatService(IManager configManager) { + public HeartbeatService(IManager configManager, LoadCache loadCache) { this.configManager = configManager; - this.heartbeatSampleCache = new HeartbeatSampleCache(); + this.loadCache = loadCache; } /** Start the heartbeat service */ @@ -73,12 +75,12 @@ public class HeartbeatService { synchronized (heartbeatScheduleMonitor) { if (currentHeartbeatFuture == null) { currentHeartbeatFuture = - ScheduledExecutorUtil.safelyScheduleWithFixedDelay( - heartBeatExecutor, - this::heartbeatLoopBody, - 0, - HEARTBEAT_INTERVAL, - TimeUnit.MILLISECONDS); + ScheduledExecutorUtil.safelyScheduleWithFixedDelay( + heartBeatExecutor, + this::heartbeatLoopBody, + 0, + HEARTBEAT_INTERVAL, + TimeUnit.MILLISECONDS); LOGGER.info("Heartbeat service is started successfully."); } } @@ -90,12 +92,29 @@ public class HeartbeatService { if (currentHeartbeatFuture != null) { currentHeartbeatFuture.cancel(false); currentHeartbeatFuture = null; - heartbeatSampleCache.clear(); LOGGER.info("Heartbeat service is stopped successfully."); } } } + /** loop body of the heartbeat thread */ + private void heartbeatLoopBody() { + // The consensusManager of configManager may not be fully initialized at this time + Optional.ofNullable(getConsensusManager()) + .ifPresent( + consensusManager -> { + if (getConsensusManager().isLeader()) { + // Generate HeartbeatReq + THeartbeatReq heartbeatReq = genHeartbeatReq(); + // Send heartbeat requests to all the registered ConfigNodes + pingRegisteredConfigNodes( + heartbeatReq, getNodeManager().getRegisteredConfigNodes()); + // Send heartbeat requests to all the registered DataNodes + pingRegisteredDataNodes(heartbeatReq, getNodeManager().getRegisteredDataNodes()); + } + }); + } + private THeartbeatReq genHeartbeatReq() { /* Generate heartbeat request */ THeartbeatReq heartbeatReq = new THeartbeatReq(); @@ -115,30 +134,13 @@ public class HeartbeatService { return heartbeatReq; } - /** loop body of the heartbeat thread */ - private void heartbeatLoopBody() { - // The consensusManager of configManager may not be fully initialized at this time - Optional.ofNullable(getConsensusManager()) - .ifPresent( - consensusManager -> { - if (getConsensusManager().isLeader()) { - // Generate HeartbeatReq - THeartbeatReq heartbeatReq = genHeartbeatReq(); - // Send heartbeat requests to all the registered ConfigNodes - pingRegisteredConfigNodes(heartbeatReq, getNodeManager().getRegisteredConfigNodes()); - // Send heartbeat requests to all the registered DataNodes - pingRegisteredDataNodes(heartbeatReq, getNodeManager().getRegisteredDataNodes()); - } - }); - } - /** * Send heartbeat requests to all the Registered ConfigNodes * * @param registeredConfigNodes ConfigNodes that registered in cluster */ private void pingRegisteredConfigNodes( - THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) { + THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) { // Send heartbeat requests for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) { if (configNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) { @@ -147,12 +149,12 @@ public class HeartbeatService { } ConfigNodeHeartbeatHandler handler = - new ConfigNodeHeartbeatHandler(configNodeLocation.getConfigNodeId(), heartbeatSampleCache); + new ConfigNodeHeartbeatHandler(configNodeLocation.getConfigNodeId(), loadCache); AsyncConfigNodeHeartbeatClientPool.getInstance() - .getConfigNodeHeartBeat( - configNodeLocation.getInternalEndPoint(), - heartbeatReq.getHeartbeatTimestamp(), - handler); + .getConfigNodeHeartBeat( + configNodeLocation.getInternalEndPoint(), + heartbeatReq.getHeartbeatTimestamp(), + handler); } } @@ -162,20 +164,20 @@ public class HeartbeatService { * @param registeredDataNodes DataNodes that registered in cluster */ private void pingRegisteredDataNodes( - THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) { + THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) { // Send heartbeat requests for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) { DataNodeHeartbeatHandler handler = - new DataNodeHeartbeatHandler( - dataNodeInfo.getLocation().getDataNodeId(), - heartbeatSampleCache, - configManager.getClusterQuotaManager().getDeviceNum(), - configManager.getClusterQuotaManager().getTimeSeriesNum(), - configManager.getClusterQuotaManager().getRegionDisk()); + new DataNodeHeartbeatHandler( + dataNodeInfo.getLocation().getDataNodeId(), + loadCache, + configManager.getClusterQuotaManager().getDeviceNum(), + configManager.getClusterQuotaManager().getTimeSeriesNum(), + configManager.getClusterQuotaManager().getRegionDisk()); configManager.getClusterQuotaManager().updateSpaceQuotaUsage(); AsyncDataNodeHeartbeatClientPool.getInstance() - .getDataNodeHeartBeat( - dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler); + .getDataNodeHeartBeat( + dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler); } } @@ -186,5 +188,4 @@ public class HeartbeatService { private NodeManager getNodeManager() { return configManager.getNodeManager(); } - } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionGroupCache.java index fb5cf7766f..adcb93a906 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionGroupCache.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionGroupCache.java @@ -20,9 +20,9 @@ package org.apache.iotdb.confignode.manager.load.heartbeat.region; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.commons.cluster.RegionStatus; -import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus; import org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics; import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics; +import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus; import java.util.HashMap; import java.util.Map; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionGroupStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionGroupStatistics.java index d22ea0e4d6..6a2f2db7ee 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionGroupStatistics.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionGroupStatistics.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.confignode.manager.load.statistics; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.cluster.RegionStatus; import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -32,8 +33,14 @@ import java.util.concurrent.ConcurrentHashMap; public class RegionGroupStatistics { - private RegionGroupStatus regionGroupStatus; + // The DataNodeId where the leader of current RegionGroup resides + private volatile int leaderId = -1; + // Indicate the routing priority of read/write requests for current RegionGroup. + // The replica with higher sorting order have higher priority. + // TODO: Might be split into readRouteMap and writeRouteMap in the future + private TRegionReplicaSet regionPriority; + private volatile RegionGroupStatus regionGroupStatus; private final Map<Integer, RegionStatistics> regionStatisticsMap; public RegionGroupStatistics() { diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/StatisticsService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/StatisticsService.java index 79b8ad1869..20c37536f7 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/StatisticsService.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/StatisticsService.java @@ -19,7 +19,196 @@ package org.apache.iotdb.confignode.manager.load.statistics; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.confignode.client.DataNodeRequestType; +import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool; +import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.confignode.manager.IManager; +import org.apache.iotdb.confignode.manager.load.LoadCache; +import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer; +import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap; +import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent; +import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; +import org.apache.iotdb.tsfile.utils.Pair; + +import com.google.common.eventbus.EventBus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + public class StatisticsService { - + private static final Logger LOGGER = LoggerFactory.getLogger(StatisticsService.class); + + private static final long HEARTBEAT_INTERVAL = + ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(); + + private final IManager configManager; + private final RouteBalancer routeBalancer; + private final LoadCache loadCache; + private final EventBus eventBus; + + public StatisticsService(IManager configManager, LoadCache loadCache, EventBus eventBus) { + this.configManager = configManager; + this.routeBalancer = configManager.getLoadManager().getRouteBalancer(); + this.loadCache = loadCache; + this.eventBus = eventBus; + } + + /** Load statistics executor service */ + private final Object statisticsScheduleMonitor = new Object(); + + private Future<?> currentLoadStatisticsFuture; + private final ScheduledExecutorService loadStatisticsExecutor = + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-LoadStatistics-Service"); + + /** Start the load statistics service */ + public void startLoadStatisticsService() { + synchronized (statisticsScheduleMonitor) { + if (currentLoadStatisticsFuture == null) { + currentLoadStatisticsFuture = + ScheduledExecutorUtil.safelyScheduleWithFixedDelay( + loadStatisticsExecutor, + this::updateLoadStatistics, + 0, + HEARTBEAT_INTERVAL, + TimeUnit.MILLISECONDS); + LOGGER.info("LoadStatistics service is started successfully."); + } + } + } + + /** Stop the load statistics service */ + public void stopLoadStatisticsService() { + synchronized (statisticsScheduleMonitor) { + if (currentLoadStatisticsFuture != null) { + currentLoadStatisticsFuture.cancel(false); + currentLoadStatisticsFuture = null; + LOGGER.info("LoadStatistics service is stopped successfully."); + } + } + } + + private void updateLoadStatistics() { + // Broadcast the RegionRouteMap if some LoadStatistics has changed + boolean isNeedBroadcast = false; + + // Update NodeStatistics: + // Pair<NodeStatistics, NodeStatistics>:left one means the current NodeStatistics, right one + // means the previous NodeStatistics + Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap = + loadCache.updateNodeStatistics(); + if (!differentNodeStatisticsMap.isEmpty()) { + isNeedBroadcast = true; + recordNodeStatistics(differentNodeStatisticsMap); + eventBus.post(new NodeStatisticsEvent(differentNodeStatisticsMap)); + } + + // Update RegionGroupStatistics + Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap = + loadCache.updateRegionGroupStatistics(); + if (!differentRegionGroupStatisticsMap.isEmpty()) { + isNeedBroadcast = true; + recordRegionGroupStatistics(differentRegionGroupStatisticsMap); + } + + // Update RegionRouteMap + if (routeBalancer.updateRegionRouteMap()) { + isNeedBroadcast = true; + recordRegionRouteMap(routeBalancer.getRegionRouteMap()); + } + + if (isNeedBroadcast) { + broadcastLatestRegionRouteMap(); + } + } + + private void recordNodeStatistics( + Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap) { + LOGGER.info("[UpdateLoadStatistics] NodeStatisticsMap: "); + for (Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> nodeCacheEntry : + differentNodeStatisticsMap.entrySet()) { + LOGGER.info( + "[UpdateLoadStatistics]\t {}={}", + "nodeId{" + nodeCacheEntry.getKey() + "}", + nodeCacheEntry.getValue().left); + } + } + + private void recordRegionGroupStatistics( + Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap) { + LOGGER.info("[UpdateLoadStatistics] RegionGroupStatisticsMap: "); + for (Map.Entry<TConsensusGroupId, RegionGroupStatistics> regionGroupStatisticsEntry : + differentRegionGroupStatisticsMap.entrySet()) { + LOGGER.info("[UpdateLoadStatistics]\t RegionGroup: {}", regionGroupStatisticsEntry.getKey()); + LOGGER.info("[UpdateLoadStatistics]\t {}", regionGroupStatisticsEntry.getValue()); + for (Map.Entry<Integer, RegionStatistics> regionStatisticsEntry : + regionGroupStatisticsEntry.getValue().getRegionStatisticsMap().entrySet()) { + LOGGER.info( + "[UpdateLoadStatistics]\t dataNodeId{}={}", + regionStatisticsEntry.getKey(), + regionStatisticsEntry.getValue()); + } + } + } + + private void recordRegionRouteMap(RegionRouteMap regionRouteMap) { + LOGGER.info("[UpdateLoadStatistics] RegionLeaderMap: "); + for (Map.Entry<TConsensusGroupId, Integer> regionLeaderEntry : + regionRouteMap.getRegionLeaderMap().entrySet()) { + LOGGER.info( + "[UpdateLoadStatistics]\t {}={}", + regionLeaderEntry.getKey(), + regionLeaderEntry.getValue()); + } + + LOGGER.info("[UpdateLoadStatistics] RegionPriorityMap: "); + for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> regionPriorityEntry : + regionRouteMap.getRegionPriorityMap().entrySet()) { + LOGGER.info( + "[UpdateLoadStatistics]\t {}={}", + regionPriorityEntry.getKey(), + regionPriorityEntry.getValue().getDataNodeLocations().stream() + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toList())); + } + } + + public void broadcastLatestRegionRouteMap() { + Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = + routeBalancer.getLatestRegionPriorityMap(); + Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>(); + // Broadcast the RegionRouteMap to all DataNodes except the unknown ones + configManager + .getNodeManager() + .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Removing, NodeStatus.ReadOnly) + .forEach( + onlineDataNode -> + dataNodeLocationMap.put( + onlineDataNode.getLocation().getDataNodeId(), onlineDataNode.getLocation())); + + LOGGER.info("[UpdateLoadStatistics] Begin to broadcast RegionRouteMap:"); + long broadcastTime = System.currentTimeMillis(); + + AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler = + new AsyncClientHandler<>( + DataNodeRequestType.UPDATE_REGION_ROUTE_MAP, + new TRegionRouteReq(broadcastTime, latestRegionRouteMap), + dataNodeLocationMap); + AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler); + LOGGER.info("[UpdateLoadStatistics] Broadcast the latest RegionRouteMap finished."); + } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index 62ce597f2f..60e0134320 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -666,7 +666,6 @@ public class NodeManager { } } - public Map<Integer, BaseNodeCache> getNodeCacheMap() { return nodeCacheMap; } @@ -803,36 +802,6 @@ public class NodeManager { return configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get()); } - /** Initialize the nodeCacheMap when the ConfigNode-Leader is switched */ - public void initNodeHeartbeatCache() { - final int CURRENT_NODE_ID = ConfigNodeHeartbeatCache.CURRENT_NODE_ID; - nodeCacheMap.clear(); - - // Init ConfigNodeHeartbeatCache - getRegisteredConfigNodes() - .forEach( - configNodeLocation -> { - if (configNodeLocation.getConfigNodeId() != CURRENT_NODE_ID) { - nodeCacheMap.put( - configNodeLocation.getConfigNodeId(), - new ConfigNodeHeartbeatCache(configNodeLocation.getConfigNodeId())); - } - }); - // Force set itself and never update - nodeCacheMap.put( - ConfigNodeHeartbeatCache.CURRENT_NODE_ID, - new ConfigNodeHeartbeatCache( - CURRENT_NODE_ID, ConfigNodeHeartbeatCache.CURRENT_NODE_STATISTICS)); - - // Init DataNodeHeartbeatCache - getRegisteredDataNodes() - .forEach( - dataNodeConfiguration -> - nodeCacheMap.put( - dataNodeConfiguration.getLocation().getDataNodeId(), - new DataNodeHeartbeatCache())); - } - private ConsensusManager getConsensusManager() { return configManager.getConsensusManager(); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index a754ccf396..fdec1487e6 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -68,7 +68,6 @@ import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.ProcedureManager; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.load.LoadManager; -import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache; import org.apache.iotdb.confignode.persistence.partition.PartitionInfo; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask; @@ -1149,17 +1148,6 @@ public class PartitionManager { : RegionGroupStatus.Disabled; } - /** Initialize the regionGroupCacheMap when the ConfigNode-Leader is switched. */ - public void initRegionGroupHeartbeatCache() { - regionGroupCacheMap.clear(); - getAllReplicaSets() - .forEach( - regionReplicaSet -> - regionGroupCacheMap.put( - regionReplicaSet.getRegionId(), - new RegionGroupCache(regionReplicaSet.getRegionId()))); - } - public void getSchemaRegionIds( List<String> databases, Map<String, List<Integer>> schemaRegionIds) { partitionInfo.getSchemaRegionIds(databases, schemaRegionIds); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java index 5faefbcd4f..06aef7c2f6 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java @@ -48,11 +48,11 @@ import org.apache.iotdb.confignode.manager.ClusterSchemaManager; import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.load.LoadManager; -import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample; -import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache; import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample; +import org.apache.iotdb.confignode.manager.node.NodeManager; +import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.persistence.node.NodeInfo; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.scheduler.LockQueue; diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java index b0b97b2b0c..dff0105f73 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java @@ -19,9 +19,9 @@ package org.apache.iotdb.confignode.persistence.partition.statistics; import org.apache.iotdb.commons.cluster.RegionStatus; -import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus; import org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics; import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics; +import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.junit.Assert;
