This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch 
Move-heartbeat-thread-and-statistics-thread-to-LoadManager
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 94bed63697747879440b91679454da330f4c8dd0
Author: YongzaoDan <[email protected]>
AuthorDate: Thu Apr 13 22:05:14 2023 +0800

    ready for test
---
 .../iotdb/confignode/manager/ConfigManager.java    |  25 +-
 .../iotdb/confignode/manager/ProcedureManager.java |  79 +++--
 .../confignode/manager/RetryFailedTasksThread.java |  21 +-
 .../iotdb/confignode/manager/load/LoadCache.java   | 357 ++++++++++++++++++---
 .../iotdb/confignode/manager/load/LoadManager.java | 183 +++++++++++
 .../manager/load/balancer/RegionBalancer.java      |   7 +-
 .../manager/load/balancer/RouteBalancer.java       |   7 +-
 .../manager/load/heartbeat/HeartbeatService.java   |   1 +
 .../manager/load/heartbeat/node/BaseNodeCache.java |   9 +-
 .../heartbeat/node/ConfigNodeHeartbeatCache.java   |  10 +-
 .../heartbeat/node/DataNodeHeartbeatCache.java     |   4 +-
 .../iotdb/confignode/manager/node/NodeManager.java | 149 +--------
 .../manager/partition/PartitionManager.java        | 146 +++------
 .../manager/partition/PartitionMetrics.java        |  18 +-
 .../manager/partition/RegionGroupStatus.java       |  25 +-
 .../confignode/persistence/node/NodeInfo.java      |  47 ++-
 .../partition/DatabasePartitionTable.java          |  36 ++-
 .../persistence/partition/PartitionInfo.java       |  50 ++-
 .../persistence/partition/RegionGroup.java         |  13 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      |  37 +--
 .../procedure/env/DataNodeRemoveHandler.java       |  15 +-
 .../impl/schema/DeleteDatabaseProcedure.java       |   2 +-
 .../impl/statemachine/RegionMigrateProcedure.java  |   3 +
 .../router/priority/GreedyPriorityTest.java        |   2 +-
 .../priority/LeaderPriorityBalancerTest.java       |   2 +-
 .../confignode/manager/node/NodeCacheTest.java     |   4 +-
 26 files changed, 835 insertions(+), 417 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index dab5101d1c..4a3d65565e 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -398,10 +398,11 @@ public class ConfigManager implements IManager {
     TSStatus status = confirmLeader();
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       // Force updating the target DataNode's status to Unknown
-      getNodeManager()
-          .getNodeCacheMap()
-          .get(dataNodeLocation.getDataNodeId())
-          
.forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
+      getLoadManager()
+          .forceUpdateNodeCache(
+              NodeType.DataNode,
+              dataNodeLocation.getDataNodeId(),
+              NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
       LOGGER.info(
           "[ShutdownHook] The DataNode-{} will be shutdown soon, mark it as 
Unknown",
           dataNodeLocation.getDataNodeId());
@@ -442,12 +443,7 @@ public class ConfigManager implements IManager {
               .map(TDataNodeConfiguration::getLocation)
               
.sorted(Comparator.comparingInt(TDataNodeLocation::getDataNodeId))
               .collect(Collectors.toList());
-      Map<Integer, String> nodeStatus = new HashMap<>();
-      getNodeManager()
-          .getNodeCacheMap()
-          .forEach(
-              (nodeId, heartbeatCache) ->
-                  nodeStatus.put(nodeId, 
heartbeatCache.getNodeStatusWithReason()));
+      Map<Integer, String> nodeStatus = 
getLoadManager().getNodeStatusWithReason();
       return new TShowClusterResp(status, configNodeLocations, 
dataNodeInfoLocations, nodeStatus);
     } else {
       return new TShowClusterResp(status, new ArrayList<>(), new 
ArrayList<>(), new HashMap<>());
@@ -1146,10 +1142,11 @@ public class ConfigManager implements IManager {
     TSStatus status = confirmLeader();
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       // Force updating the target ConfigNode's status to Unknown
-      getNodeManager()
-          .getNodeCacheMap()
-          .get(configNodeLocation.getConfigNodeId())
-          
.forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
+      getLoadManager()
+          .forceUpdateNodeCache(
+              NodeType.ConfigNode,
+              configNodeLocation.getConfigNodeId(),
+              NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
       LOGGER.info(
           "[ShutdownHook] The ConfigNode-{} will be shutdown soon, mark it as 
Unknown",
           configNodeLocation.getConfigNodeId());
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 5df812d4fb..5379ff8ee1 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -40,7 +40,6 @@ import 
org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConf
 import 
org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
-import 
org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
 import org.apache.iotdb.confignode.procedure.Procedure;
@@ -94,7 +93,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -347,13 +345,33 @@ public class ProcedureManager {
   }
 
   public TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) {
-    // TODO: Whether to guarantee the check high consistency, i.e, use 
consensus read to check
-    Map<TConsensusGroupId, RegionGroupCache> regionReplicaMap =
-        configManager.getPartitionManager().getRegionGroupCacheMap();
-    Optional<TConsensusGroupId> regionId =
-        regionReplicaMap.keySet().stream()
-            .filter(id -> id.getId() == migrateRegionReq.getRegionId())
-            .findAny();
+    TConsensusGroupId regionGroupId;
+    if (configManager
+        .getPartitionManager()
+        .isRegionGroupExists(
+            new TConsensusGroupId(
+                TConsensusGroupType.SchemaRegion, 
migrateRegionReq.getRegionId()))) {
+      regionGroupId =
+          new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 
migrateRegionReq.getRegionId());
+    } else if (configManager
+        .getPartitionManager()
+        .isRegionGroupExists(
+            new TConsensusGroupId(
+                TConsensusGroupType.DataRegion, 
migrateRegionReq.getRegionId()))) {
+      regionGroupId =
+          new TConsensusGroupId(TConsensusGroupType.DataRegion, 
migrateRegionReq.getRegionId());
+    } else {
+      LOGGER.warn(
+          "Submit RegionMigrateProcedure failed, because RegionGroup: {} 
doesn't exist",
+          migrateRegionReq.getRegionId());
+      TSStatus status = new 
TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+      status.setMessage(
+          String.format(
+              "Submit RegionMigrateProcedure failed, because RegionGroup: %s 
doesn't exist",
+              migrateRegionReq.getRegionId()));
+      return status;
+    }
+
     TDataNodeLocation originalDataNode =
         configManager
             .getNodeManager()
@@ -364,18 +382,7 @@ public class ProcedureManager {
             .getNodeManager()
             .getRegisteredDataNode(migrateRegionReq.getToId())
             .getLocation();
-    if (!regionId.isPresent()) {
-      LOGGER.warn(
-          "Submit RegionMigrateProcedure failed, because no Region {}",
-          migrateRegionReq.getRegionId());
-      TSStatus status = new 
TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage(
-          "Submit RegionMigrateProcedure failed, because no region Group "
-              + migrateRegionReq.getRegionId());
-      return status;
-    }
-    Set<Integer> dataNodesInRegion =
-        
regionReplicaMap.get(regionId.get()).getStatistics().getRegionStatisticsMap().keySet();
+
     if (originalDataNode == null) {
       LOGGER.warn(
           "Submit RegionMigrateProcedure failed, because no original DataNode 
{}",
@@ -394,7 +401,9 @@ public class ProcedureManager {
           "Submit RegionMigrateProcedure failed, because no target DataNode "
               + migrateRegionReq.getToId());
       return status;
-    } else if (!dataNodesInRegion.contains(migrateRegionReq.getFromId())) {
+    } else if (configManager.getPartitionManager()
+        .getAllReplicaSets(originalDataNode.getDataNodeId()).stream()
+        .noneMatch(replicaSet -> 
replicaSet.getRegionId().equals(regionGroupId))) {
       LOGGER.warn(
           "Submit RegionMigrateProcedure failed, because the original DataNode 
{} doesn't contain Region {}",
           migrateRegionReq.getFromId(),
@@ -406,7 +415,9 @@ public class ProcedureManager {
               + " doesn't contain Region "
               + migrateRegionReq.getRegionId());
       return status;
-    } else if (dataNodesInRegion.contains(migrateRegionReq.getToId())) {
+    } else if 
(configManager.getPartitionManager().getAllReplicaSets(destDataNode.getDataNodeId())
+        .stream()
+        .anyMatch(replicaSet -> 
replicaSet.getRegionId().equals(regionGroupId))) {
       LOGGER.warn(
           "Submit RegionMigrateProcedure failed, because the target DataNode 
{} already contains Region {}",
           migrateRegionReq.getToId(),
@@ -426,10 +437,8 @@ public class ProcedureManager {
             .map(TDataNodeConfiguration::getLocation)
             .map(TDataNodeLocation::getDataNodeId)
             .collect(Collectors.toSet());
-    if (configManager
-        .getNodeManager()
-        .getNodeStatusByNodeId(migrateRegionReq.getFromId())
-        .equals(NodeStatus.Unknown)) {
+    if (NodeStatus.Unknown.equals(
+        
configManager.getLoadManager().getNodeStatus(migrateRegionReq.getFromId()))) {
       LOGGER.warn(
           "Submit RegionMigrateProcedure failed, because the sourceDataNode {} 
is Unknown.",
           migrateRegionReq.getFromId());
@@ -440,18 +449,8 @@ public class ProcedureManager {
               + " is Unknown.");
       return status;
     }
-    dataNodesInRegion.retainAll(aliveDataNodes);
-    if (dataNodesInRegion.isEmpty()) {
-      LOGGER.warn(
-          "Submit RegionMigrateProcedure failed, because all of the DataNodes 
in Region Group {} is unavailable.",
-          migrateRegionReq.getRegionId());
-      TSStatus status = new 
TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage(
-          "Submit RegionMigrateProcedure failed, because all of the DataNodes 
in Region Group "
-              + migrateRegionReq.getRegionId()
-              + " are unavailable.");
-      return status;
-    } else if (!aliveDataNodes.contains(migrateRegionReq.getToId())) {
+
+    if (!aliveDataNodes.contains(migrateRegionReq.getToId())) {
       LOGGER.warn(
           "Submit RegionMigrateProcedure failed, because the destDataNode {} 
is ReadOnly or Unknown.",
           migrateRegionReq.getToId());
@@ -463,7 +462,7 @@ public class ProcedureManager {
       return status;
     }
     this.executor.submitProcedure(
-        new RegionMigrateProcedure(regionId.get(), originalDataNode, 
destDataNode));
+        new RegionMigrateProcedure(regionGroupId, originalDataNode, 
destDataNode));
     LOGGER.info(
         "Submit RegionMigrateProcedure successfully, Region: {}, From: {}, To: 
{}",
         migrateRegionReq.getRegionId(),
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
index 865b8b59c2..287cda6eb1 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
@@ -28,7 +28,7 @@ import 
org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
 import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -63,6 +63,7 @@ public class RetryFailedTasksThread {
   private static final long HEARTBEAT_INTERVAL = 
CONF.getHeartbeatIntervalInMs();
   private final IManager configManager;
   private final NodeManager nodeManager;
+  private final LoadManager loadManager;
   private final ScheduledExecutorService retryFailTasksExecutor =
       
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-RetryFailedTasks-Service");
   private final Object scheduleMonitor = new Object();
@@ -78,6 +79,7 @@ public class RetryFailedTasksThread {
   public RetryFailedTasksThread(IManager configManager) {
     this.configManager = configManager;
     this.nodeManager = configManager.getNodeManager();
+    this.loadManager = configManager.getLoadManager();
     this.oldUnknownNodes = new HashSet<>();
   }
 
@@ -131,15 +133,12 @@ public class RetryFailedTasksThread {
         .forEach(
             DataNodeConfiguration -> {
               TDataNodeLocation dataNodeLocation = 
DataNodeConfiguration.getLocation();
-              BaseNodeCache newestNodeInformation =
-                  
nodeManager.getNodeCacheMap().get(dataNodeLocation.dataNodeId);
-              if (newestNodeInformation != null) {
-                if (newestNodeInformation.getNodeStatus() == 
NodeStatus.Running) {
-                  oldUnknownNodes.remove(dataNodeLocation);
-                } else if (!oldUnknownNodes.contains(dataNodeLocation)
-                    && newestNodeInformation.getNodeStatus() == 
NodeStatus.Unknown) {
-                  newUnknownNodes.add(dataNodeLocation);
-                }
+              NodeStatus nodeStatus = 
loadManager.getNodeStatus(dataNodeLocation.getDataNodeId());
+              if (nodeStatus == NodeStatus.Running) {
+                oldUnknownNodes.remove(dataNodeLocation);
+              } else if (!oldUnknownNodes.contains(dataNodeLocation)
+                  && nodeStatus == NodeStatus.Unknown) {
+                newUnknownNodes.add(dataNodeLocation);
               }
             });
 
@@ -163,7 +162,7 @@ public class RetryFailedTasksThread {
   private void syncDetectTask() {
     for (Map.Entry<Integer, Queue<TOperatePipeOnDataNodeReq>> entry : 
messageMap.entrySet()) {
       int dataNodeId = entry.getKey();
-      if 
(NodeStatus.Running.equals(nodeManager.getNodeStatusByNodeId(dataNodeId))) {
+      if (NodeStatus.Running.equals(loadManager.getNodeStatus(dataNodeId))) {
         final Map<Integer, TDataNodeLocation> dataNodeLocationMap = new 
HashMap<>();
         dataNodeLocationMap.put(
             dataNodeId, 
nodeManager.getRegisteredDataNodeLocations().get(dataNodeId));
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadCache.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadCache.java
index 03f002899c..0daa3846f5 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadCache.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadCache.java
@@ -21,9 +21,12 @@ package org.apache.iotdb.confignode.manager.load;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
 import 
org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache;
@@ -33,20 +36,21 @@ import 
org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCach
 import 
org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
 import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics;
 import 
org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics;
-import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
 import org.apache.iotdb.tsfile.utils.Pair;
 
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 /** Maintain all kinds of heartbeat samples */
 public class LoadCache {
 
-  private static final boolean IS_DATA_REGION_IOT_CONSENSUS =
-      ConsensusFactory.IOT_CONSENSUS.equals(
-          
ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass());
-
   // Map<NodeId, INodeCache>
   private final Map<Integer, BaseNodeCache> nodeCacheMap;
   // Map<RegionGroupId, RegionGroupCache>
@@ -57,6 +61,58 @@ public class LoadCache {
     this.regionGroupCacheMap = new ConcurrentHashMap<>();
   }
 
+  public void initHeartbeatCache(IManager configManager) {
+    initNodeHeartbeatCache(
+        configManager.getNodeManager().getRegisteredConfigNodes(),
+        configManager.getNodeManager().getRegisteredDataNodes());
+    
initRegionGroupHeartbeatCache(configManager.getPartitionManager().getAllReplicaSets());
+  }
+
+  /** Initialize the nodeCacheMap when the ConfigNode-Leader is switched */
+  private void initNodeHeartbeatCache(
+      List<TConfigNodeLocation> registeredConfigNodes,
+      List<TDataNodeConfiguration> registeredDataNodes) {
+
+    final int CURRENT_NODE_ID = ConfigNodeHeartbeatCache.CURRENT_NODE_ID;
+    nodeCacheMap.clear();
+
+    // Init ConfigNodeHeartbeatCache
+    registeredConfigNodes.forEach(
+        configNodeLocation -> {
+          int configNodeId = configNodeLocation.getConfigNodeId();
+          if (configNodeId != CURRENT_NODE_ID) {
+            nodeCacheMap.put(configNodeId, new 
ConfigNodeHeartbeatCache(configNodeId));
+          }
+        });
+    // Force set itself and never update
+    nodeCacheMap.put(
+        ConfigNodeHeartbeatCache.CURRENT_NODE_ID,
+        new ConfigNodeHeartbeatCache(
+            CURRENT_NODE_ID, 
ConfigNodeHeartbeatCache.CURRENT_NODE_STATISTICS));
+
+    // Init DataNodeHeartbeatCache
+    registeredDataNodes.forEach(
+        dataNodeConfiguration -> {
+          int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId();
+          nodeCacheMap.put(dataNodeId, new DataNodeHeartbeatCache(dataNodeId));
+        });
+  }
+
+  /** Initialize the regionGroupCacheMap when the ConfigNode-Leader is 
switched. */
+  private void initRegionGroupHeartbeatCache(List<TRegionReplicaSet> 
regionReplicaSets) {
+    regionGroupCacheMap.clear();
+    regionReplicaSets.forEach(
+        regionReplicaSet ->
+            regionGroupCacheMap.put(
+                regionReplicaSet.getRegionId(),
+                new RegionGroupCache(regionReplicaSet.getRegionId())));
+  }
+
+  public void clearHeartbeatCache() {
+    nodeCacheMap.clear();
+    regionGroupCacheMap.clear();
+  }
+
   /**
    * Cache the latest heartbeat sample of a ConfigNode.
    *
@@ -77,7 +133,7 @@ public class LoadCache {
    */
   public void cacheDataNodeHeartbeatSample(int nodeId, NodeHeartbeatSample 
sample) {
     nodeCacheMap
-        .computeIfAbsent(nodeId, empty -> new DataNodeHeartbeatCache())
+        .computeIfAbsent(nodeId, empty -> new DataNodeHeartbeatCache(nodeId))
         .cacheHeartbeatSample(sample);
   }
 
@@ -133,54 +189,263 @@ public class LoadCache {
     return differentRegionGroupStatisticsMap;
   }
 
-  public void initHeartbeatCache(IManager configManager) {
-    initNodeHeartbeatCache(
-        configManager.getNodeManager().getRegisteredConfigNodes(),
-        configManager.getNodeManager().getRegisteredDataNodes());
-    
initRegionGroupHeartbeatCache(configManager.getPartitionManager().getAllReplicaSets());
+  /**
+   * Safely get NodeStatus by NodeId
+   *
+   * @param nodeId The specified NodeId
+   * @return NodeStatus of the specified Node. Unknown if cache doesn't exist.
+   */
+  public NodeStatus getNodeStatus(int nodeId) {
+    BaseNodeCache nodeCache = nodeCacheMap.get(nodeId);
+    return nodeCache == null ? NodeStatus.Unknown : nodeCache.getNodeStatus();
   }
 
-  /** Initialize the nodeCacheMap when the ConfigNode-Leader is switched */
-  private void initNodeHeartbeatCache(
-      List<TConfigNodeLocation> registeredConfigNodes,
-      List<TDataNodeConfiguration> registeredDataNodes) {
-    final int CURRENT_NODE_ID = ConfigNodeHeartbeatCache.CURRENT_NODE_ID;
-    nodeCacheMap.clear();
+  /**
+   * Safely get the specified Node's current status with reason
+   *
+   * @param nodeId The specified NodeId
+   * @return The specified Node's current status if the nodeCache contains it, 
Unknown otherwise
+   */
+  public String getNodeStatusWithReason(int nodeId) {
+    BaseNodeCache nodeCache = nodeCacheMap.get(nodeId);
+    return nodeCache == null
+        ? NodeStatus.Unknown.getStatus() + "(NoHeartbeat)"
+        : nodeCache.getNodeStatusWithReason();
+  }
 
-    // Init ConfigNodeHeartbeatCache
-    registeredConfigNodes.forEach(
-        configNodeLocation -> {
-          if (configNodeLocation.getConfigNodeId() != CURRENT_NODE_ID) {
-            nodeCacheMap.put(
-                configNodeLocation.getConfigNodeId(),
-                new 
ConfigNodeHeartbeatCache(configNodeLocation.getConfigNodeId()));
+  /**
+   * Get all Node's current status with reason
+   *
+   * @return Map<NodeId, NodeStatus with reason>
+   */
+  public Map<Integer, String> getNodeStatusWithReason() {
+    return nodeCacheMap.entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().getNodeStatusWithReason()));
+  }
+
+  /**
+   * Filter ConfigNodes through the specified NodeStatus
+   *
+   * @param status The specified NodeStatus
+   * @return Filtered ConfigNodes with the specified NodeStatus
+   */
+  public List<Integer> filterConfigNodeThroughStatus(NodeStatus... status) {
+    return nodeCacheMap.entrySet().stream()
+        .filter(
+            nodeCacheEntry ->
+                nodeCacheEntry.getValue() instanceof ConfigNodeHeartbeatCache
+                    && Arrays.stream(status)
+                        .anyMatch(s -> 
s.equals(nodeCacheEntry.getValue().getNodeStatus())))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Filter DataNodes through the specified NodeStatus
+   *
+   * @param status The specified NodeStatus
+   * @return Filtered DataNodes with the specified NodeStatus
+   */
+  public List<Integer> filterDataNodeThroughStatus(NodeStatus... status) {
+    return nodeCacheMap.entrySet().stream()
+        .filter(
+            nodeCacheEntry ->
+                nodeCacheEntry.getValue() instanceof DataNodeHeartbeatCache
+                    && Arrays.stream(status)
+                        .anyMatch(s -> 
s.equals(nodeCacheEntry.getValue().getNodeStatus())))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Get the free disk space of the specified DataNode
+   *
+   * @param dataNodeId The index of the specified DataNode
+   * @return The free disk space that sample through heartbeat, 0 if no 
heartbeat received
+   */
+  public double getFreeDiskSpace(int dataNodeId) {
+    DataNodeHeartbeatCache dataNodeHeartbeatCache =
+        (DataNodeHeartbeatCache) nodeCacheMap.get(dataNodeId);
+    return dataNodeHeartbeatCache == null ? 0d : 
dataNodeHeartbeatCache.getFreeDiskSpace();
+  }
+
+  /**
+   * Get the loadScore of each DataNode
+   *
+   * @return Map<DataNodeId, loadScore>
+   */
+  public Map<Integer, Long> getAllDataNodeLoadScores() {
+    Map<Integer, Long> result = new ConcurrentHashMap<>();
+    nodeCacheMap.forEach(
+        (dataNodeId, heartbeatCache) -> {
+          if (heartbeatCache instanceof DataNodeHeartbeatCache) {
+            result.put(dataNodeId, heartbeatCache.getLoadScore());
           }
         });
-    // Force set itself and never update
-    nodeCacheMap.put(
-        ConfigNodeHeartbeatCache.CURRENT_NODE_ID,
-        new ConfigNodeHeartbeatCache(
-            CURRENT_NODE_ID, 
ConfigNodeHeartbeatCache.CURRENT_NODE_STATISTICS));
+    return result;
+  }
 
-    // Init DataNodeHeartbeatCache
-    registeredDataNodes.forEach(
-        dataNodeConfiguration ->
-            nodeCacheMap.put(
-                dataNodeConfiguration.getLocation().getDataNodeId(), new 
DataNodeHeartbeatCache()));
+  /**
+   * Get the lowest loadScore DataNode
+   *
+   * @return The index of the lowest loadScore DataNode. -1 if no DataNode 
heartbeat received.
+   */
+  public int getLowestLoadDataNode() {
+    return nodeCacheMap.entrySet().stream()
+        .filter(nodeCacheEntry -> nodeCacheEntry.getValue() instanceof 
DataNodeHeartbeatCache)
+        .min(Comparator.comparingLong(nodeCacheEntry -> 
nodeCacheEntry.getValue().getLoadScore()))
+        .map(Map.Entry::getKey)
+        .orElse(-1);
   }
 
-  /** Initialize the regionGroupCacheMap when the ConfigNode-Leader is 
switched. */
-  private void initRegionGroupHeartbeatCache(List<TRegionReplicaSet> 
regionReplicaSets) {
-    regionGroupCacheMap.clear();
-    regionReplicaSets.forEach(
-        regionReplicaSet ->
-            regionGroupCacheMap.put(
-                regionReplicaSet.getRegionId(),
-                new RegionGroupCache(regionReplicaSet.getRegionId())));
+  /**
+   * Get the lowest loadScore DataNode from the specified DataNodes
+   *
+   * @param dataNodeIds The specified DataNodes
+   * @return The index of the lowest loadScore DataNode. -1 if no DataNode 
heartbeat received.
+   */
+  public int getLowestLoadDataNode(List<Integer> dataNodeIds) {
+    return dataNodeIds.stream()
+        .map(nodeCacheMap::get)
+        .filter(Objects::nonNull)
+        .min(Comparator.comparingLong(BaseNodeCache::getLoadScore))
+        .map(BaseNodeCache::getNodeId)
+        .orElse(-1);
   }
 
-  public void clearHeartbeatCache() {
-    nodeCacheMap.clear();
-    regionGroupCacheMap.clear();
+  /**
+   * Force update the specified Node's cache
+   *
+   * @param nodeType Specified NodeType
+   * @param nodeId Specified NodeId
+   * @param heartbeatSample Specified NodeHeartbeatSample
+   */
+  public void forceUpdateNodeCache(
+      NodeType nodeType, int nodeId, NodeHeartbeatSample heartbeatSample) {
+    switch (nodeType) {
+      case ConfigNode:
+        nodeCacheMap
+            .computeIfAbsent(nodeId, empty -> new 
ConfigNodeHeartbeatCache(nodeId))
+            .forceUpdate(heartbeatSample);
+        break;
+      case DataNode:
+        nodeCacheMap
+            .computeIfAbsent(nodeId, empty -> new 
DataNodeHeartbeatCache(nodeId))
+            .forceUpdate(heartbeatSample);
+        break;
+    }
+  }
+
+  /** Remove the specified Node's cache */
+  public void removeNodeCache(int nodeId) {
+    nodeCacheMap.remove(nodeId);
+  }
+
+  /**
+   * Safely get RegionStatus.
+   *
+   * @param consensusGroupId Specified RegionGroupId
+   * @param dataNodeId Specified RegionReplicaId
+   * @return Corresponding RegionStatus if cache exists, Unknown otherwise
+   */
+  public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int 
dataNodeId) {
+    return regionGroupCacheMap.containsKey(consensusGroupId)
+        ? 
regionGroupCacheMap.get(consensusGroupId).getStatistics().getRegionStatus(dataNodeId)
+        : RegionStatus.Unknown;
+  }
+
+  /**
+   * Safely get RegionGroupStatus.
+   *
+   * @param consensusGroupId Specified RegionGroupId
+   * @return Corresponding RegionGroupStatus if cache exists, Disabled 
otherwise
+   */
+  public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId 
consensusGroupId) {
+    return regionGroupCacheMap.containsKey(consensusGroupId)
+        ? 
regionGroupCacheMap.get(consensusGroupId).getStatistics().getRegionGroupStatus()
+        : RegionGroupStatus.Disabled;
+  }
+
+  /**
+   * Safely get RegionGroupStatus.
+   *
+   * @param consensusGroupIds Specified RegionGroupIds
+   * @return Corresponding RegionGroupStatus if cache exists, Disabled 
otherwise
+   */
+  public Map<TConsensusGroupId, RegionGroupStatus> getRegionGroupStatus(
+      List<TConsensusGroupId> consensusGroupIds) {
+    Map<TConsensusGroupId, RegionGroupStatus> regionGroupStatusMap = new 
ConcurrentHashMap<>();
+    for (TConsensusGroupId consensusGroupId : consensusGroupIds) {
+      regionGroupStatusMap.put(consensusGroupId, 
getRegionGroupStatus(consensusGroupId));
+    }
+    return regionGroupStatusMap;
+  }
+
+  /**
+   * Filter the RegionGroups through the RegionGroupStatus
+   *
+   * @param status The specified RegionGroupStatus
+   * @return Filtered RegionGroups with the specified RegionGroupStatus
+   */
+  public List<TConsensusGroupId> 
filterRegionGroupThroughStatus(RegionGroupStatus... status) {
+    return regionGroupCacheMap.entrySet().stream()
+        .filter(
+            regionGroupCacheEntry ->
+                Arrays.stream(status)
+                    .anyMatch(
+                        s ->
+                            s.equals(
+                                regionGroupCacheEntry
+                                    .getValue()
+                                    .getStatistics()
+                                    .getRegionGroupStatus())))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Count the number of cluster Regions with specified RegionStatus
+   *
+   * @param type The specified RegionGroupType
+   * @param status The specified statues
+   * @return The number of cluster Regions with specified RegionStatus
+   */
+  public int countRegionWithSpecifiedStatus(TConsensusGroupType type, 
RegionStatus... status) {
+    AtomicInteger result = new AtomicInteger(0);
+    regionGroupCacheMap.forEach(
+        (regionGroupId, regionGroupCache) -> {
+          if (type.equals(regionGroupId.getType())) {
+            regionGroupCache
+                .getStatistics()
+                .getRegionStatisticsMap()
+                .values()
+                .forEach(
+                    regionStatistics -> {
+                      if (Arrays.stream(status)
+                          .anyMatch(s -> 
s.equals(regionStatistics.getRegionStatus()))) {
+                        result.getAndIncrement();
+                      }
+                    });
+          }
+        });
+    return result.get();
+  }
+
+  /**
+   * Force update the specified RegionGroup's cache
+   *
+   * @param regionGroupId Specified RegionGroupId
+   * @param heartbeatSampleMap Specified RegionHeartbeatSampleMap
+   */
+  public void forceUpdateRegionGroupCache(
+      TConsensusGroupId regionGroupId, Map<Integer, RegionHeartbeatSample> 
heartbeatSampleMap) {
+    regionGroupCacheMap
+        .computeIfAbsent(regionGroupId, empty -> new 
RegionGroupCache(regionGroupId))
+        .forceUpdate(heartbeatSampleMap);
+  }
+
+  /** Remove the specified RegionGroup's cache */
+  public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
+    regionGroupCacheMap.remove(consensusGroupId);
   }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 15dc22c0f5..f66b805703 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -23,6 +23,9 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import 
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
@@ -34,7 +37,10 @@ import 
org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
 import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatService;
+import 
org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
+import 
org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
 import org.apache.iotdb.confignode.manager.load.statistics.StatisticsService;
+import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 
 import com.google.common.eventbus.AsyncEventBus;
@@ -162,6 +168,10 @@ public class LoadManager {
     return routeBalancer.getLatestRegionPriorityMap();
   }
 
+  public void broadcastLatestRegionRouteMap() {
+    statisticsService.broadcastLatestRegionRouteMap();
+  }
+
   public void startLoadServices() {
     loadCache.initHeartbeatCache(configManager);
     routeBalancer.initRegionRouteMap();
@@ -178,4 +188,177 @@ public class LoadManager {
   public RouteBalancer getRouteBalancer() {
     return routeBalancer;
   }
+
+  /**
+   * Safely get NodeStatus by NodeId
+   *
+   * @param nodeId The specified NodeId
+   * @return NodeStatus of the specified Node. Unknown if cache doesn't exist.
+   */
+  public NodeStatus getNodeStatus(int nodeId) {
+    return loadCache.getNodeStatus(nodeId);
+  }
+
+  /**
+   * Safely get the specified Node's current status with reason
+   *
+   * @param nodeId The specified NodeId
+   * @return The specified Node's current status if the nodeCache contains it, 
Unknown otherwise
+   */
+  public String getNodeStatusWithReason(int nodeId) {
+    return loadCache.getNodeStatusWithReason(nodeId);
+  }
+
+  /**
+   * Get all Node's current status with reason
+   *
+   * @return Map<NodeId, NodeStatus with reason>
+   */
+  public Map<Integer, String> getNodeStatusWithReason() {
+    return loadCache.getNodeStatusWithReason();
+  }
+
+  /**
+   * Filter ConfigNodes through the specified NodeStatus
+   *
+   * @param status The specified NodeStatus
+   * @return Filtered ConfigNodes with the specified NodeStatus
+   */
+  public List<Integer> filterConfigNodeThroughStatus(NodeStatus... status) {
+    return loadCache.filterConfigNodeThroughStatus(status);
+  }
+
+  /**
+   * Filter DataNodes through the specified NodeStatus
+   *
+   * @param status The specified NodeStatus
+   * @return Filtered DataNodes with the specified NodeStatus
+   */
+  public List<Integer> filterDataNodeThroughStatus(NodeStatus... status) {
+    return loadCache.filterDataNodeThroughStatus(status);
+  }
+
+  /**
+   * Get the free disk space of the specified DataNode
+   *
+   * @param dataNodeId The index of the specified DataNode
+   * @return The free disk space that sample through heartbeat, 0 if no 
heartbeat received
+   */
+  public double getFreeDiskSpace(int dataNodeId) {
+    return loadCache.getFreeDiskSpace(dataNodeId);
+  }
+
+  /**
+   * Get the loadScore of each DataNode
+   *
+   * @return Map<DataNodeId, loadScore>
+   */
+  public Map<Integer, Long> getAllDataNodeLoadScores() {
+    return loadCache.getAllDataNodeLoadScores();
+  }
+
+  /**
+   * Get the lowest loadScore DataNode
+   *
+   * @return The index of the lowest loadScore DataNode. -1 if no DataNode 
heartbeat received.
+   */
+  public int getLowestLoadDataNode() {
+    return loadCache.getLowestLoadDataNode();
+  }
+
+  /**
+   * Get the lowest loadScore DataNode from the specified DataNodes
+   *
+   * @param dataNodeIds The specified DataNodes
+   * @return The index of the lowest loadScore DataNode. -1 if no DataNode 
heartbeat received.
+   */
+  public int getLowestLoadDataNode(List<Integer> dataNodeIds) {
+    return loadCache.getLowestLoadDataNode(dataNodeIds);
+  }
+
+  /**
+   * Force update the specified Node's cache
+   *
+   * @param nodeType Specified NodeType
+   * @param nodeId Specified NodeId
+   * @param heartbeatSample Specified NodeHeartbeatSample
+   */
+  public void forceUpdateNodeCache(
+      NodeType nodeType, int nodeId, NodeHeartbeatSample heartbeatSample) {
+    loadCache.forceUpdateNodeCache(nodeType, nodeId, heartbeatSample);
+  }
+
+  /** Remove the specified Node's cache */
+  public void removeNodeCache(int nodeId) {
+    loadCache.removeNodeCache(nodeId);
+  }
+
+  /**
+   * Safely get RegionStatus.
+   *
+   * @param consensusGroupId Specified RegionGroupId
+   * @param dataNodeId Specified RegionReplicaId
+   * @return Corresponding RegionStatus if cache exists, Unknown otherwise
+   */
+  public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int 
dataNodeId) {
+    return loadCache.getRegionStatus(consensusGroupId, dataNodeId);
+  }
+
+  /**
+   * Safely get RegionGroupStatus.
+   *
+   * @param consensusGroupId Specified RegionGroupId
+   * @return Corresponding RegionGroupStatus if cache exists, Disabled 
otherwise
+   */
+  public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId 
consensusGroupId) {
+    return loadCache.getRegionGroupStatus(consensusGroupId);
+  }
+
+  /**
+   * Safely get RegionGroupStatus.
+   *
+   * @param consensusGroupIds Specified RegionGroupIds
+   * @return Corresponding RegionGroupStatus if cache exists, Disabled 
otherwise
+   */
+  public Map<TConsensusGroupId, RegionGroupStatus> getRegionGroupStatus(
+      List<TConsensusGroupId> consensusGroupIds) {
+    return loadCache.getRegionGroupStatus(consensusGroupIds);
+  }
+
+  /**
+   * Filter the RegionGroups through the RegionGroupStatus
+   *
+   * @param status The specified RegionGroupStatus
+   * @return Filtered RegionGroups with the specified RegionGroupStatus
+   */
+  public List<TConsensusGroupId> 
filterRegionGroupThroughStatus(RegionGroupStatus... status) {
+    return loadCache.filterRegionGroupThroughStatus(status);
+  }
+
+  /**
+   * Count the number of cluster Regions with specified RegionStatus
+   *
+   * @param type The specified RegionGroupType
+   * @param status The specified statuses
+   * @return The number of cluster Regions with specified RegionStatus
+   */
+  public int countRegionWithSpecifiedStatus(TConsensusGroupType type, 
RegionStatus... status) {
+    return loadCache.countRegionWithSpecifiedStatus(type, status);
+  }
+
+  /**
+   * Force update the specified RegionGroup's cache
+   *
+   * @param regionGroupId Specified RegionGroupId
+   * @param heartbeatSampleMap Specified RegionHeartbeatSampleMap
+   */
+  public void forceUpdateRegionGroupCache(
+      TConsensusGroupId regionGroupId, Map<Integer, RegionHeartbeatSample> 
heartbeatSampleMap) {
+    loadCache.forceUpdateRegionGroupCache(regionGroupId, heartbeatSampleMap);
+  }
+
+  /** Remove the specified RegionGroup's cache */
+  public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
+    loadCache.removeRegionGroupCache(consensusGroupId);
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 882a5eeebe..f6d1524df0 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
 import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
 import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
 import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
 import 
org.apache.iotdb.confignode.manager.load.balancer.region.CopySetRegionGroupAllocator;
 import 
org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator;
 import 
org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator;
@@ -111,7 +112,7 @@ public class RegionBalancer {
             dataNodeConfiguration -> {
               int dataNodeId = 
dataNodeConfiguration.getLocation().getDataNodeId();
               availableDataNodeMap.put(dataNodeId, dataNodeConfiguration);
-              freeDiskSpaceMap.put(dataNodeId, 
getNodeManager().getFreeDiskSpace(dataNodeId));
+              freeDiskSpaceMap.put(dataNodeId, 
getLoadManager().getFreeDiskSpace(dataNodeId));
             });
 
         // Generate allocation plan
@@ -145,6 +146,10 @@ public class RegionBalancer {
     return configManager.getPartitionManager();
   }
 
+  private LoadManager getLoadManager() {
+    return configManager.getLoadManager();
+  }
+
   public enum RegionGroupAllocatePolicy {
     COPY_SET,
     GREEDY
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 4a27368459..a87cb051ba 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -33,6 +33,7 @@ import 
org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
 import 
org.apache.iotdb.confignode.manager.load.balancer.router.leader.GreedyLeaderBalancer;
 import 
org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
@@ -193,7 +194,7 @@ public class RouteBalancer {
 
   private boolean updateRegionPriorityMap() {
     Map<TConsensusGroupId, Integer> regionLeaderMap = 
regionRouteMap.getRegionLeaderMap();
-    Map<Integer, Long> dataNodeLoadScoreMap = 
getNodeManager().getAllLoadScores();
+    Map<Integer, Long> dataNodeLoadScoreMap = 
getLoadManager().getAllDataNodeLoadScores();
 
     // Balancing region priority in each SchemaRegionGroup
     Map<TConsensusGroupId, TRegionReplicaSet> latestRegionPriorityMap =
@@ -412,4 +413,8 @@ public class RouteBalancer {
   private PartitionManager getPartitionManager() {
     return configManager.getPartitionManager();
   }
+
+  private LoadManager getLoadManager() {
+    return configManager.getLoadManager();
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java
index b2aaaa15aa..1ad268c5bd 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java
@@ -171,6 +171,7 @@ public class HeartbeatService {
           new DataNodeHeartbeatHandler(
               dataNodeInfo.getLocation().getDataNodeId(),
               loadCache,
+              configManager.getLoadManager().getRouteBalancer(),
               configManager.getClusterQuotaManager().getDeviceNum(),
               configManager.getClusterQuotaManager().getTimeSeriesNum(),
               configManager.getClusterQuotaManager().getRegionDisk());
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java
index 392ae24b6e..d55bf144d9 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java
@@ -32,6 +32,8 @@ public abstract class BaseNodeCache {
   // Max heartbeat cache samples store size
   public static final int MAXIMUM_WINDOW_SIZE = 100;
 
+  protected final int nodeId;
+
   // SlidingWindow stores the heartbeat sample data
   protected final LinkedList<NodeHeartbeatSample> slidingWindow = new 
LinkedList<>();
 
@@ -42,7 +44,8 @@ public abstract class BaseNodeCache {
   protected volatile NodeStatistics currentStatistics;
 
   /** Constructor for NodeCache with default NodeStatistics */
-  protected BaseNodeCache() {
+  protected BaseNodeCache(int nodeId) {
+    this.nodeId = nodeId;
     this.previousStatistics = NodeStatistics.generateDefaultNodeStatistics();
     this.currentStatistics = NodeStatistics.generateDefaultNodeStatistics();
   }
@@ -108,6 +111,10 @@ public abstract class BaseNodeCache {
    */
   protected abstract void updateCurrentStatistics();
 
+  public int getNodeId() {
+    return nodeId;
+  }
+
   /**
    * TODO: The loadScore of each Node will be changed to Double
    *
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java
index afd9d3e197..0bb831c0dd 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java
@@ -31,18 +31,14 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache 
{
   public static final NodeStatistics CURRENT_NODE_STATISTICS =
       new NodeStatistics(0, NodeStatus.Running, null);
 
-  private final int configNodeId;
-
   /** Constructor for create ConfigNodeHeartbeatCache with default 
NodeStatistics */
   public ConfigNodeHeartbeatCache(int configNodeId) {
-    super();
-    this.configNodeId = configNodeId;
+    super(configNodeId);
   }
 
   /** Constructor only for ConfigNode-leader */
   public ConfigNodeHeartbeatCache(int configNodeId, NodeStatistics statistics) 
{
-    super();
-    this.configNodeId = configNodeId;
+    super(configNodeId);
     this.previousStatistics = statistics;
     this.currentStatistics = statistics;
   }
@@ -50,7 +46,7 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache {
   @Override
   protected void updateCurrentStatistics() {
     // Skip itself
-    if (configNodeId == CURRENT_NODE_ID) {
+    if (nodeId == CURRENT_NODE_ID) {
       return;
     }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java
index cb20836120..aa36677983 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java
@@ -28,8 +28,8 @@ public class DataNodeHeartbeatCache extends BaseNodeCache {
   private volatile TLoadSample latestLoadSample;
 
   /** Constructor for create DataNodeHeartbeatCache with default 
NodeStatistics */
-  public DataNodeHeartbeatCache() {
-    super();
+  public DataNodeHeartbeatCache(int dataNodeId) {
+    super(dataNodeId);
     this.latestLoadSample = new TLoadSample();
   }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 60e0134320..1482231c87 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -46,7 +46,6 @@ import 
org.apache.iotdb.confignode.consensus.response.datanode.ConfigurationResp
 import 
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
 import 
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
 import 
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
-import org.apache.iotdb.confignode.manager.ClusterQuotaManager;
 import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.IManager;
@@ -54,9 +53,7 @@ import org.apache.iotdb.confignode.manager.TriggerManager;
 import org.apache.iotdb.confignode.manager.UDFManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
-import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
 import 
org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache;
-import 
org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
 import org.apache.iotdb.confignode.manager.pipe.PipeManager;
@@ -82,19 +79,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
 
 /** NodeManager manages cluster node addition and removal requests */
 public class NodeManager {
@@ -109,13 +102,10 @@ public class NodeManager {
 
   private final ReentrantLock removeConfigNodeLock;
 
-  private final Random random;
-
   public NodeManager(IManager configManager, NodeInfo nodeInfo) {
     this.configManager = configManager;
     this.nodeInfo = nodeInfo;
     this.removeConfigNodeLock = new ReentrantLock();
-    this.random = new Random(System.currentTimeMillis());
   }
 
   /**
@@ -365,15 +355,6 @@ public class NodeManager {
     return nodeInfo.getRegisteredDataNodeCount();
   }
 
-  /**
-   * Only leader use this interface
-   *
-   * @return The number of total cpu cores in online DataNodes
-   */
-  public int getTotalCpuCoreCount() {
-    return nodeInfo.getTotalCpuCoreCount();
-  }
-
   /**
    * Only leader use this interface
    *
@@ -417,7 +398,7 @@ public class NodeManager {
             TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
             int dataNodeId = registeredDataNode.getLocation().getDataNodeId();
             dataNodeInfo.setDataNodeId(dataNodeId);
-            dataNodeInfo.setStatus(getNodeStatusWithReason(dataNodeId));
+            
dataNodeInfo.setStatus(getLoadManager().getNodeStatusWithReason(dataNodeId));
             dataNodeInfo.setRpcAddresss(
                 
registeredDataNode.getLocation().getClientRpcEndPoint().getIp());
             dataNodeInfo.setRpcPort(
@@ -481,7 +462,7 @@ public class NodeManager {
             TConfigNodeInfo info = new TConfigNodeInfo();
             int configNodeId = configNodeLocation.getConfigNodeId();
             info.setConfigNodeId(configNodeId);
-            info.setStatus(getNodeStatusWithReason(configNodeId));
+            
info.setStatus(getLoadManager().getNodeStatusWithReason(configNodeId));
             
info.setInternalAddress(configNodeLocation.getInternalEndPoint().getIp());
             
info.setInternalPort(configNodeLocation.getInternalEndPoint().getPort());
             info.setRoleType(
@@ -666,98 +647,25 @@ public class NodeManager {
     }
   }
 
-  public Map<Integer, BaseNodeCache> getNodeCacheMap() {
-    return nodeCacheMap;
-  }
-
-  public void removeNodeCache(int nodeId) {
-    nodeCacheMap.remove(nodeId);
-  }
-
-  /**
-   * Safely get the specific Node's current status for showing cluster
-   *
-   * @param nodeId The specific Node's index
-   * @return The specific Node's current status if the nodeCache contains it, 
Unknown otherwise
-   */
-  private String getNodeStatusWithReason(int nodeId) {
-    BaseNodeCache nodeCache = nodeCacheMap.get(nodeId);
-    return nodeCache == null
-        ? NodeStatus.Unknown.getStatus() + "(NoHeartbeat)"
-        : nodeCache.getNodeStatusWithReason();
-  }
-
   /**
-   * Filter the registered ConfigNodes through the specific NodeStatus
+   * Filter ConfigNodes through the specified NodeStatus
    *
-   * @param status The specific NodeStatus
-   * @return Filtered ConfigNodes with the specific NodeStatus
+   * @param status The specified NodeStatus
+   * @return Filtered ConfigNodes with the specified NodeStatus
    */
   public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus... 
status) {
-    return getRegisteredConfigNodes().stream()
-        .filter(
-            registeredConfigNode -> {
-              int configNodeId = registeredConfigNode.getConfigNodeId();
-              return nodeCacheMap.containsKey(configNodeId)
-                  && Arrays.stream(status)
-                      .anyMatch(s -> 
s.equals(nodeCacheMap.get(configNodeId).getNodeStatus()));
-            })
-        .collect(Collectors.toList());
-  }
-
-  /**
-   * Get NodeStatus by nodeId
-   *
-   * @param nodeId The specific NodeId
-   * @return NodeStatus of the specific node. If node does not exist, return 
null.
-   */
-  public NodeStatus getNodeStatusByNodeId(int nodeId) {
-    BaseNodeCache baseNodeCache = nodeCacheMap.get(nodeId);
-    return baseNodeCache == null ? null : baseNodeCache.getNodeStatus();
+    return nodeInfo.getRegisteredConfigNodes(
+        getLoadManager().filterConfigNodeThroughStatus(status));
   }
 
   /**
-   * Filter the registered DataNodes through the specific NodeStatus
+   * Filter DataNodes through the specified NodeStatus
    *
-   * @param status The specific NodeStatus
-   * @return Filtered DataNodes with the specific NodeStatus
+   * @param status The specified NodeStatus
+   * @return Filtered DataNodes with the specified NodeStatus
    */
   public List<TDataNodeConfiguration> 
filterDataNodeThroughStatus(NodeStatus... status) {
-    return getRegisteredDataNodes().stream()
-        .filter(
-            registeredDataNode -> {
-              int dataNodeId = 
registeredDataNode.getLocation().getDataNodeId();
-              return nodeCacheMap.containsKey(dataNodeId)
-                  && Arrays.stream(status)
-                      .anyMatch(s -> 
s.equals(nodeCacheMap.get(dataNodeId).getNodeStatus()));
-            })
-        .collect(Collectors.toList());
-  }
-
-  /**
-   * Get the loadScore of each DataNode
-   *
-   * @return Map<DataNodeId, loadScore>
-   */
-  public Map<Integer, Long> getAllLoadScores() {
-    Map<Integer, Long> result = new ConcurrentHashMap<>();
-
-    nodeCacheMap.forEach(
-        (dataNodeId, heartbeatCache) -> result.put(dataNodeId, 
heartbeatCache.getLoadScore()));
-
-    return result;
-  }
-
-  /**
-   * Get the free disk space of the specified DataNode
-   *
-   * @param dataNodeId The index of the specified DataNode
-   * @return The free disk space that sample through heartbeat, 0 if no 
heartbeat received
-   */
-  public double getFreeDiskSpace(int dataNodeId) {
-    DataNodeHeartbeatCache dataNodeHeartbeatCache =
-        (DataNodeHeartbeatCache) nodeCacheMap.get(dataNodeId);
-    return dataNodeHeartbeatCache == null ? 0d : 
dataNodeHeartbeatCache.getFreeDiskSpace();
+    return 
nodeInfo.getRegisteredDataNodes(getLoadManager().filterDataNodeThroughStatus(status));
   }
 
   /**
@@ -767,15 +675,10 @@ public class NodeManager {
    */
   public Optional<TDataNodeLocation> getLowestLoadDataNode() {
     // TODO get real lowest load data node after scoring algorithm being 
implemented
-    List<TDataNodeConfiguration> targetDataNodeList =
-        filterDataNodeThroughStatus(NodeStatus.Running);
-
-    if (targetDataNodeList == null || targetDataNodeList.isEmpty()) {
-      return Optional.empty();
-    } else {
-      int index = random.nextInt(targetDataNodeList.size());
-      return Optional.of(targetDataNodeList.get(index).location);
-    }
+    int dataNodeId = getLoadManager().getLowestLoadDataNode();
+    return dataNodeId < 0
+        ? Optional.empty()
+        : Optional.of(getRegisteredDataNode(dataNodeId).getLocation());
   }
 
   /**
@@ -784,22 +687,8 @@ public class NodeManager {
    * @return TDataNodeLocation with the lowest loadScore
    */
   public TDataNodeLocation getLowestLoadDataNode(Set<Integer> nodes) {
-    AtomicInteger result = new AtomicInteger();
-    AtomicLong lowestLoadScore = new AtomicLong(Long.MAX_VALUE);
-
-    nodes.forEach(
-        nodeID -> {
-          BaseNodeCache cache = nodeCacheMap.get(nodeID);
-          long score = (cache == null) ? Long.MAX_VALUE : cache.getLoadScore();
-          if (score < lowestLoadScore.get()) {
-            result.set(nodeID);
-            lowestLoadScore.set(score);
-          }
-        });
-
-    LOGGER.info(
-        "get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", result, 
lowestLoadScore);
-    return 
configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get());
+    int dataNodeId = getLoadManager().getLowestLoadDataNode(new 
ArrayList<>(nodes));
+    return getRegisteredDataNode(dataNodeId).getLocation();
   }
 
   private ConsensusManager getConsensusManager() {
@@ -829,8 +718,4 @@ public class NodeManager {
   private UDFManager getUDFManager() {
     return configManager.getUDFManager();
   }
-
-  private ClusterQuotaManager getClusterQuotaManager() {
-    return configManager.getClusterQuotaManager();
-  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index fdec1487e6..f15978d6bd 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.cluster.RegionRoleType;
-import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
@@ -87,7 +86,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -100,7 +98,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 /** The PartitionManager Manages cluster PartitionTable read and write 
requests. */
@@ -588,12 +585,34 @@ public class PartitionManager {
    * Only leader use this interface.
    *
    * @param database The specified Database
-   * @return All Regions' RegionReplicaSet of the specified StorageGroup
+   * @return All Regions' RegionReplicaSet of the specified Database
    */
   public List<TRegionReplicaSet> getAllReplicaSets(String database) {
     return partitionInfo.getAllReplicaSets(database);
   }
 
+  /**
+   * Get all RegionGroups currently owned by the specified DataNode
+   *
+   * @param dataNodeId The specified dataNodeId
+   * @return Deep copy of all RegionGroups' RegionReplicaSet with the 
specified dataNodeId
+   */
+  public List<TRegionReplicaSet> getAllReplicaSets(int dataNodeId) {
+    return partitionInfo.getAllReplicaSets(dataNodeId);
+  }
+
+  /**
+   * Only leader use this interface.
+   *
+   * @param database The specified Database
+   * @param regionGroupIds The specified RegionGroupIds
+   * @return All Regions' RegionReplicaSet of the specified Database
+   */
+  public List<TRegionReplicaSet> getReplicaSets(
+      String database, List<TConsensusGroupId> regionGroupIds) {
+    return partitionInfo.getReplicaSets(database, regionGroupIds);
+  }
+
   /**
    * Only leader use this interface.
    *
@@ -666,22 +685,22 @@ public class PartitionManager {
   /**
    * Only leader use this interface.
    *
-   * @param storageGroup StorageGroupName
+   * @param database DatabaseName
    * @param type SchemaRegion or DataRegion
    * @return The specific StorageGroup's Regions that sorted by the number of 
allocated slots
    * @throws NoAvailableRegionGroupException When all RegionGroups within the 
specified StorageGroup
    *     are unavailable currently
    */
   public List<Pair<Long, TConsensusGroupId>> getSortedRegionGroupSlotsCounter(
-      String storageGroup, TConsensusGroupType type) throws 
NoAvailableRegionGroupException {
+      String database, TConsensusGroupType type) throws 
NoAvailableRegionGroupException {
     // Collect static data
     List<Pair<Long, TConsensusGroupId>> regionGroupSlotsCounter =
-        partitionInfo.getRegionGroupSlotsCounter(storageGroup, type);
+        partitionInfo.getRegionGroupSlotsCounter(database, type);
 
     // Filter RegionGroups that have Disabled status
     List<Pair<Long, TConsensusGroupId>> result = new ArrayList<>();
     for (Pair<Long, TConsensusGroupId> slotsCounter : regionGroupSlotsCounter) 
{
-      RegionGroupStatus status = getRegionGroupStatus(slotsCounter.getRight());
+      RegionGroupStatus status = 
getLoadManager().getRegionGroupStatus(slotsCounter.getRight());
       if (!RegionGroupStatus.Disabled.equals(status)) {
         result.add(slotsCounter);
       }
@@ -691,6 +710,9 @@ public class PartitionManager {
       throw new NoAvailableRegionGroupException(type);
     }
 
+    Map<TConsensusGroupId, RegionGroupStatus> regionGroupStatusMap =
+        getLoadManager()
+            
.getRegionGroupStatus(result.stream().map(Pair::getRight).collect(Collectors.toList()));
     result.sort(
         (o1, o2) -> {
           // Use the number of partitions as the first priority
@@ -700,8 +722,9 @@ public class PartitionManager {
             return 1;
           } else {
             // Use RegionGroup status as second priority, Running > Available 
> Discouraged
-            return getRegionGroupStatus(o1.getRight())
-                .compareTo(getRegionGroupStatus(o2.getRight()));
+            return regionGroupStatusMap
+                .get(o1.getRight())
+                .compare(regionGroupStatusMap.get(o2.getRight()));
           }
         });
     return result;
@@ -759,7 +782,8 @@ public class PartitionManager {
         .forEach(
             regionInfo -> {
               regionInfo.setStatus(
-                  getRegionStatus(regionInfo.getConsensusGroupId(), 
regionInfo.getDataNodeId())
+                  getLoadManager()
+                      .getRegionStatus(regionInfo.getConsensusGroupId(), 
regionInfo.getDataNodeId())
                       .getStatus());
 
               String regionType =
@@ -773,6 +797,15 @@ public class PartitionManager {
     return regionInfoListResp;
   }
 
+  /**
+   * Check if the specified RegionGroup exists.
+   *
+   * @param regionGroupId The specified RegionGroup
+   */
+  public boolean isRegionGroupExists(TConsensusGroupId regionGroupId) {
+    return partitionInfo.isRegionGroupExisted(regionGroupId);
+  }
+
   /**
    * update region location
    *
@@ -780,13 +813,6 @@ public class PartitionManager {
    * @return TSStatus
    */
   public TSStatus updateRegionLocation(UpdateRegionLocationPlan req) {
-    // Remove heartbeat cache if exists
-    if (regionGroupCacheMap.containsKey(req.getRegionId())) {
-      regionGroupCacheMap
-          .get(req.getRegionId())
-          .removeCacheIfExists(req.getOldNode().getDataNodeId());
-    }
-
     return getConsensusManager().write(req).getStatus();
   }
 
@@ -1059,93 +1085,23 @@ public class PartitionManager {
         /* Stop the RegionCleaner service */
         currentRegionMaintainerFuture.cancel(false);
         currentRegionMaintainerFuture = null;
-        regionGroupCacheMap.clear();
         LOGGER.info("RegionCleaner is stopped successfully.");
       }
     }
   }
 
-  public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
-    regionGroupCacheMap.remove(consensusGroupId);
-  }
-
   /**
-   * Filter the RegionGroups in the specified StorageGroup through the 
RegionGroupStatus
+   * Filter the RegionGroups in the specified Database through the 
RegionGroupStatus
    *
-   * @param storageGroup The specified StorageGroup
+   * @param database The specified Database
    * @param status The specified RegionGroupStatus
-   * @return Filtered RegionGroups with the specific RegionGroupStatus
+   * @return Filtered RegionGroups with the specified RegionGroupStatus
    */
   public List<TRegionReplicaSet> filterRegionGroupThroughStatus(
-      String storageGroup, RegionGroupStatus... status) {
-    return getAllReplicaSets(storageGroup).stream()
-        .filter(
-            regionReplicaSet -> {
-              TConsensusGroupId regionGroupId = regionReplicaSet.getRegionId();
-              return regionGroupCacheMap.containsKey(regionGroupId)
-                  && Arrays.stream(status)
-                      .anyMatch(
-                          s ->
-                              s.equals(
-                                  regionGroupCacheMap
-                                      .get(regionGroupId)
-                                      .getStatistics()
-                                      .getRegionGroupStatus()));
-            })
-        .collect(Collectors.toList());
-  }
-
-  /**
-   * Count the number of cluster Regions with specified RegionStatus
-   *
-   * @param type The specified RegionGroupType
-   * @param status The specified statues
-   * @return The number of cluster Regions with specified RegionStatus
-   */
-  public int countRegionWithSpecifiedStatus(TConsensusGroupType type, 
RegionStatus... status) {
-    AtomicInteger result = new AtomicInteger(0);
-    regionGroupCacheMap.forEach(
-        (regionGroupId, regionGroupCache) -> {
-          if (type.equals(regionGroupId.getType())) {
-            regionGroupCache
-                .getStatistics()
-                .getRegionStatisticsMap()
-                .values()
-                .forEach(
-                    regionStatistics -> {
-                      if (Arrays.stream(status)
-                          .anyMatch(s -> 
s.equals(regionStatistics.getRegionStatus()))) {
-                        result.getAndIncrement();
-                      }
-                    });
-          }
-        });
-    return result.get();
-  }
-
-  /**
-   * Safely get RegionStatus.
-   *
-   * @param consensusGroupId Specified RegionGroupId
-   * @param dataNodeId Specified RegionReplicaId
-   * @return Corresponding RegionStatus if cache exists, Unknown otherwise
-   */
-  public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int 
dataNodeId) {
-    return regionGroupCacheMap.containsKey(consensusGroupId)
-        ? 
regionGroupCacheMap.get(consensusGroupId).getStatistics().getRegionStatus(dataNodeId)
-        : RegionStatus.Unknown;
-  }
-
-  /**
-   * Safely get RegionGroupStatus.
-   *
-   * @param consensusGroupId Specified RegionGroupId
-   * @return Corresponding RegionGroupStatus if cache exists, Disabled 
otherwise
-   */
-  public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId 
consensusGroupId) {
-    return regionGroupCacheMap.containsKey(consensusGroupId)
-        ? 
regionGroupCacheMap.get(consensusGroupId).getStatistics().getRegionGroupStatus()
-        : RegionGroupStatus.Disabled;
+      String database, RegionGroupStatus... status) {
+    List<TConsensusGroupId> matchedRegionGroups =
+        getLoadManager().filterRegionGroupThroughStatus(status);
+    return getReplicaSets(database, matchedRegionGroups);
   }
 
   public void getSchemaRegionIds(
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
index 21909d6a5a..f9eff7b0de 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
@@ -72,10 +72,9 @@ public class PartitionMetrics implements IMetricSet {
       metricService.createAutoGauge(
           Metric.REGION_NUM.toString(),
           MetricLevel.CORE,
-          getPartitionManager(),
-          partitionManager ->
-              partitionManager.countRegionWithSpecifiedStatus(
-                  TConsensusGroupType.SchemaRegion, status),
+          getLoadManager(),
+          loadManager ->
+              
loadManager.countRegionWithSpecifiedStatus(TConsensusGroupType.SchemaRegion, 
status),
           Tag.TYPE.toString(),
           TConsensusGroupType.SchemaRegion.toString(),
           Tag.STATUS.toString(),
@@ -85,10 +84,9 @@ public class PartitionMetrics implements IMetricSet {
       metricService.createAutoGauge(
           Metric.REGION_NUM.toString(),
           MetricLevel.CORE,
-          getPartitionManager(),
-          partitionManager ->
-              partitionManager.countRegionWithSpecifiedStatus(
-                  TConsensusGroupType.DataRegion, status),
+          getLoadManager(),
+          loadManager ->
+              
loadManager.countRegionWithSpecifiedStatus(TConsensusGroupType.DataRegion, 
status),
           Tag.TYPE.toString(),
           TConsensusGroupType.DataRegion.toString(),
           Tag.STATUS.toString(),
@@ -330,8 +328,8 @@ public class PartitionMetrics implements IMetricSet {
     return configManager.getClusterSchemaManager();
   }
 
-  private PartitionManager getPartitionManager() {
-    return configManager.getPartitionManager();
+  private LoadManager getLoadManager() {
+    return configManager.getLoadManager();
   }
 
   @Override
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java
index 9e58ae6d1c..3cf9976c3d 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java
@@ -21,20 +21,20 @@ package org.apache.iotdb.confignode.manager.partition;
 public enum RegionGroupStatus {
 
   /** All Regions in RegionGroup are in the Running status */
-  Running("Running"),
+  Running("Running", 1),
 
   /**
    * All Regions in RegionGroup are in the Running or Unknown status, and the 
number of Regions in
    * the Unknown status is less than half
    */
-  Available("Available"),
+  Available("Available", 2),
 
   /**
    * All Regions in RegionGroup are in the Running, Unknown or ReadOnly 
status, and at least 1 node
    * is in ReadOnly status, the number of Regions in the Unknown or ReadOnly 
status is less than
    * half
    */
-  Discouraged("Discouraged"),
+  Discouraged("Discouraged", 3),
 
   /**
    * The following cases will lead to Disabled RegionGroup:
@@ -43,18 +43,24 @@ public enum RegionGroupStatus {
    *
    * <p>2. More than half of the Regions are in Unknown or ReadOnly status
    */
-  Disabled("Disabled");
+  Disabled("Disabled", 4);
 
   private final String status;
+  private final int weight;
 
-  RegionGroupStatus(String status) {
+  RegionGroupStatus(String status, int weight) {
     this.status = status;
+    this.weight = weight;
   }
 
   public String getStatus() {
     return status;
   }
 
+  public int getWeight() {
+    return weight;
+  }
+
   public static RegionGroupStatus parse(String status) {
     for (RegionGroupStatus regionGroupStatus : RegionGroupStatus.values()) {
       if (regionGroupStatus.status.equals(status)) {
@@ -63,4 +69,13 @@ public enum RegionGroupStatus {
     }
     throw new RuntimeException(String.format("RegionGroupStatus %s doesn't 
exist.", status));
   }
+
+  /**
+   * Compare the weight of two RegionGroupStatus
+   *
+   * <p>Running > Available > Discouraged > Disabled
+   */
+  public int compare(RegionGroupStatus other) {
+    return Integer.compare(this.weight, other.weight);
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
index 67c5526ff8..4c0b1761cc 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
@@ -221,18 +221,6 @@ public class NodeInfo implements SnapshotProcessor {
     return result;
   }
 
-  /** Return the number of registered ConfigNodes */
-  public int getRegisteredConfigNodeCount() {
-    int result;
-    configNodeInfoReadWriteLock.readLock().lock();
-    try {
-      result = registeredConfigNodes.size();
-    } finally {
-      configNodeInfoReadWriteLock.readLock().unlock();
-    }
-    return result;
-  }
-
   /** Return the number of total cpu cores in online DataNodes */
   public int getTotalCpuCoreCount() {
     int result = 0;
@@ -269,6 +257,23 @@ public class NodeInfo implements SnapshotProcessor {
     }
   }
 
+  /** @return The specified registered DataNodes */
+  public List<TDataNodeConfiguration> getRegisteredDataNodes(List<Integer> 
dataNodeIds) {
+    List<TDataNodeConfiguration> result = new ArrayList<>();
+    dataNodeInfoReadWriteLock.readLock().lock();
+    try {
+      dataNodeIds.forEach(
+          dataNodeId -> {
+            if (registeredDataNodes.containsKey(dataNodeId)) {
+              result.add(registeredDataNodes.get(dataNodeId).deepCopy());
+            }
+          });
+    } finally {
+      dataNodeInfoReadWriteLock.readLock().unlock();
+    }
+    return result;
+  }
+
   /**
    * Update ConfigNodeList both in memory and confignode-system.properties file
    *
@@ -336,6 +341,7 @@ public class NodeInfo implements SnapshotProcessor {
     return status;
   }
 
+  /** @return All registered ConfigNodes */
   public List<TConfigNodeLocation> getRegisteredConfigNodes() {
     List<TConfigNodeLocation> result;
     configNodeInfoReadWriteLock.readLock().lock();
@@ -347,6 +353,23 @@ public class NodeInfo implements SnapshotProcessor {
     return result;
   }
 
+  /** @return The specified registered ConfigNodes */
+  public List<TConfigNodeLocation> getRegisteredConfigNodes(List<Integer> 
configNodeIds) {
+    List<TConfigNodeLocation> result = new ArrayList<>();
+    configNodeInfoReadWriteLock.readLock().lock();
+    try {
+      configNodeIds.forEach(
+          configNodeId -> {
+            if (registeredConfigNodes.containsKey(configNodeId)) {
+              result.add(registeredConfigNodes.get(configNodeId).deepCopy());
+            }
+          });
+    } finally {
+      configNodeInfoReadWriteLock.readLock().unlock();
+    }
+    return result;
+  }
+
   public int generateNextNodeId() {
     return nextNodeId.incrementAndGet();
   }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index b27a4b529f..4db9a39eb0 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -119,8 +119,9 @@ public class DatabasePartitionTable {
 
     return result;
   }
+
   /**
-   * Get all RegionGroups currently owned by this StorageGroup
+   * Get all RegionGroups currently owned by this Database
    *
    * @param type The specified TConsensusGroupType
    * @return Deep copy of all Regions' RegionReplicaSet with the specified 
TConsensusGroupType
@@ -137,6 +138,37 @@ public class DatabasePartitionTable {
     return result;
   }
 
+  /**
+   * Get all RegionGroups currently owned by the specified DataNode
+   *
+   * @param dataNodeId The specified dataNodeId
+   * @return Deep copy of all RegionGroups' RegionReplicaSet with the 
specified dataNodeId
+   */
+  public List<TRegionReplicaSet> getAllReplicaSets(int dataNodeId) {
+    return regionGroupMap.values().stream()
+        .filter(regionGroup -> regionGroup.belongsToDataNode(dataNodeId))
+        .map(RegionGroup::getReplicaSet)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Get the RegionGroups with the specified RegionGroupIds
+   *
+   * @param regionGroupIds The specified RegionGroupIds
+   * @return Deep copy of the RegionGroups with the specified RegionGroupIds
+   */
+  public List<TRegionReplicaSet> getReplicaSets(List<TConsensusGroupId> 
regionGroupIds) {
+    List<TRegionReplicaSet> result = new ArrayList<>();
+
+    for (TConsensusGroupId regionGroupId : regionGroupIds) {
+      if (regionGroupMap.containsKey(regionGroupId)) {
+        result.add(regionGroupMap.get(regionGroupId).getReplicaSet());
+      }
+    }
+
+    return result;
+  }
+
   /**
    * Only leader use this interface.
    *
@@ -496,7 +528,7 @@ public class DatabasePartitionTable {
    * @param regionId TConsensusGroupId
    * @return True if contains.
    */
-  public boolean containRegion(TConsensusGroupId regionId) {
+  public boolean containRegionGroup(TConsensusGroupId regionId) {
     return regionGroupMap.containsKey(regionId);
   }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index b19ac834b3..062500012e 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -488,6 +488,17 @@ public class PartitionInfo implements SnapshotProcessor {
     return regionResp;
   }
 
+  /**
+   * Check if the specified RegionGroup exists.
+   *
+   * @param regionGroupId The specified RegionGroup
+   */
+  public boolean isRegionGroupExisted(TConsensusGroupId regionGroupId) {
+    return databasePartitionTables.values().stream()
+        .anyMatch(
+            databasePartitionTable -> 
databasePartitionTable.containRegionGroup(regionGroupId));
+  }
+
   /**
    * Update the location info of given regionId
    *
@@ -500,9 +511,10 @@ public class PartitionInfo implements SnapshotProcessor {
     TDataNodeLocation oldNode = req.getOldNode();
     TDataNodeLocation newNode = req.getNewNode();
     databasePartitionTables.values().stream()
-        .filter(sgPartitionTable -> sgPartitionTable.containRegion(regionId))
+        .filter(databasePartitionTable -> 
databasePartitionTable.containRegionGroup(regionId))
         .forEach(
-            sgPartitionTable -> 
sgPartitionTable.updateRegionLocation(regionId, oldNode, newNode));
+            databasePartitionTable ->
+                databasePartitionTable.updateRegionLocation(regionId, oldNode, 
newNode));
 
     return status;
   }
@@ -516,7 +528,7 @@ public class PartitionInfo implements SnapshotProcessor {
   public String getRegionStorageGroup(TConsensusGroupId regionId) {
     Optional<DatabasePartitionTable> sgPartitionTableOptional =
         databasePartitionTables.values().stream()
-            .filter(s -> s.containRegion(regionId))
+            .filter(s -> s.containRegionGroup(regionId))
             .findFirst();
     return 
sgPartitionTableOptional.map(DatabasePartitionTable::getDatabaseName).orElse(null);
   }
@@ -619,6 +631,38 @@ public class PartitionInfo implements SnapshotProcessor {
     }
   }
 
+  /**
+   * Get all RegionGroups currently owned by the specified DataNode
+   *
+   * @param dataNodeId The specified dataNodeId
+   * @return Deep copy of all RegionGroups' RegionReplicaSet with the 
specified dataNodeId
+   */
+  public List<TRegionReplicaSet> getAllReplicaSets(int dataNodeId) {
+    List<TRegionReplicaSet> result = new ArrayList<>();
+    databasePartitionTables
+        .values()
+        .forEach(
+            databasePartitionTable ->
+                
result.addAll(databasePartitionTable.getAllReplicaSets(dataNodeId)));
+    return result;
+  }
+
+  /**
+   * Only leader use this interface.
+   *
+   * @param database The specified Database
+   * @param regionGroupIds The specified RegionGroupIds
+   * @return All Regions' RegionReplicaSet of the specified Database
+   */
+  public List<TRegionReplicaSet> getReplicaSets(
+      String database, List<TConsensusGroupId> regionGroupIds) {
+    if (databasePartitionTables.containsKey(database)) {
+      return 
databasePartitionTables.get(database).getReplicaSets(regionGroupIds);
+    } else {
+      return new ArrayList<>();
+    }
+  }
+
   /**
    * Only leader use this interface.
    *
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
index 4c4c7ffbc9..a555c2349c 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
@@ -71,7 +71,7 @@ public class RegionGroup {
   }
 
   public TRegionReplicaSet getReplicaSet() {
-    return replicaSet;
+    return replicaSet.deepCopy();
   }
 
   /** @param deltaMap Map<TSeriesPartitionSlot, Delta TTimePartitionSlot 
Count> */
@@ -93,6 +93,17 @@ public class RegionGroup {
     return totalTimeSlotCount.get();
   }
 
+  /**
+   * Check if the RegionGroup belongs to the specified DataNode.
+   *
+   * @param dataNodeId The specified DataNodeId.
+   * @return True if the RegionGroup belongs to the specified DataNode.
+   */
+  public boolean belongsToDataNode(int dataNodeId) {
+    return replicaSet.getDataNodeLocations().stream()
+        .anyMatch(dataNodeLocation -> dataNodeLocation.getDataNodeId() == 
dataNodeId);
+  }
+
   public void serialize(OutputStream outputStream, TProtocol protocol)
       throws IOException, TException {
     ReadWriteIOUtils.write(createTime, outputStream);
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 06aef7c2f6..8b925535bf 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
 import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
@@ -49,7 +50,6 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
 import 
org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
-import 
org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
 import 
org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
@@ -145,8 +145,7 @@ public class ConfigNodeProcedureEnv {
    * @throws TException Thrift IOE
    */
   public boolean invalidateCache(String storageGroupName) throws IOException, 
TException {
-    NodeManager nodeManager = configManager.getNodeManager();
-    List<TDataNodeConfiguration> allDataNodes = 
nodeManager.getRegisteredDataNodes();
+    List<TDataNodeConfiguration> allDataNodes = 
getNodeManager().getRegisteredDataNodes();
     TInvalidateCacheReq invalidateCacheReq = new TInvalidateCacheReq();
     invalidateCacheReq.setStorageGroup(true);
     invalidateCacheReq.setFullPath(storageGroupName);
@@ -154,14 +153,14 @@ public class ConfigNodeProcedureEnv {
       int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId();
 
       // if the node is not alive, sleep 1 second and try again
-      NodeStatus nodeStatus = nodeManager.getNodeStatusByNodeId(dataNodeId);
+      NodeStatus nodeStatus = getLoadManager().getNodeStatus(dataNodeId);
       if (nodeStatus == NodeStatus.Unknown) {
         try {
           TimeUnit.MILLISECONDS.sleep(1000);
         } catch (InterruptedException e) {
           LOG.error("Sleep failed in ConfigNodeProcedureEnv: ", e);
         }
-        nodeStatus = nodeManager.getNodeStatusByNodeId(dataNodeId);
+        nodeStatus = getLoadManager().getNodeStatus(dataNodeId);
       }
 
       if (nodeStatus == NodeStatus.Running) {
@@ -202,14 +201,11 @@ public class ConfigNodeProcedureEnv {
   }
 
   public boolean doubleCheckReplica(TDataNodeLocation removedDatanode) {
-    return configManager
-                .getNodeManager()
+    return getNodeManager()
                 .filterDataNodeThroughStatus(NodeStatus.Running, 
NodeStatus.ReadOnly)
                 .size()
             - Boolean.compare(
-                configManager
-                        .getNodeManager()
-                        .getNodeStatusByNodeId(removedDatanode.getDataNodeId())
+                getLoadManager().getNodeStatus(removedDatanode.getDataNodeId())
                     != NodeStatus.Unknown,
                 false)
         >= NodeInfo.getMinimumDataNode();
@@ -305,8 +301,6 @@ public class ConfigNodeProcedureEnv {
    * @throws ProcedureException if failed status
    */
   public void stopConfigNode(TConfigNodeLocation tConfigNodeLocation) throws 
ProcedureException {
-    getNodeManager().removeNodeCache(tConfigNodeLocation.getConfigNodeId());
-
     TSStatus tsStatus =
         (TSStatus)
             SyncConfigNodeClientPool.getInstance()
@@ -318,6 +312,8 @@ public class ConfigNodeProcedureEnv {
     if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new ProcedureException(tsStatus.getMessage());
     }
+
+    getLoadManager().removeNodeCache(tConfigNodeLocation.getConfigNodeId());
   }
 
   /**
@@ -371,8 +367,7 @@ public class ConfigNodeProcedureEnv {
    */
   public void markDataNodeAsRemovingAndBroadcast(TDataNodeLocation 
dataNodeLocation) {
     // Send request to update NodeStatus on the DataNode to be removed
-    if 
(configManager.getNodeManager().getNodeStatusByNodeId(dataNodeLocation.getDataNodeId())
-        == NodeStatus.Unknown) {
+    if (getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId()) == 
NodeStatus.Unknown) {
       SyncDataNodeClientPool.getInstance()
           .sendSyncRequestToDataNodeWithGivenRetry(
               dataNodeLocation.getInternalEndPoint(),
@@ -388,10 +383,11 @@ public class ConfigNodeProcedureEnv {
     }
 
     // Force updating NodeStatus to Removing
-    getNodeManager()
-        .getNodeCacheMap()
-        .get(dataNodeLocation.getDataNodeId())
-        
.forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Removing));
+    getLoadManager()
+        .forceUpdateNodeCache(
+            NodeType.DataNode,
+            dataNodeLocation.getDataNodeId(),
+            NodeHeartbeatSample.generateDefaultSample(NodeStatus.Removing));
   }
 
   /**
@@ -539,10 +535,7 @@ public class ConfigNodeProcedureEnv {
         (dataNodeId, regionStatus) ->
             heartbeatSampleMap.put(
                 dataNodeId, new RegionHeartbeatSample(currentTime, 
currentTime, regionStatus)));
-    getPartitionManager()
-        .getRegionGroupCacheMap()
-        .computeIfAbsent(regionGroupId, empty -> new 
RegionGroupCache(regionGroupId))
-        .forceUpdate(heartbeatSampleMap);
+    getLoadManager().forceUpdateRegionGroupCache(regionGroupId, 
heartbeatSampleMap);
 
     // Select leader greedily for iot consensus protocol
     if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index bf1ce7e940..c75fd419b8 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -34,7 +34,6 @@ import 
org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
 import 
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
 import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
 import org.apache.iotdb.confignode.persistence.node.NodeInfo;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
@@ -328,7 +327,7 @@ public class DataNodeRemoveHandler {
     TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, 
originalDataNode);
 
     status =
-        
configManager.getNodeManager().getNodeStatusByNodeId(originalDataNode.getDataNodeId())
+        
configManager.getLoadManager().getNodeStatus(originalDataNode.getDataNodeId())
                 == NodeStatus.Unknown
             ? SyncDataNodeClientPool.getInstance()
                 .sendSyncRequestToDataNodeWithGivenRetry(
@@ -375,6 +374,9 @@ public class DataNodeRemoveHandler {
         getIdWithRpcEndpoint(originalDataNode),
         getIdWithRpcEndpoint(destDataNode));
 
+    // Remove the RegionGroupCache of the regionId
+    configManager.getLoadManager().removeRegionGroupCache(regionId);
+
     // Broadcast the latest RegionRouteMap when Region migration finished
     configManager.getLoadManager().broadcastLatestRegionRouteMap();
   }
@@ -427,7 +429,7 @@ public class DataNodeRemoveHandler {
         SyncDataNodeClientPool.getInstance()
             .sendSyncRequestToDataNodeWithGivenRetry(
                 dataNode.getInternalEndPoint(), dataNode, 
DataNodeRequestType.STOP_DATA_NODE, 2);
-    configManager.getNodeManager().removeNodeCache(dataNode.getDataNodeId());
+    configManager.getLoadManager().removeNodeCache(dataNode.getDataNodeId());
     LOGGER.info(
         "{}, Stop Data Node result: {}, stoppedDataNode: {}",
         REMOVE_DATANODE_PROCESS,
@@ -509,9 +511,8 @@ public class DataNodeRemoveHandler {
     if (CONF.getSchemaReplicationFactor() == 1 || 
CONF.getDataReplicationFactor() == 1) {
       for (TDataNodeLocation dataNodeLocation : removedDataNodes) {
         // check whether removed data node is in running state
-        BaseNodeCache nodeCache =
-            
configManager.getNodeManager().getNodeCacheMap().get(dataNodeLocation.getDataNodeId());
-        if (!NodeStatus.Running.equals(nodeCache.getNodeStatus())) {
+        if (!NodeStatus.Running.equals(
+            
configManager.getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId())))
 {
           removedDataNodes.remove(dataNodeLocation);
           LOGGER.error(
               "Failed to remove data node {} because it is not in running and 
the configuration of cluster is one replication",
@@ -530,7 +531,7 @@ public class DataNodeRemoveHandler {
             removeDataNodePlan.getDataNodeLocations().stream()
                 .filter(
                     x ->
-                        
configManager.getNodeManager().getNodeStatusByNodeId(x.getDataNodeId())
+                        
configManager.getLoadManager().getNodeStatus(x.getDataNodeId())
                             != NodeStatus.Unknown)
                 .count();
     if (availableDatanodeSize - removedDataNodeSize < 
NodeInfo.getMinimumDataNode()) {
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
index f1271c25b1..43c3404b58 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
@@ -120,7 +120,7 @@ public class DeleteDatabaseProcedure
               regionReplicaSet -> {
                 // Clear heartbeat cache along the way
                 env.getConfigManager()
-                    .getPartitionManager()
+                    .getLoadManager()
                     .removeRegionGroupCache(regionReplicaSet.getRegionId());
                 env.getConfigManager()
                     .getLoadManager()
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index 46b58399ca..e1f02cffb2 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -47,6 +47,9 @@ import static 
org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
 /** Region migrate procedure */
 public class RegionMigrateProcedure
     extends StateMachineProcedure<ConfigNodeProcedureEnv, 
RegionTransitionState> {
+
+  // TODO: Reach an agreement on RegionMigrateProcedure
+
   private static final Logger LOG = 
LoggerFactory.getLogger(RegionMigrateProcedure.class);
   private static final int RETRY_THRESHOLD = 5;
 
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
index 6da0304f44..372832d106 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
@@ -64,7 +64,7 @@ public class GreedyPriorityTest {
           NodeStatus.Running, NodeStatus.Unknown, NodeStatus.Running, NodeStatus.ReadOnly
         };
     for (int i = 0; i < 4; i++) {
-      nodeCacheMap.put(i, new DataNodeHeartbeatCache());
+      nodeCacheMap.put(i, new DataNodeHeartbeatCache(i));
       nodeCacheMap
           .get(i)
           .cacheHeartbeatSample(
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
index f1bf444427..b6248d7872 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
@@ -61,7 +61,7 @@ public class LeaderPriorityBalancerTest {
     long currentTimeMillis = System.currentTimeMillis();
     Map<Integer, BaseNodeCache> nodeCacheMap = new HashMap<>();
     for (int i = 0; i < 6; i++) {
-      nodeCacheMap.put(i, new DataNodeHeartbeatCache());
+      nodeCacheMap.put(i, new DataNodeHeartbeatCache(i));
       if (i != 2 && i != 5) {
         nodeCacheMap
             .get(i)
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
index b51cf10bcc..56f3e64d26 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
@@ -30,7 +30,7 @@ public class NodeCacheTest {
 
   @Test
   public void forceUpdateTest() {
-    DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache();
+    DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache(1);
 
     // Test default
     Assert.assertEquals(NodeStatus.Unknown, 
dataNodeHeartbeatCache.getNodeStatus());
@@ -55,7 +55,7 @@ public class NodeCacheTest {
 
   @Test
   public void periodicUpdateTest() {
-    DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache();
+    DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache(1);
     long currentTime = System.currentTimeMillis();
     dataNodeHeartbeatCache.cacheHeartbeatSample(
         new NodeHeartbeatSample(

Reply via email to