This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch leader-balance-constraint
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3db04ac8f33d05c5cbeb7d3777e91c4317eb26a8
Author: YongzaoDan <[email protected]>
AuthorDate: Thu Apr 18 22:13:46 2024 +0800

    Finish
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |   4 +-
 .../confignode/conf/ConfigNodeDescriptor.java      |   6 +-
 .../confignode/conf/ConfigNodeStartupCheck.java    |   6 +-
 .../manager/load/balancer/RouteBalancer.java       |  17 +--
 .../router/leader/AbstractLeaderBalancer.java      | 108 ++++++++++++++++++
 .../router/leader/GreedyLeaderBalancer.java        |  57 ++++------
 .../balancer/router/leader/ILeaderBalancer.java    |  49 --------
 .../router/leader/MinCostFlowLeaderBalancer.java   | 123 ++++++++-------------
 .../confignode/manager/load/cache/LoadCache.java   |  59 +++++++++-
 .../manager/load/cache/node/NodeStatistics.java    |   9 ++
 .../load/cache/region/RegionStatistics.java        |   7 ++
 .../router/leader/CFDLeaderBalancerTest.java       | 117 ++++++++++++++++++--
 .../router/leader/GreedyLeaderBalancerTest.java    | 118 +++++++++++++-------
 .../leader/LeaderBalancerComparisonTest.java       |  75 +++++++++++--
 14 files changed, 509 insertions(+), 246 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index b20f1e87d2f..3875bf09080 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import 
org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
-import 
org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
+import 
org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer;
 import 
org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
 import 
org.apache.iotdb.confignode.manager.partition.RegionGroupExtensionPolicy;
 import org.apache.iotdb.consensus.ConsensusFactory;
@@ -187,7 +187,7 @@ public class ConfigNodeConfig {
   private long unknownDataNodeDetectInterval = heartbeatIntervalInMs;
 
   /** The policy of cluster RegionGroups' leader distribution. */
-  private String leaderDistributionPolicy = ILeaderBalancer.CFD_POLICY;
+  private String leaderDistributionPolicy = AbstractLeaderBalancer.CFD_POLICY;
 
   /** Whether to enable auto leader balance for Ratis consensus protocol. */
   private boolean enableAutoLeaderBalanceForRatisConsensus = true;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 1bad7b48c5c..33b8ec69f8e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.schema.SchemaConstant;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
-import 
org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
+import 
org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer;
 import 
org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
 import 
org.apache.iotdb.confignode.manager.partition.RegionGroupExtensionPolicy;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
@@ -354,8 +354,8 @@ public class ConfigNodeDescriptor {
         properties
             .getProperty("leader_distribution_policy", 
conf.getLeaderDistributionPolicy())
             .trim();
-    if (ILeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
-        || ILeaderBalancer.CFD_POLICY.equals(leaderDistributionPolicy)) {
+    if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
+        || AbstractLeaderBalancer.CFD_POLICY.equals(leaderDistributionPolicy)) 
{
       conf.setLeaderDistributionPolicy(leaderDistributionPolicy);
     } else {
       throw new IOException(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index 1d8bf58259e..3e82e0bb4a6 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.ConfigurationException;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.StartupChecks;
-import 
org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
+import 
org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer;
 import 
org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
 import org.apache.iotdb.consensus.ConsensusFactory;
 
@@ -144,8 +144,8 @@ public class ConfigNodeStartupCheck extends StartupChecks {
     }
 
     // The leader distribution policy is limited
-    if 
(!ILeaderBalancer.GREEDY_POLICY.equals(CONF.getLeaderDistributionPolicy())
-        && 
!ILeaderBalancer.CFD_POLICY.equals(CONF.getLeaderDistributionPolicy())) {
+    if 
(!AbstractLeaderBalancer.GREEDY_POLICY.equals(CONF.getLeaderDistributionPolicy())
+        && 
!AbstractLeaderBalancer.CFD_POLICY.equals(CONF.getLeaderDistributionPolicy())) {
       throw new ConfigurationException(
           "leader_distribution_policy",
           CONF.getRoutePriorityPolicy(),
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 7269defdbe6..4896885e956 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -33,8 +33,8 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.ProcedureManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
+import 
org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer;
 import 
org.apache.iotdb.confignode.manager.load.balancer.router.leader.GreedyLeaderBalancer;
-import 
org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
 import 
org.apache.iotdb.confignode.manager.load.balancer.router.leader.MinCostFlowLeaderBalancer;
 import 
org.apache.iotdb.confignode.manager.load.balancer.router.priority.GreedyPriorityBalancer;
 import 
org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
@@ -97,7 +97,7 @@ public class RouteBalancer implements 
IClusterStatusSubscriber {
 
   private final IManager configManager;
   // For generating optimal Region leader distribution
-  private final ILeaderBalancer leaderBalancer;
+  private final AbstractLeaderBalancer leaderBalancer;
   // For generating optimal cluster Region routing priority
   private final IPriorityBalancer priorityRouter;
 
@@ -118,10 +118,10 @@ public class RouteBalancer implements 
IClusterStatusSubscriber {
     this.lastFailedTimeForLeaderBalance = new TreeMap<>();
 
     switch (CONF.getLeaderDistributionPolicy()) {
-      case ILeaderBalancer.GREEDY_POLICY:
+      case AbstractLeaderBalancer.GREEDY_POLICY:
         this.leaderBalancer = new GreedyLeaderBalancer();
         break;
-      case ILeaderBalancer.CFD_POLICY:
+      case AbstractLeaderBalancer.CFD_POLICY:
       default:
         this.leaderBalancer = new MinCostFlowLeaderBalancer();
         break;
@@ -157,13 +157,8 @@ public class RouteBalancer implements 
IClusterStatusSubscriber {
             getPartitionManager().getAllRegionGroupIdMap(regionGroupType),
             getPartitionManager().getAllReplicaSetsMap(regionGroupType),
             currentLeaderMap,
-            getNodeManager()
-                .filterDataNodeThroughStatus(
-                    NodeStatus.Unknown, NodeStatus.ReadOnly, 
NodeStatus.Removing)
-                .stream()
-                .map(TDataNodeConfiguration::getLocation)
-                .map(TDataNodeLocation::getDataNodeId)
-                .collect(Collectors.toSet()));
+            getLoadManager().getLoadCache().getCurrentDataNodeStatisticsMap(),
+            
getLoadManager().getLoadCache().getCurrentRegionStatisticsMap(regionGroupType));
 
     // Transfer leader to the optimal distribution
     long currentTime = System.nanoTime();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
new file mode 100644
index 00000000000..b82b3a0ca48
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public abstract class AbstractLeaderBalancer {
+
+  public static final String GREEDY_POLICY = "GREEDY";
+  public static final String CFD_POLICY = "CFD";
+
+  // Map<Database, List<RegionGroup>>
+  protected final Map<String, List<TConsensusGroupId>> databaseRegionGroupMap;
+  // Map<RegionGroupId, RegionGroup>
+  protected final Map<TConsensusGroupId, TRegionReplicaSet> 
regionReplicaSetMap;
+  // Map<RegionGroupId, leaderId>
+  protected final Map<TConsensusGroupId, Integer> regionLeaderMap;
+  // Map<DataNodeId, NodeStatistics>
+  protected final Map<Integer, NodeStatistics> dataNodeStatisticsMap;
+  // Map<RegionGroupId, Map<DataNodeId, RegionStatistics>>
+  protected final Map<TConsensusGroupId, Map<Integer, RegionStatistics>> 
regionStatisticsMap;
+
+  protected AbstractLeaderBalancer() {
+    this.databaseRegionGroupMap = new TreeMap<>();
+    this.regionReplicaSetMap = new TreeMap<>();
+    this.regionLeaderMap = new TreeMap<>();
+    this.dataNodeStatisticsMap = new TreeMap<>();
+    this.regionStatisticsMap = new TreeMap<>();
+  }
+
+  protected void initialize(
+      Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
+      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+      Map<TConsensusGroupId, Integer> regionLeaderMap,
+      Map<Integer, NodeStatistics> dataNodeStatisticsMap,
+      Map<TConsensusGroupId, Map<Integer, RegionStatistics>> 
regionStatisticsMap) {
+    this.databaseRegionGroupMap.putAll(databaseRegionGroupMap);
+    this.regionReplicaSetMap.putAll(regionReplicaSetMap);
+    this.regionLeaderMap.putAll(regionLeaderMap);
+    this.dataNodeStatisticsMap.putAll(dataNodeStatisticsMap);
+    this.regionStatisticsMap.putAll(regionStatisticsMap);
+  }
+
+  protected boolean isDataNodeAvailable(int dataNodeId) {
+    // RegionGroup-leader can only be placed on Running DataNode
+    return dataNodeStatisticsMap.containsKey(dataNodeId)
+        && 
NodeStatus.Running.equals(dataNodeStatisticsMap.get(dataNodeId).getStatus());
+  }
+
+  protected boolean isRegionAvailable(TConsensusGroupId regionGroupId, int 
dataNodeId) {
+    // Only Running Region can be selected as RegionGroup-leader
+    return regionStatisticsMap.containsKey(regionGroupId)
+        && regionStatisticsMap.get(regionGroupId).containsKey(dataNodeId)
+        && RegionStatus.Running.equals(
+            
regionStatisticsMap.get(regionGroupId).get(dataNodeId).getRegionStatus());
+  }
+
+  protected void clear() {
+    this.databaseRegionGroupMap.clear();
+    this.regionReplicaSetMap.clear();
+    this.regionLeaderMap.clear();
+    this.dataNodeStatisticsMap.clear();
+    this.regionStatisticsMap.clear();
+  }
+
+  /**
+   * Generate an optimal leader distribution.
+   *
+   * @param databaseRegionGroupMap RegionGroup held by each Database
+   * @param regionReplicaSetMap All RegionGroups the cluster currently has
+   * @param regionLeaderMap The current leader distribution of each RegionGroup
+   * @param dataNodeStatisticsMap The current statistics of each DataNode
+   * @param regionStatisticsMap The current statistics of each Region
+   * @return Map<TConsensusGroupId, Integer>, The optimal leader distribution
+   */
+  public abstract Map<TConsensusGroupId, Integer> 
generateOptimalLeaderDistribution(
+      Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
+      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
+      Map<TConsensusGroupId, Integer> regionLeaderMap,
+      Map<Integer, NodeStatistics> dataNodeStatisticsMap,
+      Map<TConsensusGroupId, Map<Integer, RegionStatistics>> 
regionStatisticsMap);
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
index 4a28fb6ca31..5a6098b60f9 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
@@ -22,26 +22,19 @@ package 
org.apache.iotdb.confignode.manager.load.balancer.router.leader;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
 
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 /** Leader distribution balancer that uses greedy algorithm */
-public class GreedyLeaderBalancer implements ILeaderBalancer {
-
-  private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
-  private final Map<TConsensusGroupId, Integer> regionLeaderMap;
-  private final Set<Integer> disabledDataNodeSet;
+public class GreedyLeaderBalancer extends AbstractLeaderBalancer {
 
   public GreedyLeaderBalancer() {
-    this.regionReplicaSetMap = new HashMap<>();
-    this.regionLeaderMap = new ConcurrentHashMap<>();
-    this.disabledDataNodeSet = new HashSet<>();
+    super();
   }
 
   @Override
@@ -49,30 +42,19 @@ public class GreedyLeaderBalancer implements 
ILeaderBalancer {
       Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
       Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
       Map<TConsensusGroupId, Integer> regionLeaderMap,
-      Set<Integer> disabledDataNodeSet) {
-    initialize(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
-
+      Map<Integer, NodeStatistics> dataNodeStatisticsMap,
+      Map<TConsensusGroupId, Map<Integer, RegionStatistics>> 
regionStatisticsMap) {
+    initialize(
+        databaseRegionGroupMap,
+        regionReplicaSetMap,
+        regionLeaderMap,
+        dataNodeStatisticsMap,
+        regionStatisticsMap);
     Map<TConsensusGroupId, Integer> result = constructGreedyDistribution();
-
     clear();
     return result;
   }
 
-  private void initialize(
-      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
-      Map<TConsensusGroupId, Integer> regionLeaderMap,
-      Set<Integer> disabledDataNodeSet) {
-    this.regionReplicaSetMap.putAll(regionReplicaSetMap);
-    this.regionLeaderMap.putAll(regionLeaderMap);
-    this.disabledDataNodeSet.addAll(disabledDataNodeSet);
-  }
-
-  private void clear() {
-    this.regionReplicaSetMap.clear();
-    this.regionLeaderMap.clear();
-    this.disabledDataNodeSet.clear();
-  }
-
   private Map<TConsensusGroupId, Integer> constructGreedyDistribution() {
     Map<Integer, Integer> leaderCounter = new TreeMap<>();
     regionReplicaSetMap.forEach(
@@ -81,14 +63,13 @@ public class GreedyLeaderBalancer implements 
ILeaderBalancer {
               leaderId = regionLeaderMap.getOrDefault(regionGroupId, -1);
           for (TDataNodeLocation dataNodeLocation : 
regionGroup.getDataNodeLocations()) {
             int dataNodeId = dataNodeLocation.getDataNodeId();
-            if (disabledDataNodeSet.contains(dataNodeId)) {
-              continue;
-            }
-            // Select the DataNode with the minimal leader count as the new 
leader
-            int count = leaderCounter.getOrDefault(dataNodeId, 0);
-            if (count < minCount) {
-              minCount = count;
-              leaderId = dataNodeId;
+            if (isDataNodeAvailable(dataNodeId) && 
isRegionAvailable(regionGroupId, dataNodeId)) {
+              // Select the DataNode with the minimal leader count as the new 
leader
+              int count = leaderCounter.getOrDefault(dataNodeId, 0);
+              if (count < minCount) {
+                minCount = count;
+                leaderId = dataNodeId;
+              }
             }
           }
           regionLeaderMap.put(regionGroupId, leaderId);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
deleted file mode 100644
index 65c45aaebab..00000000000
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/ILeaderBalancer.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public interface ILeaderBalancer {
-
-  String GREEDY_POLICY = "GREEDY";
-  String CFD_POLICY = "CFD";
-
-  /**
-   * Generate an optimal leader distribution.
-   *
-   * @param databaseRegionGroupMap RegionGroup held by each Database
-   * @param regionReplicaSetMap All RegionGroups the cluster currently have
-   * @param regionLeaderMap The current leader of each RegionGroup
-   * @param disabledDataNodeSet The DataNodes that currently unable to 
work(can't place
-   *     RegionGroup-leader)
-   * @return Map<TConsensusGroupId, Integer>, The optimal leader distribution
-   */
-  Map<TConsensusGroupId, Integer> generateOptimalLeaderDistribution(
-      Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
-      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
-      Map<TConsensusGroupId, Integer> regionLeaderMap,
-      Set<Integer> disabledDataNodeSet);
-}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
index 48aa8cc5547..daf1d15f27c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -30,23 +32,14 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.Set;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 
 /** Leader distribution balancer that uses minimum cost flow algorithm */
-public class MinCostFlowLeaderBalancer implements ILeaderBalancer {
+public class MinCostFlowLeaderBalancer extends AbstractLeaderBalancer {
 
   private static final int INFINITY = Integer.MAX_VALUE;
 
-  /** Input parameters */
-  private final Map<String, List<TConsensusGroupId>> databaseRegionGroupMap;
-
-  private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
-  private final Map<TConsensusGroupId, Integer> regionLeaderMap;
-  private final Set<Integer> disabledDataNodeSet;
-
   /** Graph nodes */
   // Super source node
   private static final int S_NODE = 0;
@@ -78,10 +71,7 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
   private int minimumCost = 0;
 
   public MinCostFlowLeaderBalancer() {
-    this.databaseRegionGroupMap = new TreeMap<>();
-    this.regionReplicaSetMap = new TreeMap<>();
-    this.regionLeaderMap = new TreeMap<>();
-    this.disabledDataNodeSet = new TreeSet<>();
+    super();
     this.rNodeMap = new TreeMap<>();
     this.sDNodeMap = new TreeMap<>();
     this.sDNodeReflect = new TreeMap<>();
@@ -94,47 +84,34 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
       Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
       Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
       Map<TConsensusGroupId, Integer> regionLeaderMap,
-      Set<Integer> disabledDataNodeSet) {
-
-    initialize(databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap, 
disabledDataNodeSet);
-
+      Map<Integer, NodeStatistics> dataNodeStatisticsMap,
+      Map<TConsensusGroupId, Map<Integer, RegionStatistics>> 
regionStatisticsMap) {
+    initialize(
+        databaseRegionGroupMap,
+        regionReplicaSetMap,
+        regionLeaderMap,
+        dataNodeStatisticsMap,
+        regionStatisticsMap);
     Map<TConsensusGroupId, Integer> result;
     constructMCFGraph();
     dinicAlgorithm();
     result = collectLeaderDistribution();
-
     clear();
     return result;
   }
 
-  private void initialize(
-      Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
-      Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
-      Map<TConsensusGroupId, Integer> regionLeaderMap,
-      Set<Integer> disabledDataNodeSet) {
-    this.databaseRegionGroupMap.putAll(databaseRegionGroupMap);
-    this.regionReplicaSetMap.putAll(regionReplicaSetMap);
-    this.regionLeaderMap.putAll(regionLeaderMap);
-    this.disabledDataNodeSet.addAll(disabledDataNodeSet);
-  }
-
-  private void clear() {
-    this.databaseRegionGroupMap.clear();
-    this.regionReplicaSetMap.clear();
-    this.regionLeaderMap.clear();
-    this.disabledDataNodeSet.clear();
-
+  @Override
+  protected void clear() {
+    super.clear();
     this.rNodeMap.clear();
     this.sDNodeMap.clear();
     this.sDNodeReflect.clear();
     this.tDNodeMap.clear();
     this.minCostFlowEdges.clear();
-
     this.nodeHeadEdge = null;
     this.nodeCurrentEdge = null;
     this.isNodeVisited = null;
     this.nodeMinimumCost = null;
-
     this.maxNode = T_NODE + 1;
     this.maxEdge = 0;
   }
@@ -155,18 +132,16 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
         for (TDataNodeLocation dataNodeLocation :
             regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
           int dataNodeId = dataNodeLocation.getDataNodeId();
-          if (disabledDataNodeSet.contains(dataNodeId)) {
-            // Skip disabled DataNode
-            continue;
-          }
-          if (!sDNodeMap.get(database).containsKey(dataNodeId)) {
-            sDNodeMap.get(database).put(dataNodeId, maxNode);
-            sDNodeReflect.get(database).put(maxNode, dataNodeId);
-            maxNode += 1;
-          }
-          if (!tDNodeMap.containsKey(dataNodeId)) {
-            tDNodeMap.put(dataNodeId, maxNode);
-            maxNode += 1;
+          if (isDataNodeAvailable(dataNodeId)) {
+            if (!sDNodeMap.get(database).containsKey(dataNodeId)) {
+              sDNodeMap.get(database).put(dataNodeId, maxNode);
+              sDNodeReflect.get(database).put(maxNode, dataNodeId);
+              maxNode += 1;
+            }
+            if (!tDNodeMap.containsKey(dataNodeId)) {
+              tDNodeMap.put(dataNodeId, maxNode);
+              maxNode += 1;
+            }
           }
         }
       }
@@ -194,15 +169,13 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
         for (TDataNodeLocation dataNodeLocation :
             regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
           int dataNodeId = dataNodeLocation.getDataNodeId();
-          if (disabledDataNodeSet.contains(dataNodeId)) {
-            // Skip disabled DataNode
-            continue;
+          if (isDataNodeAvailable(dataNodeId) && 
isRegionAvailable(regionGroupId, dataNodeId)) {
+            int sDNode = sDNodeMap.get(database).get(dataNodeId);
+            // Capacity: 1, Cost: 0 if sDNode is the current leader of the 
rNode, 1 otherwise.
+            // Therefore, the RegionGroup will keep the leader as constant as 
possible.
+            int cost = regionLeaderMap.getOrDefault(regionGroupId, -1) == 
dataNodeId ? 0 : 1;
+            addAdjacentEdges(rNode, sDNode, 1, cost);
           }
-          int sDNode = sDNodeMap.get(database).get(dataNodeId);
-          // Capacity: 1, Cost: 1 if sDNode is the current leader of the 
rNode, 0 otherwise.
-          // Therefore, the RegionGroup will keep the leader as constant as 
possible.
-          int cost = regionLeaderMap.getOrDefault(regionGroupId, -1) == 
dataNodeId ? 0 : 1;
-          addAdjacentEdges(rNode, sDNode, 1, cost);
         }
       }
     }
@@ -217,17 +190,15 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
         for (TDataNodeLocation dataNodeLocation :
             regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
           int dataNodeId = dataNodeLocation.getDataNodeId();
-          if (disabledDataNodeSet.contains(dataNodeId)) {
-            // Skip disabled DataNode
-            continue;
+          if (isDataNodeAvailable(dataNodeId)) {
+            int sDNode = sDNodeMap.get(database).get(dataNodeId);
+            int tDNode = tDNodeMap.get(dataNodeId);
+            int leaderCount = leaderCounter.merge(dataNodeId, 1, Integer::sum);
+            // Capacity: 1, Cost: x^2 for the x-th edge at the current sDNode.
+            // Thus, the leader distribution will be as balanced as possible 
within each Database
+            // based on the Jensen's-Inequality.
+            addAdjacentEdges(sDNode, tDNode, 1, leaderCount * leaderCount);
           }
-          int sDNode = sDNodeMap.get(database).get(dataNodeId);
-          int tDNode = tDNodeMap.get(dataNodeId);
-          int leaderCount = leaderCounter.merge(dataNodeId, 1, Integer::sum);
-          // Capacity: 1, Cost: x^2 for the x-th edge at the current sDNode.
-          // Thus, the leader distribution will be as balance as possible 
within each Database
-          // based on the Jensen's-Inequality.
-          addAdjacentEdges(sDNode, tDNode, 1, leaderCount * leaderCount);
         }
       }
     }
@@ -239,16 +210,14 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
     for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
       for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
         int dataNodeId = dataNodeLocation.getDataNodeId();
-        if (disabledDataNodeSet.contains(dataNodeId)) {
-          // Skip disabled DataNode
-          continue;
+        if (isDataNodeAvailable(dataNodeId)) {
+          int tDNode = tDNodeMap.get(dataNodeId);
+          int leaderCount = maxLeaderCounter.merge(dataNodeId, 1, 
Integer::sum);
+          // Cost: x^2 for the x-th edge at the current dNode.
+          // Thus, the leader distribution will be as balanced as possible 
within the cluster
+          // Based on the Jensen's-Inequality.
+          addAdjacentEdges(tDNode, T_NODE, 1, leaderCount * leaderCount);
         }
-        int tDNode = tDNodeMap.get(dataNodeId);
-        int leaderCount = maxLeaderCounter.merge(dataNodeId, 1, Integer::sum);
-        // Cost: x^2 for the x-th edge at the current dNode.
-        // Thus, the leader distribution will be as balance as possible within 
the cluster
-        // Based on the Jensen's-Inequality.
-        addAdjacentEdges(tDNode, T_NODE, 1, leaderCount * leaderCount);
       }
     }
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 6fae8010e22..be561937b9b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -40,6 +40,7 @@ import 
org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
 import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupCache;
 import 
org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
 import 
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
 import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
 
 import org.slf4j.Logger;
@@ -261,6 +262,22 @@ public class LoadCache {
     return nodeStatisticsMap;
   }
 
+  /**
+   * Get the NodeStatistics of all DataNodes.
+   *
+   * @return a map of all DataNodes' NodeStatistics
+   */
+  public Map<Integer, NodeStatistics> getCurrentDataNodeStatisticsMap() {
+    Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>();
+    nodeCacheMap.forEach(
+        (nodeId, nodeCache) -> {
+          if (nodeCache instanceof DataNodeHeartbeatCache) {
+            dataNodeStatisticsMap.put(nodeId, (NodeStatistics) 
nodeCache.getCurrentStatistics());
+          }
+        });
+    return dataNodeStatisticsMap;
+  }
+
   /**
    * Get the RegionGroupStatistics of all RegionGroups.
    *
@@ -274,6 +291,30 @@ public class LoadCache {
     return regionGroupStatisticsMap;
   }
 
+  /**
+   * Get the RegionStatistics of all Regions.
+   *
+   * @param type DataRegion or SchemaRegion
+   * @return a map of RegionStatistics
+   */
+  public Map<TConsensusGroupId, Map<Integer, RegionStatistics>> 
getCurrentRegionStatisticsMap(
+      TConsensusGroupType type) {
+    Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap 
= new TreeMap<>();
+    regionGroupCacheMap.forEach(
+        (regionGroupId, regionGroupCache) -> {
+          if (type.equals(regionGroupId.getType())) {
+            regionStatisticsMap.put(
+                regionGroupId, 
regionGroupCache.getCurrentStatistics().getRegionStatisticsMap());
+          }
+        });
+    return regionStatisticsMap;
+  }
+
+  /**
+   * Get the ConsensusGroupStatistics of all RegionGroups.
+   *
+   * @return a map of ConsensusGroupStatistics
+   */
   public Map<TConsensusGroupId, ConsensusGroupStatistics> 
getCurrentConsensusGroupStatisticsMap() {
     Map<TConsensusGroupId, ConsensusGroupStatistics> 
consensusGroupStatisticsMap = new TreeMap<>();
     consensusGroupCacheMap.forEach(
@@ -294,6 +335,22 @@ public class LoadCache {
     return nodeCache == null ? NodeStatus.Unknown : nodeCache.getNodeStatus();
   }
 
+  /**
+   * Get the NodeStatus of all DataNodes.
+   *
+   * @return a map from DataNodeId to NodeStatus
+   */
+  public Map<Integer, NodeStatus> getDataNodeStatus() {
+    Map<Integer, NodeStatus> nodeStatusMap = new TreeMap<>();
+    nodeCacheMap.forEach(
+        (nodeId, nodeCache) -> {
+          if (nodeCache instanceof DataNodeHeartbeatCache) {
+            nodeStatusMap.put(nodeId, nodeCache.getNodeStatus());
+          }
+        });
+    return nodeStatusMap;
+  }
+
   /**
    * Safely get the specified Node's current status with reason.
    *
@@ -448,7 +505,7 @@ public class LoadCache {
    */
   public Map<TConsensusGroupId, RegionGroupStatus> getRegionGroupStatus(
       List<TConsensusGroupId> consensusGroupIds) {
-    Map<TConsensusGroupId, RegionGroupStatus> regionGroupStatusMap = new 
ConcurrentHashMap<>();
+    Map<TConsensusGroupId, RegionGroupStatus> regionGroupStatusMap = new 
TreeMap<>();
     for (TConsensusGroupId consensusGroupId : consensusGroupIds) {
       regionGroupStatusMap.put(consensusGroupId, 
getRegionGroupStatus(consensusGroupId));
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
index e3c8bd25dbb..24bee71ad0e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.manager.load.cache.node;
 
 import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.manager.load.cache.AbstractStatistics;
 
 import java.util.Objects;
@@ -42,6 +43,14 @@ public class NodeStatistics extends AbstractStatistics {
     this.loadScore = loadScore;
   }
 
+  @TestOnly
+  public NodeStatistics(NodeStatus status) {
+    super(System.nanoTime());
+    this.status = status;
+    this.statusReason = null;
+    this.loadScore = Long.MAX_VALUE;
+  }
+
   public static NodeStatistics generateDefaultNodeStatistics() {
     return new NodeStatistics(Long.MIN_VALUE, NodeStatus.Unknown, null, 
Long.MAX_VALUE);
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java
index 1246f0ed435..84f22ef4ac5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.manager.load.cache.region;
 
 import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.manager.load.cache.AbstractStatistics;
 
 import java.util.Objects;
@@ -34,6 +35,12 @@ public class RegionStatistics extends AbstractStatistics {
     this.regionStatus = regionStatus;
   }
 
+  @TestOnly
+  public RegionStatistics(RegionStatus regionStatus) {
+    super(System.nanoTime());
+    this.regionStatus = regionStatus;
+  }
+
   public static RegionStatistics generateDefaultRegionStatistics() {
     return new RegionStatistics(0, RegionStatus.Unknown);
   }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java
index 716fa55a80f..7cc1f50c28c 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/CFDLeaderBalancerTest.java
@@ -23,6 +23,10 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -35,7 +39,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -63,21 +66,37 @@ public class CFDLeaderBalancerTest {
     // The result will be unbalanced if select DataNode-2 as leader for 
RegionGroup-0
     // and select DataNode-3 as leader for RegionGroup-1
     List<TRegionReplicaSet> regionReplicaSets = new ArrayList<>();
+    Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap 
= new TreeMap<>();
     regionReplicaSets.add(
         new TRegionReplicaSet(
             regionGroupIds.get(0),
             Arrays.asList(
                 dataNodeLocations.get(0), dataNodeLocations.get(1), 
dataNodeLocations.get(2))));
+    Map<Integer, RegionStatistics> region0 = new TreeMap<>();
+    region0.put(0, new RegionStatistics(RegionStatus.Unknown));
+    region0.put(1, new RegionStatistics(RegionStatus.Running));
+    region0.put(2, new RegionStatistics(RegionStatus.Running));
+    regionStatisticsMap.put(regionGroupIds.get(0), region0);
     regionReplicaSets.add(
         new TRegionReplicaSet(
             regionGroupIds.get(1),
             Arrays.asList(
                 dataNodeLocations.get(0), dataNodeLocations.get(1), 
dataNodeLocations.get(3))));
+    Map<Integer, RegionStatistics> region1 = new TreeMap<>();
+    region1.put(0, new RegionStatistics(RegionStatus.Unknown));
+    region1.put(1, new RegionStatistics(RegionStatus.Running));
+    region1.put(3, new RegionStatistics(RegionStatus.Running));
+    regionStatisticsMap.put(regionGroupIds.get(1), region1);
     regionReplicaSets.add(
         new TRegionReplicaSet(
             regionGroupIds.get(2),
             Arrays.asList(
                 dataNodeLocations.get(0), dataNodeLocations.get(2), 
dataNodeLocations.get(3))));
+    Map<Integer, RegionStatistics> region2 = new TreeMap<>();
+    region2.put(0, new RegionStatistics(RegionStatus.Unknown));
+    region2.put(2, new RegionStatistics(RegionStatus.Running));
+    region2.put(3, new RegionStatistics(RegionStatus.Running));
+    regionStatisticsMap.put(regionGroupIds.get(2), region2);
 
     // Prepare input parameters
     Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new 
TreeMap<>();
@@ -89,13 +108,20 @@ public class CFDLeaderBalancerTest {
     Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
     regionReplicaSets.forEach(
         regionReplicaSet -> 
regionLeaderMap.put(regionReplicaSet.getRegionId(), 0));
-    Set<Integer> disabledDataNodeSet = new HashSet<>();
-    disabledDataNodeSet.add(0);
+    Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>();
+    dataNodeStatisticsMap.put(0, new NodeStatistics(NodeStatus.Unknown));
+    dataNodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.Running));
+    dataNodeStatisticsMap.put(2, new NodeStatistics(NodeStatus.Running));
+    dataNodeStatisticsMap.put(3, new NodeStatistics(NodeStatus.Running));
 
     // Do balancing
     Map<TConsensusGroupId, Integer> leaderDistribution =
         BALANCER.generateOptimalLeaderDistribution(
-            databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap, 
disabledDataNodeSet);
+            databaseRegionGroupMap,
+            regionReplicaSetMap,
+            regionLeaderMap,
+            dataNodeStatisticsMap,
+            regionStatisticsMap);
     // All RegionGroup got a leader
     Assert.assertEquals(3, leaderDistribution.size());
     // Each DataNode has exactly one leader
@@ -125,15 +151,25 @@ public class CFDLeaderBalancerTest {
     regionReplicaSetMap.put(regionReplicaSet.getRegionId(), regionReplicaSet);
     Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
     regionLeaderMap.put(regionReplicaSet.getRegionId(), 1);
-    Set<Integer> disabledDataNodeSet = new HashSet<>();
-    disabledDataNodeSet.add(0);
-    disabledDataNodeSet.add(1);
-    disabledDataNodeSet.add(2);
+    Map<Integer, NodeStatistics> nodeStatisticsMap = new TreeMap<>();
+    nodeStatisticsMap.put(0, new NodeStatistics(NodeStatus.Unknown));
+    nodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.ReadOnly));
+    nodeStatisticsMap.put(2, new NodeStatistics(NodeStatus.Removing));
+    Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap 
= new TreeMap<>();
+    Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
+    regionStatistics.put(0, new RegionStatistics(RegionStatus.Running));
+    regionStatistics.put(1, new RegionStatistics(RegionStatus.Running));
+    regionStatistics.put(2, new RegionStatistics(RegionStatus.Running));
+    regionStatisticsMap.put(regionReplicaSet.getRegionId(), regionStatistics);
 
     // Do balancing
     Map<TConsensusGroupId, Integer> leaderDistribution =
         BALANCER.generateOptimalLeaderDistribution(
-            databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap, 
disabledDataNodeSet);
+            databaseRegionGroupMap,
+            regionReplicaSetMap,
+            regionLeaderMap,
+            nodeStatisticsMap,
+            regionStatisticsMap);
     Assert.assertEquals(1, leaderDistribution.size());
     Assert.assertEquals(1, new HashSet<>(leaderDistribution.values()).size());
     // Leader remains the same
@@ -146,6 +182,55 @@ public class CFDLeaderBalancerTest {
     Assert.assertEquals(0, BALANCER.getMinimumCost());
   }
 
+  @Test
+  public void migrateTest() {
+    // All DataNodes are in Running status
+    Map<Integer, NodeStatistics> nodeStatisticsMap = new TreeMap<>();
+    nodeStatisticsMap.put(0, new NodeStatistics(NodeStatus.Running));
+    nodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.Running));
+    // Prepare RegionGroups
+    Map<String, List<TConsensusGroupId>> databaseRegionGroupMap = new 
TreeMap<>();
+    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new 
TreeMap<>();
+    Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
+    Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap 
= new TreeMap<>();
+    for (int i = 0; i < 5; i++) {
+      TConsensusGroupId regionGroupId = new 
TConsensusGroupId(TConsensusGroupType.DataRegion, i);
+      databaseRegionGroupMap
+          .computeIfAbsent(DATABASE, empty -> new ArrayList<>())
+          .add(regionGroupId);
+      TRegionReplicaSet regionReplicaSet =
+          new TRegionReplicaSet(
+              regionGroupId,
+              Arrays.asList(
+                  new TDataNodeLocation().setDataNodeId(0),
+                  new TDataNodeLocation().setDataNodeId(1)));
+      regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
+      regionLeaderMap.put(regionGroupId, 0);
+      // Assuming all Regions are migrating from DataNode-0 to DataNode-1
+      Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
+      regionStatistics.put(0, new RegionStatistics(RegionStatus.Removing));
+      regionStatistics.put(1, new RegionStatistics(RegionStatus.Running));
+      regionStatisticsMap.put(regionGroupId, regionStatistics);
+    }
+
+    // Do balancing
+    Map<TConsensusGroupId, Integer> leaderDistribution =
+        BALANCER.generateOptimalLeaderDistribution(
+            databaseRegionGroupMap,
+            regionReplicaSetMap,
+            regionLeaderMap,
+            nodeStatisticsMap,
+            regionStatisticsMap);
+    for (int i = 0; i < 5; i++) {
+      // All RegionGroups' leader should be DataNode-1
+      Assert.assertEquals(
+          1,
+          leaderDistribution
+              .get(new TConsensusGroupId(TConsensusGroupType.DataRegion, i))
+              .intValue());
+    }
+  }
+
   /**
    * In this case shows the balance ability for big cluster.
    *
@@ -156,6 +241,10 @@ public class CFDLeaderBalancerTest {
     final int regionGroupNum = 1500;
     final int dataNodeNum = 300;
     final int replicationFactor = 3;
+    Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>();
+    for (int i = 0; i < dataNodeNum; i++) {
+      dataNodeStatisticsMap.put(i, new NodeStatistics(NodeStatus.Running));
+    }
 
     // The loadCost for each DataNode are the same
     int x = regionGroupNum / dataNodeNum;
@@ -168,16 +257,20 @@ public class CFDLeaderBalancerTest {
     databaseRegionGroupMap.put(DATABASE, new ArrayList<>());
     Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new 
TreeMap<>();
     Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
+    Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap 
= new TreeMap<>();
     for (int i = 0; i < regionGroupNum; i++) {
       TConsensusGroupId regionGroupId = new 
TConsensusGroupId(TConsensusGroupType.DataRegion, i);
       int leaderId = (dataNodeId + random.nextInt(replicationFactor)) % 
dataNodeNum;
 
       TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
       regionReplicaSet.setRegionId(regionGroupId);
+      Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
       for (int j = 0; j < 3; j++) {
         regionReplicaSet.addToDataNodeLocations(new 
TDataNodeLocation().setDataNodeId(dataNodeId));
+        regionStatistics.put(dataNodeId, new 
RegionStatistics(RegionStatus.Running));
         dataNodeId = (dataNodeId + 1) % dataNodeNum;
       }
+      regionStatisticsMap.put(regionGroupId, regionStatistics);
 
       databaseRegionGroupMap.get(DATABASE).add(regionGroupId);
       regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
@@ -187,7 +280,11 @@ public class CFDLeaderBalancerTest {
     // Do balancing
     Map<TConsensusGroupId, Integer> leaderDistribution =
         BALANCER.generateOptimalLeaderDistribution(
-            databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap, new 
HashSet<>());
+            databaseRegionGroupMap,
+            regionReplicaSetMap,
+            regionLeaderMap,
+            dataNodeStatisticsMap,
+            regionStatisticsMap);
     // All RegionGroup got a leader
     Assert.assertEquals(regionGroupNum, leaderDistribution.size());
 
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
index a443fe74c02..6b97e1b385b 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancerTest.java
@@ -23,16 +23,18 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -43,42 +45,56 @@ public class GreedyLeaderBalancerTest {
 
   @Test
   public void optimalLeaderDistributionTest() {
-    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new 
HashMap<>();
-    Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
-    Set<Integer> disabledDataNodeSet = new HashSet<>();
+    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new 
TreeMap<>();
+    Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
+    Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>();
+    Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap 
= new TreeMap<>();
     Random random = new Random();
 
+    // Assuming all DataNodes are in Running status
+    for (int i = 0; i < 6; i++) {
+      dataNodeStatisticsMap.put(i, new NodeStatistics(NodeStatus.Running));
+    }
+
     // Build 9 RegionGroups in DataNodes 0~2
     for (int i = 0; i < 9; i++) {
       TConsensusGroupId regionGroupId = new 
TConsensusGroupId(TConsensusGroupType.DataRegion, i);
-      TRegionReplicaSet regionReplicaSet =
-          new TRegionReplicaSet(
-              regionGroupId,
-              Arrays.asList(
-                  new TDataNodeLocation().setDataNodeId(0),
-                  new TDataNodeLocation().setDataNodeId(1),
-                  new TDataNodeLocation().setDataNodeId(2)));
+      List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+      Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
+      for (int j = 0; j < 3; j++) {
+        dataNodeLocations.add(new TDataNodeLocation().setDataNodeId(j));
+        // Assuming all Regions are in Running status
+        regionStatistics.put(j, new RegionStatistics(RegionStatus.Running));
+      }
+      TRegionReplicaSet regionReplicaSet = new 
TRegionReplicaSet(regionGroupId, dataNodeLocations);
       regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
       regionLeaderMap.put(regionGroupId, random.nextInt(3));
+      regionStatisticsMap.put(regionGroupId, regionStatistics);
     }
 
     // Build 9 RegionGroups in DataNodes 3~5
     for (int i = 9; i < 18; i++) {
       TConsensusGroupId regionGroupId = new 
TConsensusGroupId(TConsensusGroupType.DataRegion, i);
-      TRegionReplicaSet regionReplicaSet =
-          new TRegionReplicaSet(
-              regionGroupId,
-              Arrays.asList(
-                  new TDataNodeLocation().setDataNodeId(3),
-                  new TDataNodeLocation().setDataNodeId(4),
-                  new TDataNodeLocation().setDataNodeId(5)));
+      List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+      Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
+      for (int j = 3; j < 6; j++) {
+        dataNodeLocations.add(new TDataNodeLocation().setDataNodeId(j));
+        // Assuming all Regions are in Running status
+        regionStatistics.put(j, new RegionStatistics(RegionStatus.Running));
+      }
+      TRegionReplicaSet regionReplicaSet = new 
TRegionReplicaSet(regionGroupId, dataNodeLocations);
       regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
       regionLeaderMap.put(regionGroupId, 3 + random.nextInt(3));
+      regionStatisticsMap.put(regionGroupId, regionStatistics);
     }
 
     Map<TConsensusGroupId, Integer> leaderDistribution =
         BALANCER.generateOptimalLeaderDistribution(
-            new TreeMap<>(), regionReplicaSetMap, regionLeaderMap, 
disabledDataNodeSet);
+            new TreeMap<>(),
+            regionReplicaSetMap,
+            regionLeaderMap,
+            dataNodeStatisticsMap,
+            regionStatisticsMap);
     Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
     leaderDistribution.forEach(
         (regionGroupId, leaderId) ->
@@ -94,44 +110,60 @@ public class GreedyLeaderBalancerTest {
 
   @Test
   public void disableTest() {
-    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new 
HashMap<>();
-    Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
-    Set<Integer> disabledDataNodeSet = new HashSet<>();
-
-    disabledDataNodeSet.add(1);
-    disabledDataNodeSet.add(4);
+    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new 
TreeMap<>();
+    Map<TConsensusGroupId, Integer> regionLeaderMap = new TreeMap<>();
+    Map<Integer, NodeStatistics> dataNodeStatisticsMap = new TreeMap<>();
+    Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap 
= new TreeMap<>();
+
+    // Assuming DataNode 1 and 4 are disabled
+    dataNodeStatisticsMap.put(0, new NodeStatistics(NodeStatus.Running));
+    dataNodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.Unknown));
+    dataNodeStatisticsMap.put(2, new NodeStatistics(NodeStatus.Running));
+    dataNodeStatisticsMap.put(3, new NodeStatistics(NodeStatus.Running));
+    dataNodeStatisticsMap.put(4, new NodeStatistics(NodeStatus.ReadOnly));
+    dataNodeStatisticsMap.put(5, new NodeStatistics(NodeStatus.Running));
 
     // Build 10 RegionGroup whose leaders are all 1(Disabled)
     for (int i = 0; i < 10; i++) {
       TConsensusGroupId regionGroupId = new 
TConsensusGroupId(TConsensusGroupType.DataRegion, i);
-      TRegionReplicaSet regionReplicaSet =
-          new TRegionReplicaSet(
-              regionGroupId,
-              Arrays.asList(
-                  new TDataNodeLocation().setDataNodeId(0),
-                  new TDataNodeLocation().setDataNodeId(1),
-                  new TDataNodeLocation().setDataNodeId(2)));
+      List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+      Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
+      for (int j = 0; j < 3; j++) {
+        dataNodeLocations.add(new TDataNodeLocation().setDataNodeId(j));
+        // Assuming all Regions in DataNode 1 are Unknown
+        regionStatistics.put(
+            j, new RegionStatistics(j == 1 ? RegionStatus.Unknown : 
RegionStatus.Running));
+      }
+      TRegionReplicaSet regionReplicaSet = new 
TRegionReplicaSet(regionGroupId, dataNodeLocations);
       regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
       regionLeaderMap.put(regionGroupId, 1);
+      regionStatisticsMap.put(regionGroupId, regionStatistics);
     }
 
     // Build 10 RegionGroup whose leaders are all 4(Disabled)
     for (int i = 10; i < 20; i++) {
       TConsensusGroupId regionGroupId = new 
TConsensusGroupId(TConsensusGroupType.DataRegion, i);
-      TRegionReplicaSet regionReplicaSet =
-          new TRegionReplicaSet(
-              regionGroupId,
-              Arrays.asList(
-                  new TDataNodeLocation().setDataNodeId(3),
-                  new TDataNodeLocation().setDataNodeId(4),
-                  new TDataNodeLocation().setDataNodeId(5)));
+      List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+      Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
+      for (int j = 3; j < 6; j++) {
+        dataNodeLocations.add(new TDataNodeLocation().setDataNodeId(j));
+        // Assuming all Regions in DataNode 4 are ReadOnly
+        regionStatistics.put(
+            j, new RegionStatistics(j == 4 ? RegionStatus.ReadOnly : 
RegionStatus.Running));
+      }
+      TRegionReplicaSet regionReplicaSet = new 
TRegionReplicaSet(regionGroupId, dataNodeLocations);
       regionReplicaSetMap.put(regionGroupId, regionReplicaSet);
       regionLeaderMap.put(regionGroupId, 4);
+      regionStatisticsMap.put(regionGroupId, regionStatistics);
     }
 
     Map<TConsensusGroupId, Integer> leaderDistribution =
         BALANCER.generateOptimalLeaderDistribution(
-            new TreeMap<>(), regionReplicaSetMap, regionLeaderMap, 
disabledDataNodeSet);
+            new TreeMap<>(),
+            regionReplicaSetMap,
+            regionLeaderMap,
+            dataNodeStatisticsMap,
+            regionStatisticsMap);
     Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
     leaderDistribution.forEach(
         (regionGroupId, leaderId) ->
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
index 2a1c7093906..182b5edbce9 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/LeaderBalancerComparisonTest.java
@@ -23,6 +23,10 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
@@ -88,6 +92,24 @@ public class LeaderBalancerComparisonTest {
 
       // Basic test
       Map<TConsensusGroupId, Integer> greedyLeaderDistribution = new 
ConcurrentHashMap<>();
+      Map<Integer, NodeStatistics> allRunningDataNodeStatistics = new 
TreeMap<>();
+      for (int i = 0; i < dataNodeNum; i++) {
+        allRunningDataNodeStatistics.put(i, new 
NodeStatistics(NodeStatus.Running));
+      }
+      Map<TConsensusGroupId, Map<Integer, RegionStatistics>> 
allRunningRegionStatistics =
+          new TreeMap<>();
+      regionReplicaSetMap.forEach(
+          (regionGroupId, regionReplicaSet) -> {
+            Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
+            regionReplicaSet
+                .getDataNodeLocations()
+                .forEach(
+                    dataNodeLocation ->
+                        regionStatistics.put(
+                            dataNodeLocation.getDataNodeId(),
+                            new RegionStatistics(RegionStatus.Running)));
+            allRunningRegionStatistics.put(regionGroupId, regionStatistics);
+          });
       Statistics greedyStatistics =
           doBalancing(
               dataNodeNum,
@@ -95,7 +117,8 @@ public class LeaderBalancerComparisonTest {
               GREEDY_LEADER_BALANCER,
               regionReplicaSetMap,
               regionLeaderMap,
-              new HashSet<>(),
+              allRunningDataNodeStatistics,
+              allRunningRegionStatistics,
               greedyLeaderDistribution);
       Map<TConsensusGroupId, Integer> mcfLeaderDistribution = new 
ConcurrentHashMap<>();
       Statistics mcfStatistics =
@@ -105,7 +128,8 @@ public class LeaderBalancerComparisonTest {
               MIN_COST_FLOW_LEADER_BALANCER,
               regionReplicaSetMap,
               regionLeaderMap,
-              new HashSet<>(),
+              allRunningDataNodeStatistics,
+              allRunningRegionStatistics,
               mcfLeaderDistribution);
       if (isCommandLineMode) {
         LOGGER.info("[Basic test]");
@@ -126,6 +150,30 @@ public class LeaderBalancerComparisonTest {
         }
         disabledDataNodeSet.add(dataNodeId);
       }
+      Map<Integer, NodeStatistics> disabledDataNodeStatistics = new 
TreeMap<>();
+      for (int i = 0; i < dataNodeNum; i++) {
+        disabledDataNodeStatistics.put(
+            i,
+            disabledDataNodeSet.contains(i)
+                ? new NodeStatistics(NodeStatus.Unknown)
+                : new NodeStatistics(NodeStatus.Running));
+      }
+      Map<TConsensusGroupId, Map<Integer, RegionStatistics>> 
disabledRegionStatistics =
+          new TreeMap<>();
+      regionReplicaSetMap.forEach(
+          (regionGroupId, regionReplicaSet) -> {
+            Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>();
+            regionReplicaSet
+                .getDataNodeLocations()
+                .forEach(
+                    dataNodeLocation ->
+                        regionStatistics.put(
+                            dataNodeLocation.getDataNodeId(),
+                            
disabledDataNodeSet.contains(dataNodeLocation.getDataNodeId())
+                                ? new RegionStatistics(RegionStatus.Unknown)
+                                : new RegionStatistics(RegionStatus.Running)));
+            disabledRegionStatistics.put(regionGroupId, regionStatistics);
+          });
       greedyStatistics =
           doBalancing(
               dataNodeNum,
@@ -133,7 +181,8 @@ public class LeaderBalancerComparisonTest {
               GREEDY_LEADER_BALANCER,
               regionReplicaSetMap,
               greedyLeaderDistribution,
-              disabledDataNodeSet,
+              disabledDataNodeStatistics,
+              disabledRegionStatistics,
               greedyLeaderDistribution);
       mcfStatistics =
           doBalancing(
@@ -142,7 +191,8 @@ public class LeaderBalancerComparisonTest {
               MIN_COST_FLOW_LEADER_BALANCER,
               regionReplicaSetMap,
               mcfLeaderDistribution,
-              disabledDataNodeSet,
+              disabledDataNodeStatistics,
+              disabledRegionStatistics,
               mcfLeaderDistribution);
       if (isCommandLineMode) {
         LOGGER.info("[Disaster test]");
@@ -161,7 +211,8 @@ public class LeaderBalancerComparisonTest {
               GREEDY_LEADER_BALANCER,
               regionReplicaSetMap,
               greedyLeaderDistribution,
-              new HashSet<>(),
+              allRunningDataNodeStatistics,
+              allRunningRegionStatistics,
               greedyLeaderDistribution);
       mcfStatistics =
           doBalancing(
@@ -170,7 +221,8 @@ public class LeaderBalancerComparisonTest {
               MIN_COST_FLOW_LEADER_BALANCER,
               regionReplicaSetMap,
               mcfLeaderDistribution,
-              new HashSet<>(),
+              allRunningDataNodeStatistics,
+              allRunningRegionStatistics,
               mcfLeaderDistribution);
       if (isCommandLineMode) {
         LOGGER.info("[Recovery test]");
@@ -254,10 +306,11 @@ public class LeaderBalancerComparisonTest {
   private Statistics doBalancing(
       int dataNodeNum,
       int regionGroupNum,
-      ILeaderBalancer leaderBalancer,
+      AbstractLeaderBalancer leaderBalancer,
       Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
       Map<TConsensusGroupId, Integer> regionLeaderMap,
-      Set<Integer> disabledDataNodeSet,
+      Map<Integer, NodeStatistics> nodeStatisticsMap,
+      Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap,
       Map<TConsensusGroupId, Integer> stableLeaderDistribution) {
 
     Statistics result = new Statistics();
@@ -266,7 +319,11 @@ public class LeaderBalancerComparisonTest {
     for (int rounds = 0; rounds < 1000; rounds++) {
       Map<TConsensusGroupId, Integer> currentDistribution =
           leaderBalancer.generateOptimalLeaderDistribution(
-              new TreeMap<>(), regionReplicaSetMap, lastDistribution, disabledDataNodeSet);
+              new TreeMap<>(),
+              regionReplicaSetMap,
+              lastDistribution,
+              nodeStatisticsMap,
+              regionStatisticsMap);
       if (currentDistribution.equals(lastDistribution)) {
         // The leader distribution is stable
         result.rounds = rounds;

Reply via email to