This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch Move-heartbeat-thread-and-statistics-thread-to-LoadManager in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 58a2e633a6bec7af2fe41c8622e61c4be74b9131 Author: YongzaoDan <[email protected]> AuthorDate: Wed Apr 12 11:09:21 2023 +0800 stash for classes move --- .../heartbeat/ConfigNodeHeartbeatHandler.java | 18 +- .../heartbeat/DataNodeHeartbeatHandler.java | 58 +++---- .../statemachine/ConfigRegionStateMachine.java | 4 +- .../iotdb/confignode/manager/ConfigManager.java | 2 +- .../iotdb/confignode/manager/ProcedureManager.java | 2 +- .../confignode/manager/RetryFailedTasksThread.java | 4 +- .../iotdb/confignode/manager/load/LoadManager.java | 42 +++-- .../manager/load/balancer/RouteBalancer.java | 30 +--- .../load/heartbeat/HeartbeatSampleCache.java | 123 +++++++++++++ .../manager/load/heartbeat/HeartbeatService.java | 190 +++++++++++++++++++++ .../heartbeat/node}/BaseNodeCache.java | 3 +- .../heartbeat/node}/ConfigNodeHeartbeatCache.java | 3 +- .../heartbeat/node}/DataNodeHeartbeatCache.java | 3 +- .../heartbeat/node}/NodeHeartbeatSample.java | 2 +- .../heartbeat/region}/RegionCache.java | 7 +- .../heartbeat/region}/RegionGroupCache.java | 4 +- .../heartbeat/region}/RegionHeartbeatSample.java | 2 +- .../statistics}/NodeStatistics.java | 3 +- .../statistics}/RegionGroupStatistics.java | 2 +- .../statistics}/RegionStatistics.java | 3 +- .../statistics/StatisticsService.java} | 30 +--- .../iotdb/confignode/manager/node/NodeManager.java | 146 +--------------- .../manager/observer/NodeStatisticsEvent.java | 2 +- .../manager/partition/PartitionManager.java | 10 +- .../procedure/env/ConfigNodeProcedureEnv.java | 6 +- .../procedure/env/DataNodeRemoveHandler.java | 2 +- .../router/priority/GreedyPriorityTest.java | 6 +- .../priority/LeaderPriorityBalancerTest.java | 6 +- .../confignode/manager/node/NodeCacheTest.java | 4 +- .../manager/partition/RegionGroupCacheTest.java | 4 +- .../persistence/node/NodeStatisticsTest.java | 2 +- .../statistics/RegionGroupStatisticsTest.java | 4 +- .../partition/statistics/RegionStatisticsTest.java | 2 +- 33 files changed, 427 insertions(+), 302 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java index c97f3d8bb4..92881210e7 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java @@ -18,24 +18,26 @@ */ package org.apache.iotdb.confignode.client.async.handlers.heartbeat; -import org.apache.iotdb.confignode.manager.node.heartbeat.ConfigNodeHeartbeatCache; -import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample; +import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatSampleCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample; import org.apache.thrift.async.AsyncMethodCallback; public class ConfigNodeHeartbeatHandler implements AsyncMethodCallback<Long> { - // Update ConfigNodeHeartbeatCache when success - private final ConfigNodeHeartbeatCache configNodeHeartbeatCache; + private final int nodeId; + private final HeartbeatSampleCache cache; - public ConfigNodeHeartbeatHandler(ConfigNodeHeartbeatCache configNodeHeartbeatCache) { - this.configNodeHeartbeatCache = configNodeHeartbeatCache; + public ConfigNodeHeartbeatHandler(int nodeId, HeartbeatSampleCache cache) { + this.nodeId = nodeId; + this.cache = cache; } @Override public void onComplete(Long timestamp) { - configNodeHeartbeatCache.cacheHeartbeatSample( - new NodeHeartbeatSample(timestamp, System.currentTimeMillis())); + long receiveTime = System.currentTimeMillis(); + cache.cacheConfigNodeHeartbeatSample(nodeId, new NodeHeartbeatSample(timestamp, receiveTime)); } @Override diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java index fe1af63540..bea4455313 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java @@ -21,11 +21,11 @@ package org.apache.iotdb.confignode.client.async.handlers.heartbeat; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.commons.cluster.RegionStatus; -import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer; -import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache; -import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample; -import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache; -import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample; +import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatSampleCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample; +import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample; import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp; import org.apache.iotdb.tsfile.utils.Pair; @@ -35,27 +35,23 @@ import java.util.Map; public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatResp> { - // Update DataNodeHeartbeatCache when success - private final TDataNodeLocation dataNodeLocation; - private final DataNodeHeartbeatCache dataNodeHeartbeatCache; - private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap; - private final RouteBalancer routeBalancer; + private final int nodeId; + + private final HeartbeatSampleCache heartbeatSampleCache; + private final Map<Integer, Long> deviceNum; private final Map<Integer, Long> timeSeriesNum; private final Map<Integer, Long> regionDisk; public DataNodeHeartbeatHandler( - TDataNodeLocation dataNodeLocation, - DataNodeHeartbeatCache dataNodeHeartbeatCache, - Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap, - RouteBalancer routeBalancer, + int nodeId, + HeartbeatSampleCache heartbeatSampleCache, Map<Integer, Long> deviceNum, Map<Integer, Long> timeSeriesNum, Map<Integer, Long> regionDisk) { - this.dataNodeLocation = dataNodeLocation; - this.dataNodeHeartbeatCache = dataNodeHeartbeatCache; - this.regionGroupCacheMap = regionGroupCacheMap; - this.routeBalancer = routeBalancer; + + this.nodeId = nodeId; + this.heartbeatSampleCache = heartbeatSampleCache; this.deviceNum = deviceNum; this.timeSeriesNum = timeSeriesNum; this.regionDisk = regionDisk; @@ -66,31 +62,29 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR long receiveTime = System.currentTimeMillis(); // Update NodeCache - dataNodeHeartbeatCache.cacheHeartbeatSample( - new NodeHeartbeatSample(heartbeatResp, receiveTime)); + heartbeatSampleCache.cacheDataNodeHeartbeatSample(nodeId, new NodeHeartbeatSample(heartbeatResp, receiveTime)); - // Update RegionGroupCache And leaderCache heartbeatResp .getJudgedLeaders() .forEach( (regionGroupId, isLeader) -> { - regionGroupCacheMap - .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId)) - .cacheHeartbeatSample( - dataNodeLocation.getDataNodeId(), - new RegionHeartbeatSample( - heartbeatResp.getHeartbeatTimestamp(), - receiveTime, - // Region will inherit DataNode's status - RegionStatus.parse(heartbeatResp.getStatus()))); + // Update RegionGroupCache + heartbeatSampleCache.cacheRegionHeartbeatSample(regionGroupId, nodeId, + new RegionHeartbeatSample( + heartbeatResp.getHeartbeatTimestamp(), + receiveTime, + // Region will inherit DataNode's status + RegionStatus.parse(heartbeatResp.getStatus()))); if (isLeader) { - routeBalancer.cacheLeaderSample( + // Update leaderCache + heartbeatSampleCache.cacheLeaderSample( regionGroupId, new Pair<>( - heartbeatResp.getHeartbeatTimestamp(), dataNodeLocation.getDataNodeId())); + heartbeatResp.getHeartbeatTimestamp(), nodeId)); } }); + if (heartbeatResp.getDeviceNum() != null) { deviceNum.putAll(heartbeatResp.getDeviceNum()); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index 9e93061cf3..78282c4fa3 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -209,10 +209,10 @@ public class ConfigRegionStateMachine // Start leader scheduling services configManager.getProcedureManager().shiftExecutor(true); + configManager.getLoadManager().startHeartbeatService(); configManager.getLoadManager().startLoadStatisticsService(); configManager.getLoadManager().getRouteBalancer().startRouteBalancingService(); configManager.getRetryFailedTasksThread().startRetryFailedTasksService(); - configManager.getNodeManager().startHeartbeatService(); configManager.getPartitionManager().startRegionCleaner(); // we do cq recovery async for two reasons: @@ -230,10 +230,10 @@ public class ConfigRegionStateMachine // Stop leader scheduling services configManager.getProcedureManager().shiftExecutor(false); + configManager.getLoadManager().stopHeartbeatService(); configManager.getLoadManager().stopLoadStatisticsService(); configManager.getLoadManager().getRouteBalancer().stopRouteBalancingService(); configManager.getRetryFailedTasksThread().stopRetryFailedTasksService(); - configManager.getNodeManager().stopHeartbeatService(); configManager.getPartitionManager().stopRegionCleaner(); configManager.getCQManager().stopCQScheduler(); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 96ea55f134..cf038e6dba 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -88,7 +88,7 @@ import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.node.ClusterNodeStartUtils; import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.confignode.manager.node.NodeMetrics; -import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample; import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.manager.partition.PartitionMetrics; import org.apache.iotdb.confignode.manager.pipe.PipeManager; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 63a78d36c1..f9ac0230fc 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -41,7 +41,7 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan; import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; import org.apache.iotdb.confignode.manager.partition.PartitionManager; -import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache; import org.apache.iotdb.confignode.persistence.ProcedureInfo; import org.apache.iotdb.confignode.procedure.Procedure; import org.apache.iotdb.confignode.procedure.ProcedureExecutor; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java index 97666a005b..3dab05219d 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java @@ -29,7 +29,7 @@ import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.manager.node.NodeManager; -import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache; import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq; import org.apache.iotdb.rpc.TSStatusCode; @@ -55,6 +55,8 @@ import java.util.concurrent.TimeUnit; */ public class RetryFailedTasksThread { + // TODO: Replace this class by cluster events + private static final Logger LOGGER = LoggerFactory.getLogger(RetryFailedTasksThread.class); private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index a19c942d25..4b18d37a0a 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.confignode.manager.load; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; @@ -32,23 +33,24 @@ import org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.confignode.client.DataNodeRequestType; import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool; import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; -import org.apache.iotdb.confignode.conf.ConfigNodeConfig; -import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException; import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException; import org.apache.iotdb.confignode.manager.IManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer; import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer; import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap; +import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatService; +import org.apache.iotdb.confignode.manager.load.statistics.StatisticsService; import org.apache.iotdb.confignode.manager.node.NodeManager; -import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics; +import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics; import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent; import org.apache.iotdb.confignode.manager.partition.PartitionManager; -import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupStatistics; -import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics; +import org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics; +import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; import org.apache.iotdb.tsfile.utils.Pair; @@ -76,9 +78,6 @@ public class LoadManager { private static final Logger LOGGER = LoggerFactory.getLogger(LoadManager.class); - private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); - private static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs(); - private final IManager configManager; /** Balancers */ @@ -87,12 +86,16 @@ public class LoadManager { private final PartitionBalancer partitionBalancer; private final RouteBalancer routeBalancer; + /** Cluster load services */ + private final HeartbeatService heartbeatService; + private final StatisticsService statisticsService; + + /** Load statistics executor service */ + private final Object statisticsScheduleMonitor = new Object(); private Future<?> currentLoadStatisticsFuture; - private final ScheduledExecutorService loadStatisticsExecutor = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-LoadStatistics-Service"); - private final Object scheduleMonitor = new Object(); private final EventBus eventBus = new AsyncEventBus("LoadManager-EventBus", Executors.newFixedThreadPool(5)); @@ -104,6 +107,9 @@ public class LoadManager { this.partitionBalancer = new PartitionBalancer(configManager); this.routeBalancer = new RouteBalancer(configManager); + this.heartbeatService = new HeartbeatService(configManager); + this.statisticsService = new StatisticsService(configManager); + eventBus.register(configManager.getClusterSchemaManager()); eventBus.register(configManager.getSyncManager()); } @@ -118,8 +124,8 @@ public class LoadManager { * @throws DatabaseNotExistsException If some specific StorageGroups don't exist */ public CreateRegionGroupsPlan allocateRegionGroups( - Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType) - throws NotEnoughDataNodeException, DatabaseNotExistsException { + Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType) + throws NotEnoughDataNodeException, DatabaseNotExistsException { return regionBalancer.genRegionGroupsAllocationPlan(allotmentMap, consensusGroupType); } @@ -130,8 +136,8 @@ public class LoadManager { * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result */ public Map<String, SchemaPartitionTable> allocateSchemaPartition( - Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap) - throws NoAvailableRegionGroupException { + Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap) + throws NoAvailableRegionGroupException { return partitionBalancer.allocateSchemaPartition(unassignedSchemaPartitionSlotsMap); } @@ -142,8 +148,8 @@ public class LoadManager { * @return Map<StorageGroupName, DataPartitionTable>, the allocating result */ public Map<String, DataPartitionTable> allocateDataPartition( - Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap) - throws NoAvailableRegionGroupException { + Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap) + throws NoAvailableRegionGroupException { return partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap); } @@ -185,7 +191,7 @@ public class LoadManager { /** Start the load statistics service */ public void startLoadStatisticsService() { - synchronized (scheduleMonitor) { + synchronized (heartbeatScheduleMonitor) { if (currentLoadStatisticsFuture == null) { currentLoadStatisticsFuture = ScheduledExecutorUtil.safelyScheduleWithFixedDelay( @@ -201,7 +207,7 @@ public class LoadManager { /** Stop the load statistics service */ public void stopLoadStatisticsService() { - synchronized (scheduleMonitor) { + synchronized (heartbeatScheduleMonitor) { if (currentLoadStatisticsFuture != null) { currentLoadStatisticsFuture.cancel(false); currentLoadStatisticsFuture = null; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java index 31f28397fb..025d29e6fe 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java @@ -93,12 +93,6 @@ public class RouteBalancer { private final IManager configManager; - // Key: RegionGroupId - // Value: Pair<Timestamp, LeaderDataNodeId>, where - // the left value stands for sampling timestamp - // and the right value stands for the index of DataNode that leader resides. - private final Map<TConsensusGroupId, Pair<Long, Integer>> leaderCache; - /** RegionRouteMap */ private final RegionRouteMap regionRouteMap; // For generating optimal RegionLeaderMap @@ -107,6 +101,7 @@ public class RouteBalancer { private final IPriorityBalancer priorityRouter; /** Leader Balancing service */ + // TODO: leader balancing should be triggered by cluster events private Future<?> currentLeaderBalancingFuture; private final ScheduledExecutorService leaderBalancingExecutor = @@ -115,8 +110,6 @@ public class RouteBalancer { public RouteBalancer(IManager configManager) { this.configManager = configManager; - - this.leaderCache = new ConcurrentHashMap<>(); this.regionRouteMap = new RegionRouteMap(); switch (CONF.getLeaderDistributionPolicy()) { @@ -140,27 +133,6 @@ public class RouteBalancer { } } - /** - * Cache the newest leaderHeartbeatSample - * - * @param regionGroupId Corresponding RegionGroup's index - * @param leaderSample <Sample timestamp, leaderDataNodeId>, The newest HeartbeatSample - */ - public void cacheLeaderSample(TConsensusGroupId regionGroupId, Pair<Long, Integer> leaderSample) { - if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) - && IS_DATA_REGION_IOT_CONSENSUS) { - // The leadership of IoTConsensus protocol is decided by ConfigNode-leader - return; - } - - leaderCache.putIfAbsent(regionGroupId, leaderSample); - synchronized (leaderCache.get(regionGroupId)) { - if (leaderCache.get(regionGroupId).getLeft() < leaderSample.getLeft()) { - leaderCache.replace(regionGroupId, leaderSample); - } - } - } - /** * Invoking periodically to update the RegionRouteMap * diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatSampleCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatSampleCache.java new file mode 100644 index 0000000000..4fcb27d00d --- /dev/null +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatSampleCache.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.load.heartbeat; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample; +import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp; +import org.apache.iotdb.tsfile.utils.Pair; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** Maintain all kinds of heartbeat samples */ +public class HeartbeatSampleCache { + + private static final boolean IS_DATA_REGION_IOT_CONSENSUS = + ConsensusFactory.IOT_CONSENSUS.equals(ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass()); + + // Map<NodeId, INodeCache> + private final Map<Integer, BaseNodeCache> nodeCacheMap; + // Map<RegionGroupId, RegionGroupCache> + private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap; + // Key: RegionGroupId + // Value: Pair<Timestamp, LeaderDataNodeId>, where + // the left value stands for sampling timestamp + // and the right value stands for the index of DataNode that leader resides. + private final Map<TConsensusGroupId, Pair<Long, Integer>> leaderCache; + + public HeartbeatSampleCache() { + this.nodeCacheMap = new ConcurrentHashMap<>(); + this.regionGroupCacheMap = new ConcurrentHashMap<>(); + this.leaderCache = new ConcurrentHashMap<>(); + } + + /** + * Cache the latest heartbeat sample of a ConfigNode. + * + * @param nodeId the id of the ConfigNode + * @param sample the latest heartbeat sample + */ + public void cacheConfigNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) { + nodeCacheMap.computeIfAbsent(nodeId, + empty -> new ConfigNodeHeartbeatCache(nodeId)) + .cacheHeartbeatSample(sample); + } + + /** + * Cache the latest heartbeat sample of a DataNode. + * + * @param nodeId the id of the DataNode + * @param sample the latest heartbeat sample + */ + public void cacheDataNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) { + nodeCacheMap.computeIfAbsent(nodeId, + empty -> new DataNodeHeartbeatCache()) + .cacheHeartbeatSample(sample); + } + + /** + * Cache the latest heartbeat sample of a RegionGroup. + * + * @param regionGroupId the id of the RegionGroup + * @param nodeId the id of the DataNode where specified Region resides + * @param sample the latest heartbeat sample + */ + public void cacheRegionHeartbeatSample(TConsensusGroupId regionGroupId, int nodeId, RegionHeartbeatSample sample) { + regionGroupCacheMap.computeIfAbsent(regionGroupId, + empty -> new RegionGroupCache(regionGroupId)) + .cacheHeartbeatSample(nodeId, sample); + } + + /** + * Cache the latest leader of a RegionGroup. + * + * @param regionGroupId the id of the RegionGroup + * @param leaderSample the latest leader of a RegionGroup + */ + public void cacheLeaderSample(TConsensusGroupId regionGroupId, Pair<Long, Integer> leaderSample) { + if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) + && IS_DATA_REGION_IOT_CONSENSUS) { + // The leadership of IoTConsensus protocol is decided by ConfigNode-leader + return; + } + + leaderCache.putIfAbsent(regionGroupId, leaderSample); + synchronized (leaderCache.get(regionGroupId)) { + if (leaderCache.get(regionGroupId).getLeft() < leaderSample.getLeft()) { + leaderCache.replace(regionGroupId, leaderSample); + } + } + } + + public void clear() { + nodeCacheMap.clear(); + regionGroupCacheMap.clear(); + leaderCache.clear(); + } +} diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java new file mode 100644 index 0000000000..056ecfd76c --- /dev/null +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.load.heartbeat; + +import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool; +import org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool; +import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler; +import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.confignode.manager.IManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache; +import org.apache.iotdb.confignode.manager.node.NodeManager; +import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** Maintain the Cluster-Heartbeat-Service */ +public class HeartbeatService { + + private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatService.class); + + private static final long HEARTBEAT_INTERVAL = ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(); + + private final IManager configManager; + + /** Heartbeat executor service */ + // Monitor for leadership change + private final Object heartbeatScheduleMonitor = new Object(); + private Future<?> currentHeartbeatFuture; + private final ScheduledExecutorService heartBeatExecutor = + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-Heartbeat-Service"); + private final AtomicInteger heartbeatCounter = new AtomicInteger(0); + + private final HeartbeatSampleCache heartbeatSampleCache; + + public HeartbeatService(IManager configManager) { + this.configManager = configManager; + this.heartbeatSampleCache = new HeartbeatSampleCache(); + } + + /** Start the heartbeat service */ + public void startHeartbeatService() { + synchronized (heartbeatScheduleMonitor) { + if (currentHeartbeatFuture == null) { + currentHeartbeatFuture = + ScheduledExecutorUtil.safelyScheduleWithFixedDelay( + heartBeatExecutor, + this::heartbeatLoopBody, + 0, + HEARTBEAT_INTERVAL, + TimeUnit.MILLISECONDS); + LOGGER.info("Heartbeat service is started successfully."); + } + } + } + + /** Stop the heartbeat service */ + public void stopHeartbeatService() { + synchronized (heartbeatScheduleMonitor) { + if (currentHeartbeatFuture != null) { + currentHeartbeatFuture.cancel(false); + currentHeartbeatFuture = null; + heartbeatSampleCache.clear(); + LOGGER.info("Heartbeat service is stopped successfully."); + } + } + } + + private THeartbeatReq genHeartbeatReq() { + /* Generate heartbeat request */ + THeartbeatReq heartbeatReq = new THeartbeatReq(); + heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis()); + // Always sample RegionGroups' leadership as the Region heartbeat + heartbeatReq.setNeedJudgeLeader(true); + // We sample DataNode's load in every 10 heartbeat loop + heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0); + + /* Update heartbeat counter */ + heartbeatCounter.getAndUpdate((x) -> (x + 1) % 10); + if (!configManager.getClusterQuotaManager().hasSpaceQuotaLimit()) { + heartbeatReq.setSchemaRegionIds(configManager.getClusterQuotaManager().getSchemaRegionIds()); + heartbeatReq.setDataRegionIds(configManager.getClusterQuotaManager().getDataRegionIds()); + heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage()); + } + return heartbeatReq; + } + + /** loop body of the heartbeat thread */ + private void heartbeatLoopBody() { + // The consensusManager of configManager may not be fully initialized at this time + Optional.ofNullable(getConsensusManager()) + .ifPresent( + consensusManager -> { + if (getConsensusManager().isLeader()) { + // Generate HeartbeatReq + THeartbeatReq heartbeatReq = genHeartbeatReq(); + // Send heartbeat requests to all the registered ConfigNodes + pingRegisteredConfigNodes(heartbeatReq, getNodeManager().getRegisteredConfigNodes()); + // Send heartbeat requests to all the registered DataNodes + pingRegisteredDataNodes(heartbeatReq, getNodeManager().getRegisteredDataNodes()); + } + }); + } + + /** + * Send heartbeat requests to all the Registered ConfigNodes + * + * @param registeredConfigNodes ConfigNodes that registered in cluster + */ + private void pingRegisteredConfigNodes( + THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) { + // Send heartbeat requests + for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) { + if (configNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) { + // Skip itself + continue; + } + + ConfigNodeHeartbeatHandler handler = + new ConfigNodeHeartbeatHandler(configNodeLocation.getConfigNodeId(), heartbeatSampleCache); + AsyncConfigNodeHeartbeatClientPool.getInstance() + .getConfigNodeHeartBeat( + configNodeLocation.getInternalEndPoint(), + heartbeatReq.getHeartbeatTimestamp(), + handler); + } + } + + /** + * Send heartbeat requests to all the Registered DataNodes + * + * @param registeredDataNodes DataNodes that registered in cluster + */ + private void pingRegisteredDataNodes( + THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) { + // Send heartbeat requests + for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) { + DataNodeHeartbeatHandler handler = + new DataNodeHeartbeatHandler( + dataNodeInfo.getLocation().getDataNodeId(), + heartbeatSampleCache, + configManager.getClusterQuotaManager().getDeviceNum(), + configManager.getClusterQuotaManager().getTimeSeriesNum(), + configManager.getClusterQuotaManager().getRegionDisk()); + configManager.getClusterQuotaManager().updateSpaceQuotaUsage(); + AsyncDataNodeHeartbeatClientPool.getInstance() + .getDataNodeHeartBeat( + dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler); + } + } + + private ConsensusManager getConsensusManager() { + return configManager.getConsensusManager(); + } + + private NodeManager getNodeManager() { + return configManager.getNodeManager(); + } + +} diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java similarity index 97% rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java index 24f9a9edb5..392ae24b6e 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java @@ -16,9 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.confignode.manager.node.heartbeat; +package org.apache.iotdb.confignode.manager.load.heartbeat.node; import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics; import java.util.LinkedList; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java similarity index 95% rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java index 17fc9b1eb2..afd9d3e197 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java @@ -16,10 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.confignode.manager.node.heartbeat; +package org.apache.iotdb.confignode.manager.load.heartbeat.node; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics; public class ConfigNodeHeartbeatCache extends BaseNodeCache { diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java similarity index 95% rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java index 5754d27320..cb20836120 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java @@ -16,9 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.confignode.manager.node.heartbeat; +package org.apache.iotdb.confignode.manager.load.heartbeat.node; import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics; import org.apache.iotdb.mpp.rpc.thrift.TLoadSample; /** DataNodeHeartbeatCache caches and maintains all the heartbeat data */ diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/NodeHeartbeatSample.java similarity index 97% rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/NodeHeartbeatSample.java index dceff727bc..b3857c11f8 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/NodeHeartbeatSample.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.confignode.manager.node.heartbeat; +package org.apache.iotdb.confignode.manager.load.heartbeat.node; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionCache.java similarity index 86% rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionCache.java rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionCache.java index 706b40e3e8..6aff8394e9 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionCache.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionCache.java @@ -16,16 +16,17 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.confignode.manager.partition.heartbeat; +package org.apache.iotdb.confignode.manager.load.heartbeat.region; import org.apache.iotdb.commons.cluster.RegionStatus; +import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics; import java.util.Collections; import java.util.LinkedList; import java.util.List; -import static org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache.HEARTBEAT_TIMEOUT_TIME; -import static org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache.MAXIMUM_WINDOW_SIZE; +import static org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache.HEARTBEAT_TIMEOUT_TIME; +import static org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache.MAXIMUM_WINDOW_SIZE; public class RegionCache { diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionGroupCache.java similarity index 96% rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionGroupCache.java index 7e490670a9..fb5cf7766f 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionGroupCache.java @@ -16,11 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.confignode.manager.partition.heartbeat; +package org.apache.iotdb.confignode.manager.load.heartbeat.region; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.commons.cluster.RegionStatus; import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus; +import org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics; +import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics; import java.util.HashMap; import java.util.Map; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionHeartbeatSample.java similarity index 95% copy from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java copy to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionHeartbeatSample.java index 8de58a5e93..88e94e4be2 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionHeartbeatSample.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.confignode.manager.partition.heartbeat; +package org.apache.iotdb.confignode.manager.load.heartbeat.region; import org.apache.iotdb.commons.cluster.RegionStatus; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/NodeStatistics.java similarity index 96% rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeStatistics.java rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/NodeStatistics.java index 627ac00a33..a0a8645209 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeStatistics.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/NodeStatistics.java @@ -16,9 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.confignode.manager.node.heartbeat; +package org.apache.iotdb.confignode.manager.load.statistics; import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample; import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionGroupStatistics.java similarity index 98% rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupStatistics.java rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionGroupStatistics.java index d36175bc67..d22ea0e4d6 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupStatistics.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionGroupStatistics.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.confignode.manager.partition.heartbeat; +package org.apache.iotdb.confignode.manager.load.statistics; import org.apache.iotdb.commons.cluster.RegionStatus; import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionStatistics.java similarity index 94% rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionStatistics.java rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionStatistics.java index d30bf43d96..87bdf39f91 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionStatistics.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionStatistics.java @@ -16,9 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.confignode.manager.partition.heartbeat; +package org.apache.iotdb.confignode.manager.load.statistics; import org.apache.iotdb.commons.cluster.RegionStatus; +import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.IOException; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/StatisticsService.java similarity index 53% rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/StatisticsService.java index 8de58a5e93..79b8ad1869 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/StatisticsService.java @@ -16,34 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.confignode.manager.partition.heartbeat; -import org.apache.iotdb.commons.cluster.RegionStatus; +package org.apache.iotdb.confignode.manager.load.statistics; -public class RegionHeartbeatSample { +public class StatisticsService { - // Unit: ms - private final long sendTimestamp; - private final long receiveTimestamp; - private final RegionStatus status; - - // TODO: Add load sample - - public RegionHeartbeatSample(long sendTimestamp, long receiveTimestamp, RegionStatus status) { - this.sendTimestamp = sendTimestamp; - this.receiveTimestamp = receiveTimestamp; - this.status = status; - } - - public long getSendTimestamp() { - return sendTimestamp; - } - - public long getReceiveTimestamp() { - return receiveTimestamp; - } - - public RegionStatus getStatus() { - return status; - } + } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index 97125db05a..62ce597f2f 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -27,18 +27,12 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.cluster.RegionRoleType; -import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; -import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.confignode.client.DataNodeRequestType; -import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool; import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool; -import org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool; import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; -import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler; -import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler; import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; @@ -60,9 +54,9 @@ import org.apache.iotdb.confignode.manager.TriggerManager; import org.apache.iotdb.confignode.manager.UDFManager; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.load.LoadManager; -import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache; -import org.apache.iotdb.confignode.manager.node.heartbeat.ConfigNodeHeartbeatCache; -import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache; import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.manager.partition.PartitionMetrics; import org.apache.iotdb.confignode.manager.pipe.PipeManager; @@ -81,7 +75,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; -import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -98,9 +91,6 @@ import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; @@ -119,23 +109,12 @@ public class NodeManager { private final ReentrantLock removeConfigNodeLock; - /** Heartbeat executor service */ - // Monitor for leadership change - private final Object scheduleMonitor = new Object(); - // Map<NodeId, INodeCache> - private final Map<Integer, BaseNodeCache> nodeCacheMap; - private final AtomicInteger heartbeatCounter = new AtomicInteger(0); - private Future<?> currentHeartbeatFuture; - private final ScheduledExecutorService heartBeatExecutor = - IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-Heartbeat-Service"); - private final Random random; public NodeManager(IManager configManager, NodeInfo nodeInfo) { this.configManager = configManager; this.nodeInfo = nodeInfo; this.removeConfigNodeLock = new ReentrantLock(); - this.nodeCacheMap = new ConcurrentHashMap<>(); this.random = new Random(System.currentTimeMillis()); } @@ -687,125 +666,6 @@ public class NodeManager { } } - /** Start the heartbeat service */ - public void startHeartbeatService() { - synchronized (scheduleMonitor) { - if (currentHeartbeatFuture == null) { - currentHeartbeatFuture = - ScheduledExecutorUtil.safelyScheduleWithFixedDelay( - heartBeatExecutor, - this::heartbeatLoopBody, - 0, - HEARTBEAT_INTERVAL, - TimeUnit.MILLISECONDS); - LOGGER.info("Heartbeat service is started successfully."); - } - } - } - - /** loop body of the heartbeat thread */ - private void heartbeatLoopBody() { - // The consensusManager of configManager may not be fully initialized at this time - Optional.ofNullable(getConsensusManager()) - .ifPresent( - consensusManager -> { - if (getConsensusManager().isLeader()) { - // Generate HeartbeatReq - THeartbeatReq heartbeatReq = genHeartbeatReq(); - // Send heartbeat requests to all the registered DataNodes - pingRegisteredDataNodes(heartbeatReq, getRegisteredDataNodes()); - // Send heartbeat requests to all the registered ConfigNodes - pingRegisteredConfigNodes(heartbeatReq, getRegisteredConfigNodes()); - } - }); - } - - private THeartbeatReq genHeartbeatReq() { - /* Generate heartbeat request */ - THeartbeatReq heartbeatReq = new THeartbeatReq(); - heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis()); - // Always sample RegionGroups' leadership as the Region heartbeat - heartbeatReq.setNeedJudgeLeader(true); - // We sample DataNode's load in every 10 heartbeat loop - heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0); - - /* Update heartbeat counter */ - heartbeatCounter.getAndUpdate((x) -> (x + 1) % 10); - if (!getClusterQuotaManager().hasSpaceQuotaLimit()) { - heartbeatReq.setSchemaRegionIds(getClusterQuotaManager().getSchemaRegionIds()); - heartbeatReq.setDataRegionIds(getClusterQuotaManager().getDataRegionIds()); - heartbeatReq.setSpaceQuotaUsage(getClusterQuotaManager().getSpaceQuotaUsage()); - } - return heartbeatReq; - } - - /** - * Send heartbeat requests to all the Registered DataNodes - * - * @param registeredDataNodes DataNodes that registered in cluster - */ - private void pingRegisteredDataNodes( - THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) { - // Send heartbeat requests - for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) { - DataNodeHeartbeatHandler handler = - new DataNodeHeartbeatHandler( - dataNodeInfo.getLocation(), - (DataNodeHeartbeatCache) - nodeCacheMap.computeIfAbsent( - dataNodeInfo.getLocation().getDataNodeId(), - empty -> new DataNodeHeartbeatCache()), - getPartitionManager().getRegionGroupCacheMap(), - getLoadManager().getRouteBalancer(), - getClusterQuotaManager().getDeviceNum(), - getClusterQuotaManager().getTimeSeriesNum(), - getClusterQuotaManager().getRegionDisk()); - getClusterQuotaManager().updateSpaceQuotaUsage(); - AsyncDataNodeHeartbeatClientPool.getInstance() - .getDataNodeHeartBeat( - dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler); - } - } - - /** - * Send heartbeat requests to all the Registered ConfigNodes - * - * @param registeredConfigNodes ConfigNodes that registered in cluster - */ - private void pingRegisteredConfigNodes( - THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) { - // Send heartbeat requests - for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) { - if (configNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) { - // Skip itself - continue; - } - - ConfigNodeHeartbeatHandler handler = - new ConfigNodeHeartbeatHandler( - (ConfigNodeHeartbeatCache) - nodeCacheMap.computeIfAbsent( - configNodeLocation.getConfigNodeId(), - empty -> new ConfigNodeHeartbeatCache(configNodeLocation.getConfigNodeId()))); - AsyncConfigNodeHeartbeatClientPool.getInstance() - .getConfigNodeHeartBeat( - configNodeLocation.getInternalEndPoint(), - heartbeatReq.getHeartbeatTimestamp(), - handler); - } - } - - /** Stop the heartbeat service */ - public void stopHeartbeatService() { - synchronized (scheduleMonitor) { - if (currentHeartbeatFuture != null) { - currentHeartbeatFuture.cancel(false); - currentHeartbeatFuture = null; - nodeCacheMap.clear(); - LOGGER.info("Heartbeat service is stopped successfully."); - } - } - } public Map<Integer, BaseNodeCache> getNodeCacheMap() { return nodeCacheMap; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java index a38adafd74..4c44fb07c1 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java @@ -18,7 +18,7 @@ */ package org.apache.iotdb.confignode.manager.observer; -import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics; +import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics; import org.apache.iotdb.tsfile.utils.Pair; import java.util.Map; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 223bee15e4..a754ccf396 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -68,7 +68,7 @@ import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.ProcedureManager; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.load.LoadManager; -import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache; import org.apache.iotdb.confignode.persistence.partition.PartitionInfo; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask; @@ -128,15 +128,11 @@ public class PartitionManager { private final ScheduledExecutorService regionMaintainer; private Future<?> currentRegionMaintainerFuture; - // Map<RegionId, RegionGroupCache> - private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap; - public PartitionManager(IManager configManager, PartitionInfo partitionInfo) { this.configManager = configManager; this.partitionInfo = partitionInfo; this.regionMaintainer = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("IoTDB-Region-Maintainer"); - this.regionGroupCacheMap = new ConcurrentHashMap<>(); setSeriesPartitionExecutor(); } @@ -1070,10 +1066,6 @@ public class PartitionManager { } } - public Map<TConsensusGroupId, RegionGroupCache> getRegionGroupCacheMap() { - return regionGroupCacheMap; - } - public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) { regionGroupCacheMap.remove(consensusGroupId); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java index efa53d330d..5faefbcd4f 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java @@ -49,10 +49,10 @@ import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.node.NodeManager; -import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample; import org.apache.iotdb.confignode.manager.partition.PartitionManager; -import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache; -import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample; +import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample; import org.apache.iotdb.confignode.persistence.node.NodeInfo; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.scheduler.LockQueue; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java index afa2d3c543..bf1ce7e940 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java @@ -34,7 +34,7 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan; import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp; import org.apache.iotdb.confignode.manager.ConfigManager; -import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache; import org.apache.iotdb.confignode.manager.partition.PartitionMetrics; import org.apache.iotdb.confignode.persistence.node.NodeInfo; import org.apache.iotdb.confignode.procedure.scheduler.LockQueue; diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java index cb617bf027..6da0304f44 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java @@ -24,9 +24,9 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.cluster.NodeStatus; -import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache; -import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache; -import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample; import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp; import org.junit.Assert; diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java index a028219b08..f1bf444427 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java @@ -24,9 +24,9 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.cluster.NodeStatus; -import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache; -import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache; -import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample; import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp; import org.junit.Assert; diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java index 4d93b6355b..b51cf10bcc 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java @@ -19,8 +19,8 @@ package org.apache.iotdb.confignode.manager.node; import org.apache.iotdb.commons.cluster.NodeStatus; -import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache; -import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample; import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp; import org.junit.Assert; diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java index 3c3cf4881b..da91dc82dc 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java @@ -21,8 +21,8 @@ package org.apache.iotdb.confignode.manager.partition; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.commons.cluster.RegionStatus; -import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache; -import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample; +import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache; +import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample; import org.junit.Assert; import org.junit.Test; diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java index 4181fa9d67..766a5ed48f 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java @@ -19,7 +19,7 @@ package org.apache.iotdb.confignode.persistence.node; import org.apache.iotdb.commons.cluster.NodeStatus; -import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics; +import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.junit.Assert; diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java index cce3ed250a..b0b97b2b0c 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java @@ -20,8 +20,8 @@ package org.apache.iotdb.confignode.persistence.partition.statistics; import org.apache.iotdb.commons.cluster.RegionStatus; import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus; -import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupStatistics; -import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics; +import org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics; +import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.junit.Assert; diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java index 8ccf87c7c2..4ad69092ef 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java @@ -19,7 +19,7 @@ package org.apache.iotdb.confignode.persistence.partition.statistics; import org.apache.iotdb.commons.cluster.RegionStatus; -import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics; +import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.junit.Assert;
