This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch Move-heartbeat-thread-and-statistics-thread-to-LoadManager in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 94bed63697747879440b91679454da330f4c8dd0 Author: YongzaoDan <[email protected]> AuthorDate: Thu Apr 13 22:05:14 2023 +0800 ready for test --- .../iotdb/confignode/manager/ConfigManager.java | 25 +- .../iotdb/confignode/manager/ProcedureManager.java | 79 +++-- .../confignode/manager/RetryFailedTasksThread.java | 21 +- .../iotdb/confignode/manager/load/LoadCache.java | 357 ++++++++++++++++++--- .../iotdb/confignode/manager/load/LoadManager.java | 183 +++++++++++ .../manager/load/balancer/RegionBalancer.java | 7 +- .../manager/load/balancer/RouteBalancer.java | 7 +- .../manager/load/heartbeat/HeartbeatService.java | 1 + .../manager/load/heartbeat/node/BaseNodeCache.java | 9 +- .../heartbeat/node/ConfigNodeHeartbeatCache.java | 10 +- .../heartbeat/node/DataNodeHeartbeatCache.java | 4 +- .../iotdb/confignode/manager/node/NodeManager.java | 149 +-------- .../manager/partition/PartitionManager.java | 146 +++------ .../manager/partition/PartitionMetrics.java | 18 +- .../manager/partition/RegionGroupStatus.java | 25 +- .../confignode/persistence/node/NodeInfo.java | 47 ++- .../partition/DatabasePartitionTable.java | 36 ++- .../persistence/partition/PartitionInfo.java | 50 ++- .../persistence/partition/RegionGroup.java | 13 +- .../procedure/env/ConfigNodeProcedureEnv.java | 37 +-- .../procedure/env/DataNodeRemoveHandler.java | 15 +- .../impl/schema/DeleteDatabaseProcedure.java | 2 +- .../impl/statemachine/RegionMigrateProcedure.java | 3 + .../router/priority/GreedyPriorityTest.java | 2 +- .../priority/LeaderPriorityBalancerTest.java | 2 +- .../confignode/manager/node/NodeCacheTest.java | 4 +- 26 files changed, 835 insertions(+), 417 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index dab5101d1c..4a3d65565e 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -398,10 +398,11 @@ public class ConfigManager implements IManager { TSStatus status = confirmLeader(); if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // Force updating the target DataNode's status to Unknown - getNodeManager() - .getNodeCacheMap() - .get(dataNodeLocation.getDataNodeId()) - .forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown)); + getLoadManager() + .forceUpdateNodeCache( + NodeType.DataNode, + dataNodeLocation.getDataNodeId(), + NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown)); LOGGER.info( "[ShutdownHook] The DataNode-{} will be shutdown soon, mark it as Unknown", dataNodeLocation.getDataNodeId()); @@ -442,12 +443,7 @@ public class ConfigManager implements IManager { .map(TDataNodeConfiguration::getLocation) .sorted(Comparator.comparingInt(TDataNodeLocation::getDataNodeId)) .collect(Collectors.toList()); - Map<Integer, String> nodeStatus = new HashMap<>(); - getNodeManager() - .getNodeCacheMap() - .forEach( - (nodeId, heartbeatCache) -> - nodeStatus.put(nodeId, heartbeatCache.getNodeStatusWithReason())); + Map<Integer, String> nodeStatus = getLoadManager().getNodeStatusWithReason(); return new TShowClusterResp(status, configNodeLocations, dataNodeInfoLocations, nodeStatus); } else { return new TShowClusterResp(status, new ArrayList<>(), new ArrayList<>(), new HashMap<>()); @@ -1146,10 +1142,11 @@ public class ConfigManager implements IManager { TSStatus status = confirmLeader(); if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // Force updating the target ConfigNode's status to Unknown - getNodeManager() - .getNodeCacheMap() - .get(configNodeLocation.getConfigNodeId()) - .forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown)); + getLoadManager() + .forceUpdateNodeCache( + NodeType.ConfigNode, + configNodeLocation.getConfigNodeId(), + 
NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown)); LOGGER.info( "[ShutdownHook] The ConfigNode-{} will be shutdown soon, mark it as Unknown", configNodeLocation.getConfigNodeId()); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 5df812d4fb..5379ff8ee1 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -40,7 +40,6 @@ import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConf import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan; import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; -import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache; import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.persistence.ProcedureInfo; import org.apache.iotdb.confignode.procedure.Procedure; @@ -94,7 +93,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -347,13 +345,33 @@ public class ProcedureManager { } public TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) { - // TODO: Whether to guarantee the check high consistency, i.e, use consensus read to check - Map<TConsensusGroupId, RegionGroupCache> regionReplicaMap = - configManager.getPartitionManager().getRegionGroupCacheMap(); - Optional<TConsensusGroupId> regionId = - regionReplicaMap.keySet().stream() - .filter(id -> id.getId() == migrateRegionReq.getRegionId()) - .findAny(); + 
TConsensusGroupId regionGroupId; + if (configManager + .getPartitionManager() + .isRegionGroupExists( + new TConsensusGroupId( + TConsensusGroupType.SchemaRegion, migrateRegionReq.getRegionId()))) { + regionGroupId = + new TConsensusGroupId(TConsensusGroupType.SchemaRegion, migrateRegionReq.getRegionId()); + } else if (configManager + .getPartitionManager() + .isRegionGroupExists( + new TConsensusGroupId( + TConsensusGroupType.DataRegion, migrateRegionReq.getRegionId()))) { + regionGroupId = + new TConsensusGroupId(TConsensusGroupType.DataRegion, migrateRegionReq.getRegionId()); + } else { + LOGGER.warn( + "Submit RegionMigrateProcedure failed, because RegionGroup: {} doesn't exist", + migrateRegionReq.getRegionId()); + TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); + status.setMessage( + String.format( + "Submit RegionMigrateProcedure failed, because RegionGroup: %s doesn't exist", + migrateRegionReq.getRegionId())); + return status; + } + TDataNodeLocation originalDataNode = configManager .getNodeManager() @@ -364,18 +382,7 @@ public class ProcedureManager { .getNodeManager() .getRegisteredDataNode(migrateRegionReq.getToId()) .getLocation(); - if (!regionId.isPresent()) { - LOGGER.warn( - "Submit RegionMigrateProcedure failed, because no Region {}", - migrateRegionReq.getRegionId()); - TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); - status.setMessage( - "Submit RegionMigrateProcedure failed, because no region Group " - + migrateRegionReq.getRegionId()); - return status; - } - Set<Integer> dataNodesInRegion = - regionReplicaMap.get(regionId.get()).getStatistics().getRegionStatisticsMap().keySet(); + if (originalDataNode == null) { LOGGER.warn( "Submit RegionMigrateProcedure failed, because no original DataNode {}", @@ -394,7 +401,9 @@ public class ProcedureManager { "Submit RegionMigrateProcedure failed, because no target DataNode " + migrateRegionReq.getToId()); return status; - } else if 
(!dataNodesInRegion.contains(migrateRegionReq.getFromId())) { + } else if (configManager.getPartitionManager() + .getAllReplicaSets(originalDataNode.getDataNodeId()).stream() + .noneMatch(replicaSet -> replicaSet.getRegionId().equals(regionGroupId))) { LOGGER.warn( "Submit RegionMigrateProcedure failed, because the original DataNode {} doesn't contain Region {}", migrateRegionReq.getFromId(), @@ -406,7 +415,9 @@ public class ProcedureManager { + " doesn't contain Region " + migrateRegionReq.getRegionId()); return status; - } else if (dataNodesInRegion.contains(migrateRegionReq.getToId())) { + } else if (configManager.getPartitionManager().getAllReplicaSets(destDataNode.getDataNodeId()) + .stream() + .anyMatch(replicaSet -> replicaSet.getRegionId().equals(regionGroupId))) { LOGGER.warn( "Submit RegionMigrateProcedure failed, because the target DataNode {} already contains Region {}", migrateRegionReq.getToId(), @@ -426,10 +437,8 @@ public class ProcedureManager { .map(TDataNodeConfiguration::getLocation) .map(TDataNodeLocation::getDataNodeId) .collect(Collectors.toSet()); - if (configManager - .getNodeManager() - .getNodeStatusByNodeId(migrateRegionReq.getFromId()) - .equals(NodeStatus.Unknown)) { + if (NodeStatus.Unknown.equals( + configManager.getLoadManager().getNodeStatus(migrateRegionReq.getFromId()))) { LOGGER.warn( "Submit RegionMigrateProcedure failed, because the sourceDataNode {} is Unknown.", migrateRegionReq.getFromId()); @@ -440,18 +449,8 @@ public class ProcedureManager { + " is Unknown."); return status; } - dataNodesInRegion.retainAll(aliveDataNodes); - if (dataNodesInRegion.isEmpty()) { - LOGGER.warn( - "Submit RegionMigrateProcedure failed, because all of the DataNodes in Region Group {} is unavailable.", - migrateRegionReq.getRegionId()); - TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); - status.setMessage( - "Submit RegionMigrateProcedure failed, because all of the DataNodes in Region Group " - + 
migrateRegionReq.getRegionId() - + " are unavailable."); - return status; - } else if (!aliveDataNodes.contains(migrateRegionReq.getToId())) { + + if (!aliveDataNodes.contains(migrateRegionReq.getToId())) { LOGGER.warn( "Submit RegionMigrateProcedure failed, because the destDataNode {} is ReadOnly or Unknown.", migrateRegionReq.getToId()); @@ -463,7 +462,7 @@ public class ProcedureManager { return status; } this.executor.submitProcedure( - new RegionMigrateProcedure(regionId.get(), originalDataNode, destDataNode)); + new RegionMigrateProcedure(regionGroupId, originalDataNode, destDataNode)); LOGGER.info( "Submit RegionMigrateProcedure successfully, Region: {}, From: {}, To: {}", migrateRegionReq.getRegionId(), diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java index 865b8b59c2..287cda6eb1 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java @@ -28,7 +28,7 @@ import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool; import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; -import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache; +import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq; import org.apache.iotdb.rpc.TSStatusCode; @@ -63,6 +63,7 @@ public class RetryFailedTasksThread { private static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs(); private final IManager configManager; private final NodeManager nodeManager; + private final LoadManager loadManager; private 
final ScheduledExecutorService retryFailTasksExecutor = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-RetryFailedTasks-Service"); private final Object scheduleMonitor = new Object(); @@ -78,6 +79,7 @@ public class RetryFailedTasksThread { public RetryFailedTasksThread(IManager configManager) { this.configManager = configManager; this.nodeManager = configManager.getNodeManager(); + this.loadManager = configManager.getLoadManager(); this.oldUnknownNodes = new HashSet<>(); } @@ -131,15 +133,12 @@ public class RetryFailedTasksThread { .forEach( DataNodeConfiguration -> { TDataNodeLocation dataNodeLocation = DataNodeConfiguration.getLocation(); - BaseNodeCache newestNodeInformation = - nodeManager.getNodeCacheMap().get(dataNodeLocation.dataNodeId); - if (newestNodeInformation != null) { - if (newestNodeInformation.getNodeStatus() == NodeStatus.Running) { - oldUnknownNodes.remove(dataNodeLocation); - } else if (!oldUnknownNodes.contains(dataNodeLocation) - && newestNodeInformation.getNodeStatus() == NodeStatus.Unknown) { - newUnknownNodes.add(dataNodeLocation); - } + NodeStatus nodeStatus = loadManager.getNodeStatus(dataNodeLocation.getDataNodeId()); + if (nodeStatus == NodeStatus.Running) { + oldUnknownNodes.remove(dataNodeLocation); + } else if (!oldUnknownNodes.contains(dataNodeLocation) + && nodeStatus == NodeStatus.Unknown) { + newUnknownNodes.add(dataNodeLocation); } }); @@ -163,7 +162,7 @@ public class RetryFailedTasksThread { private void syncDetectTask() { for (Map.Entry<Integer, Queue<TOperatePipeOnDataNodeReq>> entry : messageMap.entrySet()) { int dataNodeId = entry.getKey(); - if (NodeStatus.Running.equals(nodeManager.getNodeStatusByNodeId(dataNodeId))) { + if (NodeStatus.Running.equals(loadManager.getNodeStatus(dataNodeId))) { final Map<Integer, TDataNodeLocation> dataNodeLocationMap = new HashMap<>(); dataNodeLocationMap.put( dataNodeId, nodeManager.getRegisteredDataNodeLocations().get(dataNodeId)); diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadCache.java index 03f002899c..0daa3846f5 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadCache.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadCache.java @@ -21,9 +21,12 @@ package org.apache.iotdb.confignode.manager.load; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; -import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.cluster.NodeType; +import org.apache.iotdb.commons.cluster.RegionStatus; import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache; import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache; @@ -33,20 +36,21 @@ import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCach import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample; import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics; import org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics; -import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus; import org.apache.iotdb.tsfile.utils.Pair; +import java.util.Arrays; +import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** Maintain all 
kinds of heartbeat samples */ public class LoadCache { - private static final boolean IS_DATA_REGION_IOT_CONSENSUS = - ConsensusFactory.IOT_CONSENSUS.equals( - ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass()); - // Map<NodeId, INodeCache> private final Map<Integer, BaseNodeCache> nodeCacheMap; // Map<RegionGroupId, RegionGroupCache> @@ -57,6 +61,58 @@ public class LoadCache { this.regionGroupCacheMap = new ConcurrentHashMap<>(); } + public void initHeartbeatCache(IManager configManager) { + initNodeHeartbeatCache( + configManager.getNodeManager().getRegisteredConfigNodes(), + configManager.getNodeManager().getRegisteredDataNodes()); + initRegionGroupHeartbeatCache(configManager.getPartitionManager().getAllReplicaSets()); + } + + /** Initialize the nodeCacheMap when the ConfigNode-Leader is switched */ + private void initNodeHeartbeatCache( + List<TConfigNodeLocation> registeredConfigNodes, + List<TDataNodeConfiguration> registeredDataNodes) { + + final int CURRENT_NODE_ID = ConfigNodeHeartbeatCache.CURRENT_NODE_ID; + nodeCacheMap.clear(); + + // Init ConfigNodeHeartbeatCache + registeredConfigNodes.forEach( + configNodeLocation -> { + int configNodeId = configNodeLocation.getConfigNodeId(); + if (configNodeId != CURRENT_NODE_ID) { + nodeCacheMap.put(configNodeId, new ConfigNodeHeartbeatCache(configNodeId)); + } + }); + // Force set itself and never update + nodeCacheMap.put( + ConfigNodeHeartbeatCache.CURRENT_NODE_ID, + new ConfigNodeHeartbeatCache( + CURRENT_NODE_ID, ConfigNodeHeartbeatCache.CURRENT_NODE_STATISTICS)); + + // Init DataNodeHeartbeatCache + registeredDataNodes.forEach( + dataNodeConfiguration -> { + int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId(); + nodeCacheMap.put(dataNodeId, new DataNodeHeartbeatCache(dataNodeId)); + }); + } + + /** Initialize the regionGroupCacheMap when the ConfigNode-Leader is switched. 
*/ + private void initRegionGroupHeartbeatCache(List<TRegionReplicaSet> regionReplicaSets) { + regionGroupCacheMap.clear(); + regionReplicaSets.forEach( + regionReplicaSet -> + regionGroupCacheMap.put( + regionReplicaSet.getRegionId(), + new RegionGroupCache(regionReplicaSet.getRegionId()))); + } + + public void clearHeartbeatCache() { + nodeCacheMap.clear(); + regionGroupCacheMap.clear(); + } + /** * Cache the latest heartbeat sample of a ConfigNode. * @@ -77,7 +133,7 @@ public class LoadCache { */ public void cacheDataNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) { nodeCacheMap - .computeIfAbsent(nodeId, empty -> new DataNodeHeartbeatCache()) + .computeIfAbsent(nodeId, empty -> new DataNodeHeartbeatCache(nodeId)) .cacheHeartbeatSample(sample); } @@ -133,54 +189,263 @@ public class LoadCache { return differentRegionGroupStatisticsMap; } - public void initHeartbeatCache(IManager configManager) { - initNodeHeartbeatCache( - configManager.getNodeManager().getRegisteredConfigNodes(), - configManager.getNodeManager().getRegisteredDataNodes()); - initRegionGroupHeartbeatCache(configManager.getPartitionManager().getAllReplicaSets()); + /** + * Safely get NodeStatus by NodeId + * + * @param nodeId The specified NodeId + * @return NodeStatus of the specified Node. Unknown if cache doesn't exist. + */ + public NodeStatus getNodeStatus(int nodeId) { + BaseNodeCache nodeCache = nodeCacheMap.get(nodeId); + return nodeCache == null ? 
NodeStatus.Unknown : nodeCache.getNodeStatus(); } - /** Initialize the nodeCacheMap when the ConfigNode-Leader is switched */ - private void initNodeHeartbeatCache( - List<TConfigNodeLocation> registeredConfigNodes, - List<TDataNodeConfiguration> registeredDataNodes) { - final int CURRENT_NODE_ID = ConfigNodeHeartbeatCache.CURRENT_NODE_ID; - nodeCacheMap.clear(); + /** + * Safely get the specified Node's current status with reason + * + * @param nodeId The specified NodeId + * @return The specified Node's current status if the nodeCache contains it, Unknown otherwise + */ + public String getNodeStatusWithReason(int nodeId) { + BaseNodeCache nodeCache = nodeCacheMap.get(nodeId); + return nodeCache == null + ? NodeStatus.Unknown.getStatus() + "(NoHeartbeat)" + : nodeCache.getNodeStatusWithReason(); + } - // Init ConfigNodeHeartbeatCache - registeredConfigNodes.forEach( - configNodeLocation -> { - if (configNodeLocation.getConfigNodeId() != CURRENT_NODE_ID) { - nodeCacheMap.put( - configNodeLocation.getConfigNodeId(), - new ConfigNodeHeartbeatCache(configNodeLocation.getConfigNodeId())); + /** + * Get all Node's current status with reason + * + * @return Map<NodeId, NodeStatus with reason> + */ + public Map<Integer, String> getNodeStatusWithReason() { + return nodeCacheMap.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getNodeStatusWithReason())); + } + + /** + * Filter ConfigNodes through the specified NodeStatus + * + * @param status The specified NodeStatus + * @return Filtered ConfigNodes with the specified NodeStatus + */ + public List<Integer> filterConfigNodeThroughStatus(NodeStatus... 
status) { + return nodeCacheMap.entrySet().stream() + .filter( + nodeCacheEntry -> + nodeCacheEntry.getValue() instanceof ConfigNodeHeartbeatCache + && Arrays.stream(status) + .anyMatch(s -> s.equals(nodeCacheEntry.getValue().getNodeStatus()))) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + + /** + * Filter DataNodes through the specified NodeStatus + * + * @param status The specified NodeStatus + * @return Filtered DataNodes with the specified NodeStatus + */ + public List<Integer> filterDataNodeThroughStatus(NodeStatus... status) { + return nodeCacheMap.entrySet().stream() + .filter( + nodeCacheEntry -> + nodeCacheEntry.getValue() instanceof DataNodeHeartbeatCache + && Arrays.stream(status) + .anyMatch(s -> s.equals(nodeCacheEntry.getValue().getNodeStatus()))) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + + /** + * Get the free disk space of the specified DataNode + * + * @param dataNodeId The index of the specified DataNode + * @return The free disk space that sample through heartbeat, 0 if no heartbeat received + */ + public double getFreeDiskSpace(int dataNodeId) { + DataNodeHeartbeatCache dataNodeHeartbeatCache = + (DataNodeHeartbeatCache) nodeCacheMap.get(dataNodeId); + return dataNodeHeartbeatCache == null ? 
0d : dataNodeHeartbeatCache.getFreeDiskSpace(); + } + + /** + * Get the loadScore of each DataNode + * + * @return Map<DataNodeId, loadScore> + */ + public Map<Integer, Long> getAllDataNodeLoadScores() { + Map<Integer, Long> result = new ConcurrentHashMap<>(); + nodeCacheMap.forEach( + (dataNodeId, heartbeatCache) -> { + if (heartbeatCache instanceof DataNodeHeartbeatCache) { + result.put(dataNodeId, heartbeatCache.getLoadScore()); } }); - // Force set itself and never update - nodeCacheMap.put( - ConfigNodeHeartbeatCache.CURRENT_NODE_ID, - new ConfigNodeHeartbeatCache( - CURRENT_NODE_ID, ConfigNodeHeartbeatCache.CURRENT_NODE_STATISTICS)); + return result; + } - // Init DataNodeHeartbeatCache - registeredDataNodes.forEach( - dataNodeConfiguration -> - nodeCacheMap.put( - dataNodeConfiguration.getLocation().getDataNodeId(), new DataNodeHeartbeatCache())); + /** + * Get the lowest loadScore DataNode + * + * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received. + */ + public int getLowestLoadDataNode() { + return nodeCacheMap.entrySet().stream() + .filter(nodeCacheEntry -> nodeCacheEntry.getValue() instanceof DataNodeHeartbeatCache) + .min(Comparator.comparingLong(nodeCacheEntry -> nodeCacheEntry.getValue().getLoadScore())) + .map(Map.Entry::getKey) + .orElse(-1); } - /** Initialize the regionGroupCacheMap when the ConfigNode-Leader is switched. */ - private void initRegionGroupHeartbeatCache(List<TRegionReplicaSet> regionReplicaSets) { - regionGroupCacheMap.clear(); - regionReplicaSets.forEach( - regionReplicaSet -> - regionGroupCacheMap.put( - regionReplicaSet.getRegionId(), - new RegionGroupCache(regionReplicaSet.getRegionId()))); + /** + * Get the lowest loadScore DataNode from the specified DataNodes + * + * @param dataNodeIds The specified DataNodes + * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received. 
+ */ + public int getLowestLoadDataNode(List<Integer> dataNodeIds) { + return dataNodeIds.stream() + .map(nodeCacheMap::get) + .filter(Objects::nonNull) + .min(Comparator.comparingLong(BaseNodeCache::getLoadScore)) + .map(BaseNodeCache::getNodeId) + .orElse(-1); } - public void clearHeartbeatCache() { - nodeCacheMap.clear(); - regionGroupCacheMap.clear(); + /** + * Force update the specified Node's cache + * + * @param nodeType Specified NodeType + * @param nodeId Specified NodeId + * @param heartbeatSample Specified NodeHeartbeatSample + */ + public void forceUpdateNodeCache( + NodeType nodeType, int nodeId, NodeHeartbeatSample heartbeatSample) { + switch (nodeType) { + case ConfigNode: + nodeCacheMap + .computeIfAbsent(nodeId, empty -> new ConfigNodeHeartbeatCache(nodeId)) + .forceUpdate(heartbeatSample); + break; + case DataNode: + nodeCacheMap + .computeIfAbsent(nodeId, empty -> new DataNodeHeartbeatCache(nodeId)) + .forceUpdate(heartbeatSample); + break; + } + } + + /** Remove the specified Node's cache */ + public void removeNodeCache(int nodeId) { + nodeCacheMap.remove(nodeId); + } + + /** + * Safely get RegionStatus. + * + * @param consensusGroupId Specified RegionGroupId + * @param dataNodeId Specified RegionReplicaId + * @return Corresponding RegionStatus if cache exists, Unknown otherwise + */ + public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int dataNodeId) { + return regionGroupCacheMap.containsKey(consensusGroupId) + ? regionGroupCacheMap.get(consensusGroupId).getStatistics().getRegionStatus(dataNodeId) + : RegionStatus.Unknown; + } + + /** + * Safely get RegionGroupStatus. + * + * @param consensusGroupId Specified RegionGroupId + * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise + */ + public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId consensusGroupId) { + return regionGroupCacheMap.containsKey(consensusGroupId) + ? 
regionGroupCacheMap.get(consensusGroupId).getStatistics().getRegionGroupStatus() + : RegionGroupStatus.Disabled; + } + + /** + * Safely get RegionGroupStatus. + * + * @param consensusGroupIds Specified RegionGroupIds + * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise + */ + public Map<TConsensusGroupId, RegionGroupStatus> getRegionGroupStatus( + List<TConsensusGroupId> consensusGroupIds) { + Map<TConsensusGroupId, RegionGroupStatus> regionGroupStatusMap = new ConcurrentHashMap<>(); + for (TConsensusGroupId consensusGroupId : consensusGroupIds) { + regionGroupStatusMap.put(consensusGroupId, getRegionGroupStatus(consensusGroupId)); + } + return regionGroupStatusMap; + } + + /** + * Filter the RegionGroups through the RegionGroupStatus + * + * @param status The specified RegionGroupStatus + * @return Filtered RegionGroups with the specified RegionGroupStatus + */ + public List<TConsensusGroupId> filterRegionGroupThroughStatus(RegionGroupStatus... status) { + return regionGroupCacheMap.entrySet().stream() + .filter( + regionGroupCacheEntry -> + Arrays.stream(status) + .anyMatch( + s -> + s.equals( + regionGroupCacheEntry + .getValue() + .getStatistics() + .getRegionGroupStatus()))) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + + /** + * Count the number of cluster Regions with specified RegionStatus + * + * @param type The specified RegionGroupType + * @param status The specified statues + * @return The number of cluster Regions with specified RegionStatus + */ + public int countRegionWithSpecifiedStatus(TConsensusGroupType type, RegionStatus... 
status) { + AtomicInteger result = new AtomicInteger(0); + regionGroupCacheMap.forEach( + (regionGroupId, regionGroupCache) -> { + if (type.equals(regionGroupId.getType())) { + regionGroupCache + .getStatistics() + .getRegionStatisticsMap() + .values() + .forEach( + regionStatistics -> { + if (Arrays.stream(status) + .anyMatch(s -> s.equals(regionStatistics.getRegionStatus()))) { + result.getAndIncrement(); + } + }); + } + }); + return result.get(); + } + + /** + * Force update the specified RegionGroup's cache + * + * @param regionGroupId Specified RegionGroupId + * @param heartbeatSampleMap Specified RegionHeartbeatSampleMap + */ + public void forceUpdateRegionGroupCache( + TConsensusGroupId regionGroupId, Map<Integer, RegionHeartbeatSample> heartbeatSampleMap) { + regionGroupCacheMap + .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId)) + .forceUpdate(heartbeatSampleMap); + } + + /** Remove the specified RegionGroup's cache */ + public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) { + regionGroupCacheMap.remove(consensusGroupId); } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index 15dc22c0f5..f66b805703 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -23,6 +23,9 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.cluster.NodeType; +import org.apache.iotdb.commons.cluster.RegionStatus; import org.apache.iotdb.commons.partition.DataPartitionTable; import 
org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; @@ -34,7 +37,10 @@ import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer; import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer; import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer; import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatService; +import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample; +import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample; import org.apache.iotdb.confignode.manager.load.statistics.StatisticsService; +import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; import com.google.common.eventbus.AsyncEventBus; @@ -162,6 +168,10 @@ public class LoadManager { return routeBalancer.getLatestRegionPriorityMap(); } + public void broadcastLatestRegionRouteMap() { + statisticsService.broadcastLatestRegionRouteMap(); + } + public void startLoadServices() { loadCache.initHeartbeatCache(configManager); routeBalancer.initRegionRouteMap(); @@ -178,4 +188,177 @@ public class LoadManager { public RouteBalancer getRouteBalancer() { return routeBalancer; } + + /** + * Safely get NodeStatus by NodeId + * + * @param nodeId The specified NodeId + * @return NodeStatus of the specified Node. Unknown if cache doesn't exist. 
+ */ + public NodeStatus getNodeStatus(int nodeId) { + return loadCache.getNodeStatus(nodeId); + } + + /** + * Safely get the specified Node's current status with reason + * + * @param nodeId The specified NodeId + * @return The specified Node's current status if the nodeCache contains it, Unknown otherwise + */ + public String getNodeStatusWithReason(int nodeId) { + return loadCache.getNodeStatusWithReason(nodeId); + } + + /** + * Get all Node's current status with reason + * + * @return Map<NodeId, NodeStatus with reason> + */ + public Map<Integer, String> getNodeStatusWithReason() { + return loadCache.getNodeStatusWithReason(); + } + + /** + * Filter ConfigNodes through the specified NodeStatus + * + * @param status The specified NodeStatus + * @return Filtered ConfigNodes with the specified NodeStatus + */ + public List<Integer> filterConfigNodeThroughStatus(NodeStatus... status) { + return loadCache.filterConfigNodeThroughStatus(status); + } + + /** + * Filter DataNodes through the specified NodeStatus + * + * @param status The specified NodeStatus + * @return Filtered DataNodes with the specified NodeStatus + */ + public List<Integer> filterDataNodeThroughStatus(NodeStatus... status) { + return loadCache.filterDataNodeThroughStatus(status); + } + + /** + * Get the free disk space of the specified DataNode + * + * @param dataNodeId The index of the specified DataNode + * @return The free disk space that sample through heartbeat, 0 if no heartbeat received + */ + public double getFreeDiskSpace(int dataNodeId) { + return loadCache.getFreeDiskSpace(dataNodeId); + } + + /** + * Get the loadScore of each DataNode + * + * @return Map<DataNodeId, loadScore> + */ + public Map<Integer, Long> getAllDataNodeLoadScores() { + return loadCache.getAllDataNodeLoadScores(); + } + + /** + * Get the lowest loadScore DataNode + * + * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received. 
+ */ + public int getLowestLoadDataNode() { + return loadCache.getLowestLoadDataNode(); + } + + /** + * Get the lowest loadScore DataNode from the specified DataNodes + * + * @param dataNodeIds The specified DataNodes + * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received. + */ + public int getLowestLoadDataNode(List<Integer> dataNodeIds) { + return loadCache.getLowestLoadDataNode(dataNodeIds); + } + + /** + * Force update the specified Node's cache + * + * @param nodeType Specified NodeType + * @param nodeId Specified NodeId + * @param heartbeatSample Specified NodeHeartbeatSample + */ + public void forceUpdateNodeCache( + NodeType nodeType, int nodeId, NodeHeartbeatSample heartbeatSample) { + loadCache.forceUpdateNodeCache(nodeType, nodeId, heartbeatSample); + } + + /** Remove the specified Node's cache */ + public void removeNodeCache(int nodeId) { + loadCache.removeNodeCache(nodeId); + } + + /** + * Safely get RegionStatus. + * + * @param consensusGroupId Specified RegionGroupId + * @param dataNodeId Specified RegionReplicaId + * @return Corresponding RegionStatus if cache exists, Unknown otherwise + */ + public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int dataNodeId) { + return loadCache.getRegionStatus(consensusGroupId, dataNodeId); + } + + /** + * Safely get RegionGroupStatus. + * + * @param consensusGroupId Specified RegionGroupId + * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise + */ + public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId consensusGroupId) { + return loadCache.getRegionGroupStatus(consensusGroupId); + } + + /** + * Safely get RegionGroupStatus. 
+ * + * @param consensusGroupIds Specified RegionGroupIds + * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise + */ + public Map<TConsensusGroupId, RegionGroupStatus> getRegionGroupStatus( + List<TConsensusGroupId> consensusGroupIds) { + return loadCache.getRegionGroupStatus(consensusGroupIds); + } + + /** + * Filter the RegionGroups through the RegionGroupStatus + * + * @param status The specified RegionGroupStatus + * @return Filtered RegionGroups with the specified RegionGroupStatus + */ + public List<TConsensusGroupId> filterRegionGroupThroughStatus(RegionGroupStatus... status) { + return loadCache.filterRegionGroupThroughStatus(status); + } + + /** + * Count the number of cluster Regions with specified RegionStatus + * + * @param type The specified RegionGroupType + * @param status The specified statuses + * @return The number of cluster Regions with specified RegionStatus + */ + public int countRegionWithSpecifiedStatus(TConsensusGroupType type, RegionStatus... 
status) { + return loadCache.countRegionWithSpecifiedStatus(type, status); + } + + /** + * Force update the specified RegionGroup's cache + * + * @param regionGroupId Specified RegionGroupId + * @param heartbeatSampleMap Specified RegionHeartbeatSampleMap + */ + public void forceUpdateRegionGroupCache( + TConsensusGroupId regionGroupId, Map<Integer, RegionHeartbeatSample> heartbeatSampleMap) { + loadCache.forceUpdateRegionGroupCache(regionGroupId, heartbeatSampleMap); + } + + /** Remove the specified RegionGroup's cache */ + public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) { + loadCache.removeRegionGroupCache(consensusGroupId); + } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java index 882a5eeebe..f6d1524df0 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java @@ -29,6 +29,7 @@ import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException; import org.apache.iotdb.confignode.manager.ClusterSchemaManager; import org.apache.iotdb.confignode.manager.IManager; +import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.load.balancer.region.CopySetRegionGroupAllocator; import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator; import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator; @@ -111,7 +112,7 @@ public class RegionBalancer { dataNodeConfiguration -> { int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId(); availableDataNodeMap.put(dataNodeId, dataNodeConfiguration); - freeDiskSpaceMap.put(dataNodeId, 
getNodeManager().getFreeDiskSpace(dataNodeId)); + freeDiskSpaceMap.put(dataNodeId, getLoadManager().getFreeDiskSpace(dataNodeId)); }); // Generate allocation plan @@ -145,6 +146,10 @@ public class RegionBalancer { return configManager.getPartitionManager(); } + private LoadManager getLoadManager() { + return configManager.getLoadManager(); + } + public enum RegionGroupAllocatePolicy { COPY_SET, GREEDY diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java index 4a27368459..a87cb051ba 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java @@ -33,6 +33,7 @@ import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.manager.IManager; +import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap; import org.apache.iotdb.confignode.manager.load.balancer.router.leader.GreedyLeaderBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer; @@ -193,7 +194,7 @@ public class RouteBalancer { private boolean updateRegionPriorityMap() { Map<TConsensusGroupId, Integer> regionLeaderMap = regionRouteMap.getRegionLeaderMap(); - Map<Integer, Long> dataNodeLoadScoreMap = getNodeManager().getAllLoadScores(); + Map<Integer, Long> dataNodeLoadScoreMap = getLoadManager().getAllDataNodeLoadScores(); // Balancing region priority in each SchemaRegionGroup Map<TConsensusGroupId, TRegionReplicaSet> latestRegionPriorityMap = @@ -412,4 +413,8 @@ public class RouteBalancer { private PartitionManager 
getPartitionManager() { return configManager.getPartitionManager(); } + + private LoadManager getLoadManager() { + return configManager.getLoadManager(); + } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java index b2aaaa15aa..1ad268c5bd 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java @@ -171,6 +171,7 @@ public class HeartbeatService { new DataNodeHeartbeatHandler( dataNodeInfo.getLocation().getDataNodeId(), loadCache, + configManager.getLoadManager().getRouteBalancer(), configManager.getClusterQuotaManager().getDeviceNum(), configManager.getClusterQuotaManager().getTimeSeriesNum(), configManager.getClusterQuotaManager().getRegionDisk()); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java index 392ae24b6e..d55bf144d9 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java @@ -32,6 +32,8 @@ public abstract class BaseNodeCache { // Max heartbeat cache samples store size public static final int MAXIMUM_WINDOW_SIZE = 100; + protected final int nodeId; + // SlidingWindow stores the heartbeat sample data protected final LinkedList<NodeHeartbeatSample> slidingWindow = new LinkedList<>(); @@ -42,7 +44,8 @@ public abstract class BaseNodeCache { protected volatile NodeStatistics currentStatistics; /** Constructor for NodeCache with default NodeStatistics */ - protected BaseNodeCache() { + protected BaseNodeCache(int nodeId) { + 
this.nodeId = nodeId; this.previousStatistics = NodeStatistics.generateDefaultNodeStatistics(); this.currentStatistics = NodeStatistics.generateDefaultNodeStatistics(); } @@ -108,6 +111,10 @@ public abstract class BaseNodeCache { */ protected abstract void updateCurrentStatistics(); + public int getNodeId() { + return nodeId; + } + /** * TODO: The loadScore of each Node will be changed to Double * diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java index afd9d3e197..0bb831c0dd 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java @@ -31,18 +31,14 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache { public static final NodeStatistics CURRENT_NODE_STATISTICS = new NodeStatistics(0, NodeStatus.Running, null); - private final int configNodeId; - /** Constructor for create ConfigNodeHeartbeatCache with default NodeStatistics */ public ConfigNodeHeartbeatCache(int configNodeId) { - super(); - this.configNodeId = configNodeId; + super(configNodeId); } /** Constructor only for ConfigNode-leader */ public ConfigNodeHeartbeatCache(int configNodeId, NodeStatistics statistics) { - super(); - this.configNodeId = configNodeId; + super(configNodeId); this.previousStatistics = statistics; this.currentStatistics = statistics; } @@ -50,7 +46,7 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache { @Override protected void updateCurrentStatistics() { // Skip itself - if (configNodeId == CURRENT_NODE_ID) { + if (nodeId == CURRENT_NODE_ID) { return; } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java index cb20836120..aa36677983 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java @@ -28,8 +28,8 @@ public class DataNodeHeartbeatCache extends BaseNodeCache { private volatile TLoadSample latestLoadSample; /** Constructor for create DataNodeHeartbeatCache with default NodeStatistics */ - public DataNodeHeartbeatCache() { - super(); + public DataNodeHeartbeatCache(int dataNodeId) { + super(dataNodeId); this.latestLoadSample = new TLoadSample(); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index 60e0134320..1482231c87 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -46,7 +46,6 @@ import org.apache.iotdb.confignode.consensus.response.datanode.ConfigurationResp import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp; import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp; import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp; -import org.apache.iotdb.confignode.manager.ClusterQuotaManager; import org.apache.iotdb.confignode.manager.ClusterSchemaManager; import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.IManager; @@ -54,9 +53,7 @@ import org.apache.iotdb.confignode.manager.TriggerManager; import org.apache.iotdb.confignode.manager.UDFManager; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import 
org.apache.iotdb.confignode.manager.load.LoadManager; -import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache; import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache; -import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache; import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.manager.partition.PartitionMetrics; import org.apache.iotdb.confignode.manager.pipe.PipeManager; @@ -82,19 +79,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; /** NodeManager manages cluster node addition and removal requests */ public class NodeManager { @@ -109,13 +102,10 @@ public class NodeManager { private final ReentrantLock removeConfigNodeLock; - private final Random random; - public NodeManager(IManager configManager, NodeInfo nodeInfo) { this.configManager = configManager; this.nodeInfo = nodeInfo; this.removeConfigNodeLock = new ReentrantLock(); - this.random = new Random(System.currentTimeMillis()); } /** @@ -365,15 +355,6 @@ public class NodeManager { return nodeInfo.getRegisteredDataNodeCount(); } - /** - * Only leader use this interface - * - * @return The number of total cpu cores in online DataNodes - */ - public int getTotalCpuCoreCount() { - return nodeInfo.getTotalCpuCoreCount(); - } - /** * Only leader use this interface * @@ -417,7 +398,7 @@ public class NodeManager { TDataNodeInfo dataNodeInfo = new TDataNodeInfo(); int dataNodeId = 
registeredDataNode.getLocation().getDataNodeId(); dataNodeInfo.setDataNodeId(dataNodeId); - dataNodeInfo.setStatus(getNodeStatusWithReason(dataNodeId)); + dataNodeInfo.setStatus(getLoadManager().getNodeStatusWithReason(dataNodeId)); dataNodeInfo.setRpcAddresss( registeredDataNode.getLocation().getClientRpcEndPoint().getIp()); dataNodeInfo.setRpcPort( @@ -481,7 +462,7 @@ public class NodeManager { TConfigNodeInfo info = new TConfigNodeInfo(); int configNodeId = configNodeLocation.getConfigNodeId(); info.setConfigNodeId(configNodeId); - info.setStatus(getNodeStatusWithReason(configNodeId)); + info.setStatus(getLoadManager().getNodeStatusWithReason(configNodeId)); info.setInternalAddress(configNodeLocation.getInternalEndPoint().getIp()); info.setInternalPort(configNodeLocation.getInternalEndPoint().getPort()); info.setRoleType( @@ -666,98 +647,25 @@ public class NodeManager { } } - public Map<Integer, BaseNodeCache> getNodeCacheMap() { - return nodeCacheMap; - } - - public void removeNodeCache(int nodeId) { - nodeCacheMap.remove(nodeId); - } - - /** - * Safely get the specific Node's current status for showing cluster - * - * @param nodeId The specific Node's index - * @return The specific Node's current status if the nodeCache contains it, Unknown otherwise - */ - private String getNodeStatusWithReason(int nodeId) { - BaseNodeCache nodeCache = nodeCacheMap.get(nodeId); - return nodeCache == null - ? NodeStatus.Unknown.getStatus() + "(NoHeartbeat)" - : nodeCache.getNodeStatusWithReason(); - } - /** - * Filter the registered ConfigNodes through the specific NodeStatus + * Filter ConfigNodes through the specified NodeStatus * - * @param status The specific NodeStatus - * @return Filtered ConfigNodes with the specific NodeStatus + * @param status The specified NodeStatus + * @return Filtered ConfigNodes with the specified NodeStatus */ public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus... 
status) { - return getRegisteredConfigNodes().stream() - .filter( - registeredConfigNode -> { - int configNodeId = registeredConfigNode.getConfigNodeId(); - return nodeCacheMap.containsKey(configNodeId) - && Arrays.stream(status) - .anyMatch(s -> s.equals(nodeCacheMap.get(configNodeId).getNodeStatus())); - }) - .collect(Collectors.toList()); - } - - /** - * Get NodeStatus by nodeId - * - * @param nodeId The specific NodeId - * @return NodeStatus of the specific node. If node does not exist, return null. - */ - public NodeStatus getNodeStatusByNodeId(int nodeId) { - BaseNodeCache baseNodeCache = nodeCacheMap.get(nodeId); - return baseNodeCache == null ? null : baseNodeCache.getNodeStatus(); + return nodeInfo.getRegisteredConfigNodes( + getLoadManager().filterConfigNodeThroughStatus(status)); } /** - * Filter the registered DataNodes through the specific NodeStatus + * Filter DataNodes through the specified NodeStatus * - * @param status The specific NodeStatus - * @return Filtered DataNodes with the specific NodeStatus + * @param status The specified NodeStatus + * @return Filtered DataNodes with the specified NodeStatus */ public List<TDataNodeConfiguration> filterDataNodeThroughStatus(NodeStatus... 
status) { - return getRegisteredDataNodes().stream() - .filter( - registeredDataNode -> { - int dataNodeId = registeredDataNode.getLocation().getDataNodeId(); - return nodeCacheMap.containsKey(dataNodeId) - && Arrays.stream(status) - .anyMatch(s -> s.equals(nodeCacheMap.get(dataNodeId).getNodeStatus())); - }) - .collect(Collectors.toList()); - } - - /** - * Get the loadScore of each DataNode - * - * @return Map<DataNodeId, loadScore> - */ - public Map<Integer, Long> getAllLoadScores() { - Map<Integer, Long> result = new ConcurrentHashMap<>(); - - nodeCacheMap.forEach( - (dataNodeId, heartbeatCache) -> result.put(dataNodeId, heartbeatCache.getLoadScore())); - - return result; - } - - /** - * Get the free disk space of the specified DataNode - * - * @param dataNodeId The index of the specified DataNode - * @return The free disk space that sample through heartbeat, 0 if no heartbeat received - */ - public double getFreeDiskSpace(int dataNodeId) { - DataNodeHeartbeatCache dataNodeHeartbeatCache = - (DataNodeHeartbeatCache) nodeCacheMap.get(dataNodeId); - return dataNodeHeartbeatCache == null ? 0d : dataNodeHeartbeatCache.getFreeDiskSpace(); + return nodeInfo.getRegisteredDataNodes(getLoadManager().filterDataNodeThroughStatus(status)); } /** @@ -767,15 +675,10 @@ public class NodeManager { */ public Optional<TDataNodeLocation> getLowestLoadDataNode() { // TODO get real lowest load data node after scoring algorithm being implemented - List<TDataNodeConfiguration> targetDataNodeList = - filterDataNodeThroughStatus(NodeStatus.Running); - - if (targetDataNodeList == null || targetDataNodeList.isEmpty()) { - return Optional.empty(); - } else { - int index = random.nextInt(targetDataNodeList.size()); - return Optional.of(targetDataNodeList.get(index).location); - } + int dataNodeId = getLoadManager().getLowestLoadDataNode(); + return dataNodeId < 0 + ? 
Optional.empty() + : Optional.of(getRegisteredDataNode(dataNodeId).getLocation()); } /** @@ -784,22 +687,8 @@ public class NodeManager { * @return TDataNodeLocation with the lowest loadScore */ public TDataNodeLocation getLowestLoadDataNode(Set<Integer> nodes) { - AtomicInteger result = new AtomicInteger(); - AtomicLong lowestLoadScore = new AtomicLong(Long.MAX_VALUE); - - nodes.forEach( - nodeID -> { - BaseNodeCache cache = nodeCacheMap.get(nodeID); - long score = (cache == null) ? Long.MAX_VALUE : cache.getLoadScore(); - if (score < lowestLoadScore.get()) { - result.set(nodeID); - lowestLoadScore.set(score); - } - }); - - LOGGER.info( - "get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", result, lowestLoadScore); - return configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get()); + int dataNodeId = getLoadManager().getLowestLoadDataNode(new ArrayList<>(nodes)); + return getRegisteredDataNode(dataNodeId).getLocation(); } private ConsensusManager getConsensusManager() { @@ -829,8 +718,4 @@ public class NodeManager { private UDFManager getUDFManager() { return configManager.getUDFManager(); } - - private ClusterQuotaManager getClusterQuotaManager() { - return configManager.getClusterQuotaManager(); - } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index fdec1487e6..f15978d6bd 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -26,7 +26,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.cluster.RegionRoleType; -import org.apache.iotdb.commons.cluster.RegionStatus; 
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.partition.DataPartitionTable; @@ -87,7 +86,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -100,7 +98,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** The PartitionManager Manages cluster PartitionTable read and write requests. */ @@ -588,12 +585,34 @@ * Only leader use this interface. * * @param database The specified Database - * @return All Regions' RegionReplicaSet of the specified StorageGroup + * @return All Regions' RegionReplicaSet of the specified Database */ public List<TRegionReplicaSet> getAllReplicaSets(String database) { return partitionInfo.getAllReplicaSets(database); } + /** + * Get all RegionGroups currently owned by the specified DataNode + * + * @param dataNodeId The specified dataNodeId + * @return Deep copy of all RegionGroups' RegionReplicaSet with the specified dataNodeId + */ + public List<TRegionReplicaSet> getAllReplicaSets(int dataNodeId) { + return partitionInfo.getAllReplicaSets(dataNodeId); + } + + /** + * Only leader use this interface. + * + * @param database The specified Database + * @param regionGroupIds The specified RegionGroupIds + * @return All Regions' RegionReplicaSet of the specified Database + */ + public List<TRegionReplicaSet> getReplicaSets( + String database, List<TConsensusGroupId> regionGroupIds) { + return partitionInfo.getReplicaSets(database, regionGroupIds); + } + /** * Only leader use this interface. 
* @@ -666,22 +685,22 @@ public class PartitionManager { /** * Only leader use this interface. * - * @param storageGroup StorageGroupName + * @param database DatabaseName * @param type SchemaRegion or DataRegion * @return The specific StorageGroup's Regions that sorted by the number of allocated slots * @throws NoAvailableRegionGroupException When all RegionGroups within the specified StorageGroup * are unavailable currently */ public List<Pair<Long, TConsensusGroupId>> getSortedRegionGroupSlotsCounter( - String storageGroup, TConsensusGroupType type) throws NoAvailableRegionGroupException { + String database, TConsensusGroupType type) throws NoAvailableRegionGroupException { // Collect static data List<Pair<Long, TConsensusGroupId>> regionGroupSlotsCounter = - partitionInfo.getRegionGroupSlotsCounter(storageGroup, type); + partitionInfo.getRegionGroupSlotsCounter(database, type); // Filter RegionGroups that have Disabled status List<Pair<Long, TConsensusGroupId>> result = new ArrayList<>(); for (Pair<Long, TConsensusGroupId> slotsCounter : regionGroupSlotsCounter) { - RegionGroupStatus status = getRegionGroupStatus(slotsCounter.getRight()); + RegionGroupStatus status = getLoadManager().getRegionGroupStatus(slotsCounter.getRight()); if (!RegionGroupStatus.Disabled.equals(status)) { result.add(slotsCounter); } @@ -691,6 +710,9 @@ public class PartitionManager { throw new NoAvailableRegionGroupException(type); } + Map<TConsensusGroupId, RegionGroupStatus> regionGroupStatusMap = + getLoadManager() + .getRegionGroupStatus(result.stream().map(Pair::getRight).collect(Collectors.toList())); result.sort( (o1, o2) -> { // Use the number of partitions as the first priority @@ -700,8 +722,9 @@ public class PartitionManager { return 1; } else { // Use RegionGroup status as second priority, Running > Available > Discouraged - return getRegionGroupStatus(o1.getRight()) - .compareTo(getRegionGroupStatus(o2.getRight())); + return regionGroupStatusMap + .get(o1.getRight()) + 
.compare(regionGroupStatusMap.get(o2.getRight())); } }); return result; @@ -759,7 +782,8 @@ public class PartitionManager { .forEach( regionInfo -> { regionInfo.setStatus( - getRegionStatus(regionInfo.getConsensusGroupId(), regionInfo.getDataNodeId()) + getLoadManager() + .getRegionStatus(regionInfo.getConsensusGroupId(), regionInfo.getDataNodeId()) .getStatus()); String regionType = @@ -773,6 +797,15 @@ public class PartitionManager { return regionInfoListResp; } + /** + * Check if the specified RegionGroup exists. + * + * @param regionGroupId The specified RegionGroup + */ + public boolean isRegionGroupExists(TConsensusGroupId regionGroupId) { + return partitionInfo.isRegionGroupExisted(regionGroupId); + } + /** * update region location * @@ -780,13 +813,6 @@ public class PartitionManager { * @return TSStatus */ public TSStatus updateRegionLocation(UpdateRegionLocationPlan req) { - // Remove heartbeat cache if exists - if (regionGroupCacheMap.containsKey(req.getRegionId())) { - regionGroupCacheMap - .get(req.getRegionId()) - .removeCacheIfExists(req.getOldNode().getDataNodeId()); - } - return getConsensusManager().write(req).getStatus(); } @@ -1059,93 +1085,23 @@ public class PartitionManager { /* Stop the RegionCleaner service */ currentRegionMaintainerFuture.cancel(false); currentRegionMaintainerFuture = null; - regionGroupCacheMap.clear(); LOGGER.info("RegionCleaner is stopped successfully."); } } } - public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) { - regionGroupCacheMap.remove(consensusGroupId); - } - /** - * Filter the RegionGroups in the specified StorageGroup through the RegionGroupStatus + * Filter the RegionGroups in the specified Database through the RegionGroupStatus * - * @param storageGroup The specified StorageGroup + * @param database The specified Database * @param status The specified RegionGroupStatus - * @return Filtered RegionGroups with the specific RegionGroupStatus + * @return Filtered RegionGroups with the specified 
RegionGroupStatus */ public List<TRegionReplicaSet> filterRegionGroupThroughStatus( - String storageGroup, RegionGroupStatus... status) { - return getAllReplicaSets(storageGroup).stream() - .filter( - regionReplicaSet -> { - TConsensusGroupId regionGroupId = regionReplicaSet.getRegionId(); - return regionGroupCacheMap.containsKey(regionGroupId) - && Arrays.stream(status) - .anyMatch( - s -> - s.equals( - regionGroupCacheMap - .get(regionGroupId) - .getStatistics() - .getRegionGroupStatus())); - }) - .collect(Collectors.toList()); - } - - /** - * Count the number of cluster Regions with specified RegionStatus - * - * @param type The specified RegionGroupType - * @param status The specified statues - * @return The number of cluster Regions with specified RegionStatus - */ - public int countRegionWithSpecifiedStatus(TConsensusGroupType type, RegionStatus... status) { - AtomicInteger result = new AtomicInteger(0); - regionGroupCacheMap.forEach( - (regionGroupId, regionGroupCache) -> { - if (type.equals(regionGroupId.getType())) { - regionGroupCache - .getStatistics() - .getRegionStatisticsMap() - .values() - .forEach( - regionStatistics -> { - if (Arrays.stream(status) - .anyMatch(s -> s.equals(regionStatistics.getRegionStatus()))) { - result.getAndIncrement(); - } - }); - } - }); - return result.get(); - } - - /** - * Safely get RegionStatus. - * - * @param consensusGroupId Specified RegionGroupId - * @param dataNodeId Specified RegionReplicaId - * @return Corresponding RegionStatus if cache exists, Unknown otherwise - */ - public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int dataNodeId) { - return regionGroupCacheMap.containsKey(consensusGroupId) - ? regionGroupCacheMap.get(consensusGroupId).getStatistics().getRegionStatus(dataNodeId) - : RegionStatus.Unknown; - } - - /** - * Safely get RegionGroupStatus. 
- * - * @param consensusGroupId Specified RegionGroupId - * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise - */ - public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId consensusGroupId) { - return regionGroupCacheMap.containsKey(consensusGroupId) - ? regionGroupCacheMap.get(consensusGroupId).getStatistics().getRegionGroupStatus() - : RegionGroupStatus.Disabled; + String database, RegionGroupStatus... status) { + List<TConsensusGroupId> matchedRegionGroups = + getLoadManager().filterRegionGroupThroughStatus(status); + return getReplicaSets(database, matchedRegionGroups); } public void getSchemaRegionIds( diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java index 21909d6a5a..f9eff7b0de 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java @@ -72,10 +72,9 @@ public class PartitionMetrics implements IMetricSet { metricService.createAutoGauge( Metric.REGION_NUM.toString(), MetricLevel.CORE, - getPartitionManager(), - partitionManager -> - partitionManager.countRegionWithSpecifiedStatus( - TConsensusGroupType.SchemaRegion, status), + getLoadManager(), + loadManager -> + loadManager.countRegionWithSpecifiedStatus(TConsensusGroupType.SchemaRegion, status), Tag.TYPE.toString(), TConsensusGroupType.SchemaRegion.toString(), Tag.STATUS.toString(), @@ -85,10 +84,9 @@ public class PartitionMetrics implements IMetricSet { metricService.createAutoGauge( Metric.REGION_NUM.toString(), MetricLevel.CORE, - getPartitionManager(), - partitionManager -> - partitionManager.countRegionWithSpecifiedStatus( - TConsensusGroupType.DataRegion, status), + getLoadManager(), + loadManager -> + 
loadManager.countRegionWithSpecifiedStatus(TConsensusGroupType.DataRegion, status), Tag.TYPE.toString(), TConsensusGroupType.DataRegion.toString(), Tag.STATUS.toString(), @@ -330,8 +328,8 @@ public class PartitionMetrics implements IMetricSet { return configManager.getClusterSchemaManager(); } - private PartitionManager getPartitionManager() { - return configManager.getPartitionManager(); + private LoadManager getLoadManager() { + return configManager.getLoadManager(); } @Override diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java index 9e58ae6d1c..3cf9976c3d 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java @@ -21,20 +21,20 @@ package org.apache.iotdb.confignode.manager.partition; public enum RegionGroupStatus { /** All Regions in RegionGroup are in the Running status */ - Running("Running"), + Running("Running", 1), /** * All Regions in RegionGroup are in the Running or Unknown status, and the number of Regions in * the Unknown status is less than half */ - Available("Available"), + Available("Available", 2), /** * All Regions in RegionGroup are in the Running, Unknown or ReadOnly status, and at least 1 node * is in ReadOnly status, the number of Regions in the Unknown or ReadOnly status is less than * half */ - Discouraged("Discouraged"), + Discouraged("Discouraged", 3), /** * The following cases will lead to Disabled RegionGroup: @@ -43,18 +43,24 @@ public enum RegionGroupStatus { * * <p>2. 
More than half of the Regions are in Unknown or ReadOnly status */ - Disabled("Disabled"); + Disabled("Disabled", 4); private final String status; + private final int weight; - RegionGroupStatus(String status) { + RegionGroupStatus(String status, int weight) { this.status = status; + this.weight = weight; } public String getStatus() { return status; } + public int getWeight() { + return weight; + } + public static RegionGroupStatus parse(String status) { for (RegionGroupStatus regionGroupStatus : RegionGroupStatus.values()) { if (regionGroupStatus.status.equals(status)) { @@ -63,4 +69,13 @@ public enum RegionGroupStatus { } throw new RuntimeException(String.format("RegionGroupStatus %s doesn't exist.", status)); } + + /** + * Compare the weight of two RegionGroupStatus + * + * <p>Running > Available > Discouraged > Disabled + */ + public int compare(RegionGroupStatus other) { + return Integer.compare(this.weight, other.weight); + } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java index 67c5526ff8..4c0b1761cc 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java @@ -221,18 +221,6 @@ public class NodeInfo implements SnapshotProcessor { return result; } - /** Return the number of registered ConfigNodes */ - public int getRegisteredConfigNodeCount() { - int result; - configNodeInfoReadWriteLock.readLock().lock(); - try { - result = registeredConfigNodes.size(); - } finally { - configNodeInfoReadWriteLock.readLock().unlock(); - } - return result; - } - /** Return the number of total cpu cores in online DataNodes */ public int getTotalCpuCoreCount() { int result = 0; @@ -269,6 +257,23 @@ public class NodeInfo implements SnapshotProcessor { } } + /** @return The specified registered DataNodes */ + public 
List<TDataNodeConfiguration> getRegisteredDataNodes(List<Integer> dataNodeIds) { + List<TDataNodeConfiguration> result = new ArrayList<>(); + dataNodeInfoReadWriteLock.readLock().lock(); + try { + dataNodeIds.forEach( + dataNodeId -> { + if (registeredDataNodes.containsKey(dataNodeId)) { + result.add(registeredDataNodes.get(dataNodeId).deepCopy()); + } + }); + } finally { + dataNodeInfoReadWriteLock.readLock().unlock(); + } + return result; + } + /** * Update ConfigNodeList both in memory and confignode-system.properties file * @@ -336,6 +341,7 @@ public class NodeInfo implements SnapshotProcessor { return status; } + /** @return All registered ConfigNodes */ public List<TConfigNodeLocation> getRegisteredConfigNodes() { List<TConfigNodeLocation> result; configNodeInfoReadWriteLock.readLock().lock(); @@ -347,6 +353,23 @@ public class NodeInfo implements SnapshotProcessor { return result; } + /** @return The specified registered ConfigNodes */ + public List<TConfigNodeLocation> getRegisteredConfigNodes(List<Integer> configNodeIds) { + List<TConfigNodeLocation> result = new ArrayList<>(); + configNodeInfoReadWriteLock.readLock().lock(); + try { + configNodeIds.forEach( + configNodeId -> { + if (registeredConfigNodes.containsKey(configNodeId)) { + result.add(registeredConfigNodes.get(configNodeId).deepCopy()); + } + }); + } finally { + configNodeInfoReadWriteLock.readLock().unlock(); + } + return result; + } + public int generateNextNodeId() { return nextNodeId.incrementAndGet(); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java index b27a4b529f..4db9a39eb0 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java @@ -119,8 +119,9 @@ 
public class DatabasePartitionTable { return result; } + /** - * Get all RegionGroups currently owned by this StorageGroup + * Get all RegionGroups currently owned by this Database * * @param type The specified TConsensusGroupType * @return Deep copy of all Regions' RegionReplicaSet with the specified TConsensusGroupType @@ -137,6 +138,37 @@ public class DatabasePartitionTable { return result; } + /** + * Get all RegionGroups currently owned by the specified DataNode + * + * @param dataNodeId The specified dataNodeId + * @return Deep copy of all RegionGroups' RegionReplicaSet with the specified dataNodeId + */ + public List<TRegionReplicaSet> getAllReplicaSets(int dataNodeId) { + return regionGroupMap.values().stream() + .filter(regionGroup -> regionGroup.belongsToDataNode(dataNodeId)) + .map(RegionGroup::getReplicaSet) + .collect(Collectors.toList()); + } + + /** + * Get the RegionGroups with the specified RegionGroupIds + * + * @param regionGroupIds The specified RegionGroupIds + * @return Deep copy of the RegionGroups with the specified RegionGroupIds + */ + public List<TRegionReplicaSet> getReplicaSets(List<TConsensusGroupId> regionGroupIds) { + List<TRegionReplicaSet> result = new ArrayList<>(); + + for (TConsensusGroupId regionGroupId : regionGroupIds) { + if (regionGroupMap.containsKey(regionGroupId)) { + result.add(regionGroupMap.get(regionGroupId).getReplicaSet()); + } + } + + return result; + } + /** * Only leader use this interface. * @@ -496,7 +528,7 @@ public class DatabasePartitionTable { * @param regionId TConsensusGroupId * @return True if contains. 
*/ - public boolean containRegion(TConsensusGroupId regionId) { + public boolean containRegionGroup(TConsensusGroupId regionId) { return regionGroupMap.containsKey(regionId); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java index b19ac834b3..062500012e 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java @@ -488,6 +488,17 @@ public class PartitionInfo implements SnapshotProcessor { return regionResp; } + /** + * Check if the specified RegionGroup exists. + * + * @param regionGroupId The specified RegionGroup + */ + public boolean isRegionGroupExisted(TConsensusGroupId regionGroupId) { + return databasePartitionTables.values().stream() + .anyMatch( + databasePartitionTable -> databasePartitionTable.containRegionGroup(regionGroupId)); + } + /** * Update the location info of given regionId * @@ -500,9 +511,10 @@ public class PartitionInfo implements SnapshotProcessor { TDataNodeLocation oldNode = req.getOldNode(); TDataNodeLocation newNode = req.getNewNode(); databasePartitionTables.values().stream() - .filter(sgPartitionTable -> sgPartitionTable.containRegion(regionId)) + .filter(databasePartitionTable -> databasePartitionTable.containRegionGroup(regionId)) .forEach( - sgPartitionTable -> sgPartitionTable.updateRegionLocation(regionId, oldNode, newNode)); + databasePartitionTable -> + databasePartitionTable.updateRegionLocation(regionId, oldNode, newNode)); return status; } @@ -516,7 +528,7 @@ public class PartitionInfo implements SnapshotProcessor { public String getRegionStorageGroup(TConsensusGroupId regionId) { Optional<DatabasePartitionTable> sgPartitionTableOptional = databasePartitionTables.values().stream() - .filter(s -> s.containRegion(regionId)) + 
.filter(s -> s.containRegionGroup(regionId)) .findFirst(); return sgPartitionTableOptional.map(DatabasePartitionTable::getDatabaseName).orElse(null); } @@ -619,6 +631,38 @@ public class PartitionInfo implements SnapshotProcessor { } } + /** + * Get all RegionGroups currently owned by the specified DataNode + * + * @param dataNodeId The specified dataNodeId + * @return Deep copy of all RegionGroups' RegionReplicaSet with the specified dataNodeId + */ + public List<TRegionReplicaSet> getAllReplicaSets(int dataNodeId) { + List<TRegionReplicaSet> result = new ArrayList<>(); + databasePartitionTables + .values() + .forEach( + databasePartitionTable -> + result.addAll(databasePartitionTable.getAllReplicaSets(dataNodeId))); + return result; + } + + /** + * Only leader use this interface. + * + * @param database The specified Database + * @param regionGroupIds The specified RegionGroupIds + * @return All Regions' RegionReplicaSet of the specified Database + */ + public List<TRegionReplicaSet> getReplicaSets( + String database, List<TConsensusGroupId> regionGroupIds) { + if (databasePartitionTables.containsKey(database)) { + return databasePartitionTables.get(database).getReplicaSets(regionGroupIds); + } else { + return new ArrayList<>(); + } + } + /** * Only leader use this interface. 
* diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java index 4c4c7ffbc9..a555c2349c 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java @@ -71,7 +71,7 @@ public class RegionGroup { } public TRegionReplicaSet getReplicaSet() { - return replicaSet; + return replicaSet.deepCopy(); } /** @param deltaMap Map<TSeriesPartitionSlot, Delta TTimePartitionSlot Count> */ @@ -93,6 +93,17 @@ public class RegionGroup { return totalTimeSlotCount.get(); } + /** + * Check if the RegionGroup belongs to the specified DataNode. + * + * @param dataNodeId The specified DataNodeId. + * @return True if the RegionGroup belongs to the specified DataNode. + */ + public boolean belongsToDataNode(int dataNodeId) { + return replicaSet.getDataNodeLocations().stream() + .anyMatch(dataNodeLocation -> dataNodeLocation.getDataNodeId() == dataNodeId); + } + public void serialize(OutputStream outputStream, TProtocol protocol) throws IOException, TException { ReadWriteIOUtils.write(createTime, outputStream); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java index 06aef7c2f6..8b925535bf 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java @@ -27,6 +27,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.cluster.NodeStatus; +import 
org.apache.iotdb.commons.cluster.NodeType; import org.apache.iotdb.commons.cluster.RegionStatus; import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta; import org.apache.iotdb.commons.trigger.TriggerInformation; @@ -49,7 +50,6 @@ import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample; -import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache; import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample; import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.confignode.manager.partition.PartitionManager; @@ -145,8 +145,7 @@ public class ConfigNodeProcedureEnv { * @throws TException Thrift IOE */ public boolean invalidateCache(String storageGroupName) throws IOException, TException { - NodeManager nodeManager = configManager.getNodeManager(); - List<TDataNodeConfiguration> allDataNodes = nodeManager.getRegisteredDataNodes(); + List<TDataNodeConfiguration> allDataNodes = getNodeManager().getRegisteredDataNodes(); TInvalidateCacheReq invalidateCacheReq = new TInvalidateCacheReq(); invalidateCacheReq.setStorageGroup(true); invalidateCacheReq.setFullPath(storageGroupName); @@ -154,14 +153,14 @@ public class ConfigNodeProcedureEnv { int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId(); // if the node is not alive, sleep 1 second and try again - NodeStatus nodeStatus = nodeManager.getNodeStatusByNodeId(dataNodeId); + NodeStatus nodeStatus = getLoadManager().getNodeStatus(dataNodeId); if (nodeStatus == NodeStatus.Unknown) { try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { LOG.error("Sleep failed in ConfigNodeProcedureEnv: ", e); } - nodeStatus = nodeManager.getNodeStatusByNodeId(dataNodeId); + nodeStatus = 
getLoadManager().getNodeStatus(dataNodeId); } if (nodeStatus == NodeStatus.Running) { @@ -202,14 +201,11 @@ public class ConfigNodeProcedureEnv { } public boolean doubleCheckReplica(TDataNodeLocation removedDatanode) { - return configManager - .getNodeManager() + return getNodeManager() .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.ReadOnly) .size() - Boolean.compare( - configManager - .getNodeManager() - .getNodeStatusByNodeId(removedDatanode.getDataNodeId()) + getLoadManager().getNodeStatus(removedDatanode.getDataNodeId()) != NodeStatus.Unknown, false) >= NodeInfo.getMinimumDataNode(); @@ -305,8 +301,6 @@ public class ConfigNodeProcedureEnv { * @throws ProcedureException if failed status */ public void stopConfigNode(TConfigNodeLocation tConfigNodeLocation) throws ProcedureException { - getNodeManager().removeNodeCache(tConfigNodeLocation.getConfigNodeId()); - TSStatus tsStatus = (TSStatus) SyncConfigNodeClientPool.getInstance() @@ -318,6 +312,8 @@ public class ConfigNodeProcedureEnv { if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new ProcedureException(tsStatus.getMessage()); } + + getLoadManager().removeNodeCache(tConfigNodeLocation.getConfigNodeId()); } /** @@ -371,8 +367,7 @@ public class ConfigNodeProcedureEnv { */ public void markDataNodeAsRemovingAndBroadcast(TDataNodeLocation dataNodeLocation) { // Send request to update NodeStatus on the DataNode to be removed - if (configManager.getNodeManager().getNodeStatusByNodeId(dataNodeLocation.getDataNodeId()) - == NodeStatus.Unknown) { + if (getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId()) == NodeStatus.Unknown) { SyncDataNodeClientPool.getInstance() .sendSyncRequestToDataNodeWithGivenRetry( dataNodeLocation.getInternalEndPoint(), @@ -388,10 +383,11 @@ public class ConfigNodeProcedureEnv { } // Force updating NodeStatus to Removing - getNodeManager() - .getNodeCacheMap() - .get(dataNodeLocation.getDataNodeId()) - 
.forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Removing)); + getLoadManager() + .forceUpdateNodeCache( + NodeType.DataNode, + dataNodeLocation.getDataNodeId(), + NodeHeartbeatSample.generateDefaultSample(NodeStatus.Removing)); } /** @@ -539,10 +535,7 @@ public class ConfigNodeProcedureEnv { (dataNodeId, regionStatus) -> heartbeatSampleMap.put( dataNodeId, new RegionHeartbeatSample(currentTime, currentTime, regionStatus))); - getPartitionManager() - .getRegionGroupCacheMap() - .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId)) - .forceUpdate(heartbeatSampleMap); + getLoadManager().forceUpdateRegionGroupCache(regionGroupId, heartbeatSampleMap); // Select leader greedily for iot consensus protocol if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java index bf1ce7e940..c75fd419b8 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java @@ -34,7 +34,6 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan; import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp; import org.apache.iotdb.confignode.manager.ConfigManager; -import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache; import org.apache.iotdb.confignode.manager.partition.PartitionMetrics; import org.apache.iotdb.confignode.persistence.node.NodeInfo; import org.apache.iotdb.confignode.procedure.scheduler.LockQueue; @@ -328,7 +327,7 @@ public class DataNodeRemoveHandler { TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, 
originalDataNode); status = - configManager.getNodeManager().getNodeStatusByNodeId(originalDataNode.getDataNodeId()) + configManager.getLoadManager().getNodeStatus(originalDataNode.getDataNodeId()) == NodeStatus.Unknown ? SyncDataNodeClientPool.getInstance() .sendSyncRequestToDataNodeWithGivenRetry( @@ -375,6 +374,9 @@ public class DataNodeRemoveHandler { getIdWithRpcEndpoint(originalDataNode), getIdWithRpcEndpoint(destDataNode)); + // Remove the RegionGroupCache of the regionId + configManager.getLoadManager().removeRegionGroupCache(regionId); + // Broadcast the latest RegionRouteMap when Region migration finished configManager.getLoadManager().broadcastLatestRegionRouteMap(); } @@ -427,7 +429,7 @@ public class DataNodeRemoveHandler { SyncDataNodeClientPool.getInstance() .sendSyncRequestToDataNodeWithGivenRetry( dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE, 2); - configManager.getNodeManager().removeNodeCache(dataNode.getDataNodeId()); + configManager.getLoadManager().removeNodeCache(dataNode.getDataNodeId()); LOGGER.info( "{}, Stop Data Node result: {}, stoppedDataNode: {}", REMOVE_DATANODE_PROCESS, @@ -509,9 +511,8 @@ public class DataNodeRemoveHandler { if (CONF.getSchemaReplicationFactor() == 1 || CONF.getDataReplicationFactor() == 1) { for (TDataNodeLocation dataNodeLocation : removedDataNodes) { // check whether removed data node is in running state - BaseNodeCache nodeCache = - configManager.getNodeManager().getNodeCacheMap().get(dataNodeLocation.getDataNodeId()); - if (!NodeStatus.Running.equals(nodeCache.getNodeStatus())) { + if (!NodeStatus.Running.equals( + configManager.getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId()))) { removedDataNodes.remove(dataNodeLocation); LOGGER.error( "Failed to remove data node {} because it is not in running and the configuration of cluster is one replication", @@ -530,7 +531,7 @@ public class DataNodeRemoveHandler { removeDataNodePlan.getDataNodeLocations().stream() .filter( 
x -> - configManager.getNodeManager().getNodeStatusByNodeId(x.getDataNodeId()) + configManager.getLoadManager().getNodeStatus(x.getDataNodeId()) != NodeStatus.Unknown) .count(); if (availableDatanodeSize - removedDataNodeSize < NodeInfo.getMinimumDataNode()) { diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java index f1271c25b1..43c3404b58 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java @@ -120,7 +120,7 @@ public class DeleteDatabaseProcedure regionReplicaSet -> { // Clear heartbeat cache along the way env.getConfigManager() - .getPartitionManager() + .getLoadManager() .removeRegionGroupCache(regionReplicaSet.getRegionId()); env.getConfigManager() .getLoadManager() diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java index 46b58399ca..e1f02cffb2 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java @@ -47,6 +47,9 @@ import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS; /** Region migrate procedure */ public class RegionMigrateProcedure extends StateMachineProcedure<ConfigNodeProcedureEnv, RegionTransitionState> { + + // TODO: Reach an agreement on RegionMigrateProcedure + private static final Logger LOG = LoggerFactory.getLogger(RegionMigrateProcedure.class); private static final int RETRY_THRESHOLD = 5; diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java index 6da0304f44..372832d106 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java @@ -64,7 +64,7 @@ public class GreedyPriorityTest { NodeStatus.Running, NodeStatus.Unknown, NodeStatus.Running, NodeStatus.ReadOnly }; for (int i = 0; i < 4; i++) { - nodeCacheMap.put(i, new DataNodeHeartbeatCache()); + nodeCacheMap.put(i, new DataNodeHeartbeatCache(i)); nodeCacheMap .get(i) .cacheHeartbeatSample( diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java index f1bf444427..b6248d7872 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java @@ -61,7 +61,7 @@ public class LeaderPriorityBalancerTest { long currentTimeMillis = System.currentTimeMillis(); Map<Integer, BaseNodeCache> nodeCacheMap = new HashMap<>(); for (int i = 0; i < 6; i++) { - nodeCacheMap.put(i, new DataNodeHeartbeatCache()); + nodeCacheMap.put(i, new DataNodeHeartbeatCache(i)); if (i != 2 && i != 5) { nodeCacheMap .get(i) diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java index b51cf10bcc..56f3e64d26 100644 --- 
a/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java @@ -30,7 +30,7 @@ public class NodeCacheTest { @Test public void forceUpdateTest() { - DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache(); + DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache(1); // Test default Assert.assertEquals(NodeStatus.Unknown, dataNodeHeartbeatCache.getNodeStatus()); @@ -55,7 +55,7 @@ public class NodeCacheTest { @Test public void periodicUpdateTest() { - DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache(); + DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache(1); long currentTime = System.currentTimeMillis(); dataNodeHeartbeatCache.cacheHeartbeatSample( new NodeHeartbeatSample(
