This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch region-multi-database
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/region-multi-database by this push:
     new 47cdc2bf260 update algorithm
47cdc2bf260 is described below

commit 47cdc2bf260acb4d584611d7b3fc2fc5fe8f45d6
Author: YongzaoDan <[email protected]>
AuthorDate: Sun Mar 17 16:11:15 2024 +0800

    update algorithm
---
 .../confignode/conf/ConfigNodeDescriptor.java      |   3 +-
 .../region/GreedyCopySetRegionGroupAllocator.java  | 106 +++--------
 .../router/leader/GreedyLeaderBalancer.java        |   1 -
 .../router/leader/MinCostFlowLeaderBalancer.java   | 209 ++++++++-------------
 4 files changed, 108 insertions(+), 211 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index f138143ff9b..38a52154f8a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -359,8 +359,7 @@ public class ConfigNodeDescriptor {
     } else {
       throw new IOException(
           String.format(
-              "Unknown leader_distribution_policy: %s, "
-                  + "please set to \"GREEDY\" or \"CFD\"",
+              "Unknown leader_distribution_policy: %s, " + "please set to 
\"GREEDY\" or \"CFD\"",
               leaderDistributionPolicy));
     }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
index 4623c9469f3..22dfa1d339b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -40,47 +39,36 @@ import static java.util.Map.Entry.comparingByValue;
 public class GreedyCopySetRegionGroupAllocator implements 
IRegionGroupAllocator {
 
   private static final Random RANDOM = new Random();
-  private static final int GCR_MAX_OPTIMAL_PLAN_NUM =
-      ConfigNodeDescriptor.getInstance().getConf().getGcrMaxOptimalPlanNum();
 
   private int replicationFactor;
   // Sorted available DataNodeIds
   private int[] dataNodeIds;
   // The number of allocated Regions in each DataNode
   private int[] regionCounter;
-  // The number of allocated Regions in each DataNode within the same Database
-  private int[] databaseRegionCounter;
   // The number of 2-Region combinations in current cluster
   private int[][] combinationCounter;
-  private int maxDataNodeId;
 
-  // First Key: the sum of Regions at the DataNodes within the same Database 
in the allocation
-  // result is minimal
-  int optimalDatabaseRegionSum;
-  // Second Key: the sum of Regions at the DataNodes in the allocation result 
is minimal
+  // First Key: the sum of Regions at the DataNodes in the allocation result 
is minimal
   int optimalRegionSum;
-  // Third Key: the sum of overlapped 2-Region combination Regions with other 
allocated
+  // Second Key: the sum of overlapped 2-Region combination Regions with other 
allocated
   // RegionGroups is minimal
   int optimalCombinationSum;
   List<int[]> optimalReplicaSets;
+  private static final int MAX_OPTIMAL_PLAN_NUM = 10;
 
   private static class DataNodeEntry {
 
     private final int dataNodeId;
 
-    // First key: the number of Regions in the DataNode within the same 
Database
-    private final int databaseRegionCount;
-    // Second key: the number of Regions in the DataNode
+    // First key: the number of Regions in the DataNode
     private final int regionCount;
-    // Third key: the scatter width of the DataNode
+    // Second key: the scatter width of the DataNode
     private final int scatterWidth;
-    // Forth key: a random weight
+    // Third key: a random weight
     private final int randomWeight;
 
-    public DataNodeEntry(
-        int dataNodeId, int databaseRegionCount, int regionCount, int 
scatterWidth) {
+    public DataNodeEntry(int dataNodeId, int regionCount, int scatterWidth) {
       this.dataNodeId = dataNodeId;
-      this.databaseRegionCount = databaseRegionCount;
       this.regionCount = regionCount;
       this.scatterWidth = scatterWidth;
       this.randomWeight = RANDOM.nextInt();
@@ -91,13 +79,11 @@ public class GreedyCopySetRegionGroupAllocator implements 
IRegionGroupAllocator
     }
 
     public int compare(DataNodeEntry e) {
-      return databaseRegionCount != e.databaseRegionCount
-          ? Integer.compare(databaseRegionCount, e.databaseRegionCount)
-          : regionCount != e.regionCount
-              ? Integer.compare(regionCount, e.regionCount)
-              : scatterWidth != e.scatterWidth
-                  ? Integer.compare(scatterWidth, e.scatterWidth)
-                  : Integer.compare(randomWeight, e.randomWeight);
+      return regionCount != e.regionCount
+          ? Integer.compare(regionCount, e.regionCount)
+          : scatterWidth != e.scatterWidth
+              ? Integer.compare(scatterWidth, e.scatterWidth)
+              : Integer.compare(randomWeight, e.randomWeight);
     }
   }
 
@@ -114,12 +100,8 @@ public class GreedyCopySetRegionGroupAllocator implements 
IRegionGroupAllocator
       int replicationFactor,
       TConsensusGroupId consensusGroupId) {
     try {
-      prepare(
-          replicationFactor,
-          availableDataNodeMap,
-          allocatedRegionGroups,
-          databaseAllocatedRegionGroups);
-      dfs(-1, 0, new int[replicationFactor], 0, 0);
+      prepare(replicationFactor, availableDataNodeMap, allocatedRegionGroups);
+      dfs(-1, 0, new int[replicationFactor], 0);
 
       // Randomly pick one optimal plan as result
       Collections.shuffle(optimalReplicaSets);
@@ -129,7 +111,6 @@ public class GreedyCopySetRegionGroupAllocator implements 
IRegionGroupAllocator
       for (int i = 0; i < replicationFactor; i++) {
         
result.addToDataNodeLocations(availableDataNodeMap.get(optimalReplicaSet[i]).getLocation());
       }
-
       return result;
     } finally {
       clear();
@@ -142,13 +123,11 @@ public class GreedyCopySetRegionGroupAllocator implements 
IRegionGroupAllocator
    * @param replicationFactor replication factor in the cluster
    * @param availableDataNodeMap currently available DataNodes, ensure size() 
>= replicationFactor
    * @param allocatedRegionGroups already allocated RegionGroups in the cluster
-   * @param databaseAllocatedRegionGroups already allocated RegionGroups in 
the same Database
    */
   private void prepare(
       int replicationFactor,
       Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
-      List<TRegionReplicaSet> allocatedRegionGroups,
-      List<TRegionReplicaSet> databaseAllocatedRegionGroups) {
+      List<TRegionReplicaSet> allocatedRegionGroups) {
 
     this.replicationFactor = replicationFactor;
     // Store the maximum DataNodeId
@@ -160,13 +139,10 @@ public class GreedyCopySetRegionGroupAllocator implements 
IRegionGroupAllocator
                 .mapToInt(TDataNodeLocation::getDataNodeId)
                 .max()
                 .orElse(0));
-    this.maxDataNodeId = maxDataNodeId;
 
-    // Compute regionCounter, databaseRegionCounter and combinationCounter
+    // Compute regionCounter and combinationCounter
     regionCounter = new int[maxDataNodeId + 1];
     Arrays.fill(regionCounter, 0);
-    databaseRegionCounter = new int[maxDataNodeId + 1];
-    Arrays.fill(databaseRegionCounter, 0);
     combinationCounter = new int[maxDataNodeId + 1][maxDataNodeId + 1];
     for (int i = 0; i <= maxDataNodeId; i++) {
       Arrays.fill(combinationCounter[i], 0);
@@ -183,12 +159,6 @@ public class GreedyCopySetRegionGroupAllocator implements 
IRegionGroupAllocator
         }
       }
     }
-    for (TRegionReplicaSet regionReplicaSet : databaseAllocatedRegionGroups) {
-      List<TDataNodeLocation> dataNodeLocations = 
regionReplicaSet.getDataNodeLocations();
-      for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
-        databaseRegionCounter[dataNodeLocation.getDataNodeId()]++;
-      }
-    }
 
     // Compute the DataNodeIds through sorting the DataNodeEntryMap
     Map<Integer, DataNodeEntry> dataNodeEntryMap = new HashMap<>(maxDataNodeId 
+ 1);
@@ -206,11 +176,7 @@ public class GreedyCopySetRegionGroupAllocator implements 
IRegionGroupAllocator
               }
               dataNodeEntryMap.put(
                   dataNodeId,
-                  new DataNodeEntry(
-                      dataNodeId,
-                      databaseRegionCounter[dataNodeId],
-                      regionCounter[dataNodeId],
-                      scatterWidth));
+                  new DataNodeEntry(dataNodeId, regionCounter[dataNodeId], 
scatterWidth));
             });
     dataNodeIds =
         dataNodeEntryMap.entrySet().stream()
@@ -222,7 +188,6 @@ public class GreedyCopySetRegionGroupAllocator implements 
IRegionGroupAllocator
             .toArray();
 
     // Reset the optimal result
-    optimalDatabaseRegionSum = Integer.MAX_VALUE;
     optimalRegionSum = Integer.MAX_VALUE;
     optimalCombinationSum = Integer.MAX_VALUE;
     optimalReplicaSets = new ArrayList<>();
@@ -236,27 +201,14 @@ public class GreedyCopySetRegionGroupAllocator implements 
IRegionGroupAllocator
    * @param lastIndex last decided index in dataNodeIds
    * @param currentReplica current replica index
    * @param currentReplicaSet current allocation plan
-   * @param databaseRegionSum the sum of Regions at the DataNodes within the 
same Database in the
-   *     current allocation plan
    * @param regionSum the sum of Regions at the DataNodes in the current 
allocation plan
    */
-  private void dfs(
-      int lastIndex,
-      int currentReplica,
-      int[] currentReplicaSet,
-      int databaseRegionSum,
-      int regionSum) {
-    if (databaseRegionSum > optimalDatabaseRegionSum) {
+  private void dfs(int lastIndex, int currentReplica, int[] currentReplicaSet, 
int regionSum) {
+    if (regionSum > optimalRegionSum) {
       // Pruning: no needs for further searching when the first key
       // is bigger than the historical optimal result
       return;
     }
-    if (databaseRegionSum == optimalDatabaseRegionSum && regionSum > 
optimalRegionSum) {
-      // Pruning: no needs for further searching when the first key is equal 
to the historical
-      // optimal result
-      // and the second key is bigger than the historical optimal result
-      return;
-    }
 
     if (currentReplica == replicationFactor) {
       // A complete allocation plan is found
@@ -266,17 +218,14 @@ public class GreedyCopySetRegionGroupAllocator implements 
IRegionGroupAllocator
           combinationSum += 
combinationCounter[currentReplicaSet[i]][currentReplicaSet[j]];
         }
       }
-      if (databaseRegionSum == optimalDatabaseRegionSum
-          && regionSum == optimalRegionSum
-          && combinationSum > optimalCombinationSum) {
+      if (combinationSum > optimalCombinationSum) {
+        // Pruning: no needs for further searching when the second key
+        // is bigger than the historical optimal result
         return;
       }
 
-      if (databaseRegionSum < optimalDatabaseRegionSum
-          || regionSum < optimalRegionSum
-          || combinationSum < optimalCombinationSum) {
+      if (regionSum < optimalRegionSum || combinationSum < 
optimalCombinationSum) {
         // Reset the optimal result when a better one is found
-        optimalDatabaseRegionSum = databaseRegionSum;
         optimalRegionSum = regionSum;
         optimalCombinationSum = combinationSum;
         optimalReplicaSets.clear();
@@ -288,13 +237,8 @@ public class GreedyCopySetRegionGroupAllocator implements 
IRegionGroupAllocator
     for (int i = lastIndex + 1; i < dataNodeIds.length; i++) {
       // Decide the next DataNodeId in the allocation plan
       currentReplicaSet[currentReplica] = dataNodeIds[i];
-      dfs(
-          i,
-          currentReplica + 1,
-          currentReplicaSet,
-          databaseRegionSum + databaseRegionCounter[dataNodeIds[i]],
-          regionSum + regionCounter[dataNodeIds[i]]);
-      if (optimalReplicaSets.size() == GCR_MAX_OPTIMAL_PLAN_NUM) {
+      dfs(i, currentReplica + 1, currentReplicaSet, regionSum + 
regionCounter[dataNodeIds[i]]);
+      if (optimalReplicaSets.size() == MAX_OPTIMAL_PLAN_NUM) {
         // Pruning: no needs for further searching when
         // the number of optimal plans reaches the limitation
         return;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
index 15618626e6d..4a28fb6ca31 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
index 517f35a569f..3bfb76dd14c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
@@ -25,14 +25,15 @@ import org.apache.iotdb.commons.utils.TestOnly;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /** Leader distribution balancer that uses minimum cost flow algorithm */
 public class MinCostFlowLeaderBalancer implements ILeaderBalancer {
@@ -40,9 +41,8 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
   private static final int INFINITY = Integer.MAX_VALUE;
 
   /** Input parameters */
-  private final Map<String, List<TConsensusGroupId>> databaseRegionGroupMap;
-
   private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
+
   private final Map<TConsensusGroupId, Integer> regionLeaderMap;
   private final Set<Integer> disabledDataNodeSet;
 
@@ -55,12 +55,10 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
   private int maxNode = T_NODE + 1;
   // Map<RegionGroupId, rNode>
   private final Map<TConsensusGroupId, Integer> rNodeMap;
-  // Map<Database, Map<DataNodeId, sDNode>>
-  private final Map<String, Map<Integer, Integer>> sDNodeMap;
-  // Map<Database, Map<sDNode, DataNodeId>>
-  private final Map<String, Map<Integer, Integer>> sDNodeReflect;
-  // Map<DataNodeId, tDNode>
-  private final Map<Integer, Integer> tDNodeMap;
+  // Map<DataNodeId, dNode>
+  private final Map<Integer, Integer> dNodeMap;
+  // Map<dNode, DataNodeId>
+  private final Map<Integer, Integer> dNodeReflect;
 
   /** Graph edges */
   // Maximum index of graph edges
@@ -77,14 +75,12 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
   private int minimumCost = 0;
 
   public MinCostFlowLeaderBalancer() {
-    this.databaseRegionGroupMap = new TreeMap<>();
-    this.regionReplicaSetMap = new TreeMap<>();
-    this.regionLeaderMap = new TreeMap<>();
-    this.disabledDataNodeSet = new TreeSet<>();
-    this.rNodeMap = new TreeMap<>();
-    this.sDNodeMap = new TreeMap<>();
-    this.sDNodeReflect = new TreeMap<>();
-    this.tDNodeMap = new TreeMap<>();
+    this.regionReplicaSetMap = new HashMap<>();
+    this.regionLeaderMap = new HashMap<>();
+    this.disabledDataNodeSet = new HashSet<>();
+    this.rNodeMap = new HashMap<>();
+    this.dNodeMap = new HashMap<>();
+    this.dNodeReflect = new HashMap<>();
     this.minCostFlowEdges = new ArrayList<>();
   }
 
@@ -95,7 +91,7 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
       Map<TConsensusGroupId, Integer> regionLeaderMap,
       Set<Integer> disabledDataNodeSet) {
 
-    initialize(databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap, 
disabledDataNodeSet);
+    initialize(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
 
     Map<TConsensusGroupId, Integer> result;
     constructMCFGraph();
@@ -107,26 +103,21 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
   }
 
   private void initialize(
-      Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
       Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
       Map<TConsensusGroupId, Integer> regionLeaderMap,
       Set<Integer> disabledDataNodeSet) {
-    this.databaseRegionGroupMap.putAll(databaseRegionGroupMap);
     this.regionReplicaSetMap.putAll(regionReplicaSetMap);
     this.regionLeaderMap.putAll(regionLeaderMap);
     this.disabledDataNodeSet.addAll(disabledDataNodeSet);
   }
 
   private void clear() {
-    this.databaseRegionGroupMap.clear();
     this.regionReplicaSetMap.clear();
     this.regionLeaderMap.clear();
     this.disabledDataNodeSet.clear();
-
     this.rNodeMap.clear();
-    this.sDNodeMap.clear();
-    this.sDNodeReflect.clear();
-    this.tDNodeMap.clear();
+    this.dNodeMap.clear();
+    this.dNodeReflect.clear();
     this.minCostFlowEdges.clear();
 
     this.nodeHeadEdge = null;
@@ -143,30 +134,13 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
     this.minimumCost = 0;
 
     /* Indicate nodes in mcf */
-    for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
-        databaseRegionGroupMap.entrySet()) {
-      String database = databaseEntry.getKey();
-      sDNodeMap.put(database, new TreeMap<>());
-      sDNodeReflect.put(database, new TreeMap<>());
-      List<TConsensusGroupId> regionGroupIds = databaseEntry.getValue();
-      for (TConsensusGroupId regionGroupId : regionGroupIds) {
-        rNodeMap.put(regionGroupId, maxNode++);
-        for (TDataNodeLocation dataNodeLocation :
-            regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
-          int dataNodeId = dataNodeLocation.getDataNodeId();
-          if (disabledDataNodeSet.contains(dataNodeId)) {
-            // Skip disabled DataNode
-            continue;
-          }
-          if (!sDNodeMap.get(database).containsKey(dataNodeId)) {
-            sDNodeMap.get(database).put(dataNodeId, maxNode);
-            sDNodeReflect.get(database).put(maxNode, dataNodeId);
-            maxNode += 1;
-          }
-          if (!tDNodeMap.containsKey(dataNodeId)) {
-            tDNodeMap.put(dataNodeId, maxNode);
-            maxNode += 1;
-          }
+    for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
+      rNodeMap.put(regionReplicaSet.getRegionId(), maxNode++);
+      for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
+        if (!dNodeMap.containsKey(dataNodeLocation.getDataNodeId())) {
+          dNodeMap.put(dataNodeLocation.getDataNodeId(), maxNode);
+          dNodeReflect.put(maxNode, dataNodeLocation.getDataNodeId());
+          maxNode += 1;
         }
       }
     }
@@ -180,74 +154,57 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
 
     /* Construct edges: sNode -> rNodes */
     for (int rNode : rNodeMap.values()) {
-      // Capacity: 1, Cost: 0, each RegionGroup should elect exactly 1 leader
+      // Cost: 0
       addAdjacentEdges(S_NODE, rNode, 1, 0);
     }
 
-    /* Construct edges: rNodes -> sdNodes */
-    for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
-        databaseRegionGroupMap.entrySet()) {
-      String database = databaseEntry.getKey();
-      for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
-        int rNode = rNodeMap.get(regionGroupId);
-        for (TDataNodeLocation dataNodeLocation :
-            regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
-          int dataNodeId = dataNodeLocation.getDataNodeId();
-          if (disabledDataNodeSet.contains(dataNodeId)) {
-            // Skip disabled DataNode
-            continue;
-          }
-          int sDNode = sDNodeMap.get(database).get(dataNodeId);
-          // Capacity: 1, Cost: 1 if sDNode is the current leader of the 
rNode, 0 otherwise.
-          // Therefore, the RegionGroup will keep the leader as constant as 
possible.
-          int cost = regionLeaderMap.getOrDefault(regionGroupId, -1) == 
dataNodeId ? 0 : 1;
-          addAdjacentEdges(rNode, sDNode, 1, cost);
-        }
+    /* Construct edges: rNodes -> dNodes */
+    for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
+      int rNode = rNodeMap.get(regionReplicaSet.getRegionId());
+      for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
+        int dNode = dNodeMap.get(dataNodeLocation.getDataNodeId());
+        // Cost: 1 if the dNode is corresponded to the current leader of the 
rNode,
+        //       0 otherwise.
+        // Therefore, the RegionGroup will keep the leader as constant as 
possible.
+        int cost =
+            regionLeaderMap.getOrDefault(regionReplicaSet.getRegionId(), -1)
+                    == dataNodeLocation.getDataNodeId()
+                ? 0
+                : 1;
+        addAdjacentEdges(rNode, dNode, 1, cost);
       }
     }
 
-    /* Construct edges: sDNodes -> tDNodes */
-    for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
-        databaseRegionGroupMap.entrySet()) {
-      String database = databaseEntry.getKey();
-      // Map<DataNodeId, leader number>
-      Map<Integer, Integer> leaderCounter = new TreeMap<>();
-      for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
-        for (TDataNodeLocation dataNodeLocation :
-            regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
-          int dataNodeId = dataNodeLocation.getDataNodeId();
-          if (disabledDataNodeSet.contains(dataNodeId)) {
-            // Skip disabled DataNode
-            continue;
-          }
-          int sDNode = sDNodeMap.get(database).get(dataNodeId);
-          int tDNode = tDNodeMap.get(dataNodeId);
-          int leaderCount = leaderCounter.merge(dataNodeId, 1, Integer::sum);
-          // Capacity: 1, Cost: x^2 for the x-th edge at the current sDNode.
-          // Thus, the leader distribution will be as balance as possible 
within each Database
-          // based on the Jensen's-Inequality.
-          addAdjacentEdges(sDNode, tDNode, 1, leaderCount * leaderCount);
-        }
+    /* Construct edges: dNodes -> tNode */
+    // Count the possible maximum number of leader in each DataNode
+    Map<Integer, AtomicInteger> maxLeaderCounter = new ConcurrentHashMap<>();
+    regionReplicaSetMap
+        .values()
+        .forEach(
+            regionReplicaSet ->
+                regionReplicaSet
+                    .getDataNodeLocations()
+                    .forEach(
+                        dataNodeLocation ->
+                            maxLeaderCounter
+                                .computeIfAbsent(
+                                    dataNodeLocation.getDataNodeId(), empty -> 
new AtomicInteger(0))
+                                .getAndIncrement()));
+
+    for (Map.Entry<Integer, Integer> dNodeEntry : dNodeMap.entrySet()) {
+      int dataNodeId = dNodeEntry.getKey();
+      int dNode = dNodeEntry.getValue();
+
+      if (disabledDataNodeSet.contains(dataNodeId)) {
+        // Skip disabled DataNode
+        continue;
       }
-    }
 
-    /* Construct edges: tDNodes -> tNode */
-    // Map<DataNodeId, possible max leader> Count the possible maximum number 
of leader in each
-    // DataNode
-    Map<Integer, Integer> maxLeaderCounter = new TreeMap<>();
-    for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
-      for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
-        int dataNodeId = dataNodeLocation.getDataNodeId();
-        if (disabledDataNodeSet.contains(dataNodeId)) {
-          // Skip disabled DataNode
-          continue;
-        }
-        int tDNode = tDNodeMap.get(dataNodeId);
-        int leaderCount = maxLeaderCounter.merge(dataNodeId, 1, Integer::sum);
+      int maxLeaderCount = maxLeaderCounter.get(dataNodeId).get();
+      for (int extraEdge = 1; extraEdge <= maxLeaderCount; extraEdge++) {
         // Cost: x^2 for the x-th edge at the current dNode.
-        // Thus, the leader distribution will be as balance as possible within 
the cluster
-        // Based on the Jensen's-Inequality.
-        addAdjacentEdges(tDNode, T_NODE, 1, leaderCount * leaderCount);
+        // Thus, the leader distribution will be as balance as possible.
+        addAdjacentEdges(dNode, T_NODE, 1, extraEdge * extraEdge);
       }
     }
   }
@@ -354,24 +311,22 @@ public class MinCostFlowLeaderBalancer implements 
ILeaderBalancer {
   private Map<TConsensusGroupId, Integer> collectLeaderDistribution() {
     Map<TConsensusGroupId, Integer> result = new ConcurrentHashMap<>();
 
-    databaseRegionGroupMap.forEach(
-        (database, regionGroupIds) ->
-            regionGroupIds.forEach(
-                regionGroupId -> {
-                  boolean matchLeader = false;
-                  for (int currentEdge = 
nodeHeadEdge[rNodeMap.get(regionGroupId)];
-                      currentEdge >= 0;
-                      currentEdge = 
minCostFlowEdges.get(currentEdge).nextEdge) {
-                    MinCostFlowEdge edge = minCostFlowEdges.get(currentEdge);
-                    if (edge.destNode != S_NODE && edge.capacity == 0) {
-                      matchLeader = true;
-                      result.put(regionGroupId, 
sDNodeReflect.get(database).get(edge.destNode));
-                    }
-                  }
-                  if (!matchLeader) {
-                    result.put(regionGroupId, 
regionLeaderMap.getOrDefault(regionGroupId, -1));
-                  }
-                }));
+    rNodeMap.forEach(
+        (regionGroupId, rNode) -> {
+          boolean matchLeader = false;
+          for (int currentEdge = nodeHeadEdge[rNode];
+              currentEdge >= 0;
+              currentEdge = minCostFlowEdges.get(currentEdge).nextEdge) {
+            MinCostFlowEdge edge = minCostFlowEdges.get(currentEdge);
+            if (edge.destNode != S_NODE && edge.capacity == 0) {
+              matchLeader = true;
+              result.put(regionGroupId, dNodeReflect.get(edge.destNode));
+            }
+          }
+          if (!matchLeader) {
+            result.put(regionGroupId, 
regionLeaderMap.getOrDefault(regionGroupId, -1));
+          }
+        });
 
     return result;
   }

Reply via email to