This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch region-multi-database
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/region-multi-database by this
push:
new 47cdc2bf260 update algorithm
47cdc2bf260 is described below
commit 47cdc2bf260acb4d584611d7b3fc2fc5fe8f45d6
Author: YongzaoDan <[email protected]>
AuthorDate: Sun Mar 17 16:11:15 2024 +0800
update algorithm
---
.../confignode/conf/ConfigNodeDescriptor.java | 3 +-
.../region/GreedyCopySetRegionGroupAllocator.java | 106 +++--------
.../router/leader/GreedyLeaderBalancer.java | 1 -
.../router/leader/MinCostFlowLeaderBalancer.java | 209 ++++++++-------------
4 files changed, 108 insertions(+), 211 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index f138143ff9b..38a52154f8a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -359,8 +359,7 @@ public class ConfigNodeDescriptor {
} else {
throw new IOException(
String.format(
- "Unknown leader_distribution_policy: %s, "
- + "please set to \"GREEDY\" or \"CFD\"",
+ "Unknown leader_distribution_policy: %s, " + "please set to \"GREEDY\" or \"CFD\"",
leaderDistributionPolicy));
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
index 4623c9469f3..22dfa1d339b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import java.util.ArrayList;
import java.util.Arrays;
@@ -40,47 +39,36 @@ import static java.util.Map.Entry.comparingByValue;
public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator {
private static final Random RANDOM = new Random();
- private static final int GCR_MAX_OPTIMAL_PLAN_NUM =
- ConfigNodeDescriptor.getInstance().getConf().getGcrMaxOptimalPlanNum();
private int replicationFactor;
// Sorted available DataNodeIds
private int[] dataNodeIds;
// The number of allocated Regions in each DataNode
private int[] regionCounter;
- // The number of allocated Regions in each DataNode within the same Database
- private int[] databaseRegionCounter;
// The number of 2-Region combinations in current cluster
private int[][] combinationCounter;
- private int maxDataNodeId;
- // First Key: the sum of Regions at the DataNodes within the same Database
in the allocation
- // result is minimal
- int optimalDatabaseRegionSum;
- // Second Key: the sum of Regions at the DataNodes in the allocation result
is minimal
+ // First Key: the sum of Regions at the DataNodes in the allocation result
is minimal
int optimalRegionSum;
- // Third Key: the sum of overlapped 2-Region combination Regions with other
allocated
+ // Second Key: the sum of overlapped 2-Region combination Regions with other
allocated
// RegionGroups is minimal
int optimalCombinationSum;
List<int[]> optimalReplicaSets;
+ private static final int MAX_OPTIMAL_PLAN_NUM = 10;
private static class DataNodeEntry {
private final int dataNodeId;
- // First key: the number of Regions in the DataNode within the same
Database
- private final int databaseRegionCount;
- // Second key: the number of Regions in the DataNode
+ // First key: the number of Regions in the DataNode
private final int regionCount;
- // Third key: the scatter width of the DataNode
+ // Second key: the scatter width of the DataNode
private final int scatterWidth;
- // Forth key: a random weight
+ // Third key: a random weight
private final int randomWeight;
- public DataNodeEntry(
- int dataNodeId, int databaseRegionCount, int regionCount, int
scatterWidth) {
+ public DataNodeEntry(int dataNodeId, int regionCount, int scatterWidth) {
this.dataNodeId = dataNodeId;
- this.databaseRegionCount = databaseRegionCount;
this.regionCount = regionCount;
this.scatterWidth = scatterWidth;
this.randomWeight = RANDOM.nextInt();
@@ -91,13 +79,11 @@ public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator
}
public int compare(DataNodeEntry e) {
- return databaseRegionCount != e.databaseRegionCount
- ? Integer.compare(databaseRegionCount, e.databaseRegionCount)
- : regionCount != e.regionCount
- ? Integer.compare(regionCount, e.regionCount)
- : scatterWidth != e.scatterWidth
- ? Integer.compare(scatterWidth, e.scatterWidth)
- : Integer.compare(randomWeight, e.randomWeight);
+ return regionCount != e.regionCount
+ ? Integer.compare(regionCount, e.regionCount)
+ : scatterWidth != e.scatterWidth
+ ? Integer.compare(scatterWidth, e.scatterWidth)
+ : Integer.compare(randomWeight, e.randomWeight);
}
}
@@ -114,12 +100,8 @@ public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator
int replicationFactor,
TConsensusGroupId consensusGroupId) {
try {
- prepare(
- replicationFactor,
- availableDataNodeMap,
- allocatedRegionGroups,
- databaseAllocatedRegionGroups);
- dfs(-1, 0, new int[replicationFactor], 0, 0);
+ prepare(replicationFactor, availableDataNodeMap, allocatedRegionGroups);
+ dfs(-1, 0, new int[replicationFactor], 0);
// Randomly pick one optimal plan as result
Collections.shuffle(optimalReplicaSets);
@@ -129,7 +111,6 @@ public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator
for (int i = 0; i < replicationFactor; i++) {
result.addToDataNodeLocations(availableDataNodeMap.get(optimalReplicaSet[i]).getLocation());
}
-
return result;
} finally {
clear();
@@ -142,13 +123,11 @@ public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator
* @param replicationFactor replication factor in the cluster
* @param availableDataNodeMap currently available DataNodes, ensure size()
>= replicationFactor
* @param allocatedRegionGroups already allocated RegionGroups in the cluster
- * @param databaseAllocatedRegionGroups already allocated RegionGroups in
the same Database
*/
private void prepare(
int replicationFactor,
Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
- List<TRegionReplicaSet> allocatedRegionGroups,
- List<TRegionReplicaSet> databaseAllocatedRegionGroups) {
+ List<TRegionReplicaSet> allocatedRegionGroups) {
this.replicationFactor = replicationFactor;
// Store the maximum DataNodeId
@@ -160,13 +139,10 @@ public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator
.mapToInt(TDataNodeLocation::getDataNodeId)
.max()
.orElse(0));
- this.maxDataNodeId = maxDataNodeId;
- // Compute regionCounter, databaseRegionCounter and combinationCounter
+ // Compute regionCounter and combinationCounter
regionCounter = new int[maxDataNodeId + 1];
Arrays.fill(regionCounter, 0);
- databaseRegionCounter = new int[maxDataNodeId + 1];
- Arrays.fill(databaseRegionCounter, 0);
combinationCounter = new int[maxDataNodeId + 1][maxDataNodeId + 1];
for (int i = 0; i <= maxDataNodeId; i++) {
Arrays.fill(combinationCounter[i], 0);
@@ -183,12 +159,6 @@ public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator
}
}
}
- for (TRegionReplicaSet regionReplicaSet : databaseAllocatedRegionGroups) {
- List<TDataNodeLocation> dataNodeLocations =
regionReplicaSet.getDataNodeLocations();
- for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
- databaseRegionCounter[dataNodeLocation.getDataNodeId()]++;
- }
- }
// Compute the DataNodeIds through sorting the DataNodeEntryMap
Map<Integer, DataNodeEntry> dataNodeEntryMap = new HashMap<>(maxDataNodeId
+ 1);
@@ -206,11 +176,7 @@ public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator
}
dataNodeEntryMap.put(
dataNodeId,
- new DataNodeEntry(
- dataNodeId,
- databaseRegionCounter[dataNodeId],
- regionCounter[dataNodeId],
- scatterWidth));
+ new DataNodeEntry(dataNodeId, regionCounter[dataNodeId],
scatterWidth));
});
dataNodeIds =
dataNodeEntryMap.entrySet().stream()
@@ -222,7 +188,6 @@ public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator
.toArray();
// Reset the optimal result
- optimalDatabaseRegionSum = Integer.MAX_VALUE;
optimalRegionSum = Integer.MAX_VALUE;
optimalCombinationSum = Integer.MAX_VALUE;
optimalReplicaSets = new ArrayList<>();
@@ -236,27 +201,14 @@ public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator
* @param lastIndex last decided index in dataNodeIds
* @param currentReplica current replica index
* @param currentReplicaSet current allocation plan
- * @param databaseRegionSum the sum of Regions at the DataNodes within the
same Database in the
- * current allocation plan
* @param regionSum the sum of Regions at the DataNodes in the current
allocation plan
*/
- private void dfs(
- int lastIndex,
- int currentReplica,
- int[] currentReplicaSet,
- int databaseRegionSum,
- int regionSum) {
- if (databaseRegionSum > optimalDatabaseRegionSum) {
+ private void dfs(int lastIndex, int currentReplica, int[] currentReplicaSet,
int regionSum) {
+ if (regionSum > optimalRegionSum) {
// Pruning: no needs for further searching when the first key
// is bigger than the historical optimal result
return;
}
- if (databaseRegionSum == optimalDatabaseRegionSum && regionSum >
optimalRegionSum) {
- // Pruning: no needs for further searching when the first key is equal
to the historical
- // optimal result
- // and the second key is bigger than the historical optimal result
- return;
- }
if (currentReplica == replicationFactor) {
// A complete allocation plan is found
@@ -266,17 +218,14 @@ public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator
combinationSum +=
combinationCounter[currentReplicaSet[i]][currentReplicaSet[j]];
}
}
- if (databaseRegionSum == optimalDatabaseRegionSum
- && regionSum == optimalRegionSum
- && combinationSum > optimalCombinationSum) {
+ if (combinationSum > optimalCombinationSum) {
+ // Pruning: no needs for further searching when the second key
+ // is bigger than the historical optimal result
return;
}
- if (databaseRegionSum < optimalDatabaseRegionSum
- || regionSum < optimalRegionSum
- || combinationSum < optimalCombinationSum) {
+ if (regionSum < optimalRegionSum || combinationSum <
optimalCombinationSum) {
// Reset the optimal result when a better one is found
- optimalDatabaseRegionSum = databaseRegionSum;
optimalRegionSum = regionSum;
optimalCombinationSum = combinationSum;
optimalReplicaSets.clear();
@@ -288,13 +237,8 @@ public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator
for (int i = lastIndex + 1; i < dataNodeIds.length; i++) {
// Decide the next DataNodeId in the allocation plan
currentReplicaSet[currentReplica] = dataNodeIds[i];
- dfs(
- i,
- currentReplica + 1,
- currentReplicaSet,
- databaseRegionSum + databaseRegionCounter[dataNodeIds[i]],
- regionSum + regionCounter[dataNodeIds[i]]);
- if (optimalReplicaSets.size() == GCR_MAX_OPTIMAL_PLAN_NUM) {
+ dfs(i, currentReplica + 1, currentReplicaSet, regionSum +
regionCounter[dataNodeIds[i]]);
+ if (optimalReplicaSets.size() == MAX_OPTIMAL_PLAN_NUM) {
// Pruning: no needs for further searching when
// the number of optimal plans reaches the limitation
return;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
index 15618626e6d..4a28fb6ca31 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
index 517f35a569f..3bfb76dd14c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
@@ -25,14 +25,15 @@ import org.apache.iotdb.commons.utils.TestOnly;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
/** Leader distribution balancer that uses minimum cost flow algorithm */
public class MinCostFlowLeaderBalancer implements ILeaderBalancer {
@@ -40,9 +41,8 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
private static final int INFINITY = Integer.MAX_VALUE;
/** Input parameters */
- private final Map<String, List<TConsensusGroupId>> databaseRegionGroupMap;
-
private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
+
private final Map<TConsensusGroupId, Integer> regionLeaderMap;
private final Set<Integer> disabledDataNodeSet;
@@ -55,12 +55,10 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
private int maxNode = T_NODE + 1;
// Map<RegionGroupId, rNode>
private final Map<TConsensusGroupId, Integer> rNodeMap;
- // Map<Database, Map<DataNodeId, sDNode>>
- private final Map<String, Map<Integer, Integer>> sDNodeMap;
- // Map<Database, Map<sDNode, DataNodeId>>
- private final Map<String, Map<Integer, Integer>> sDNodeReflect;
- // Map<DataNodeId, tDNode>
- private final Map<Integer, Integer> tDNodeMap;
+ // Map<DataNodeId, dNode>
+ private final Map<Integer, Integer> dNodeMap;
+ // Map<dNode, DataNodeId>
+ private final Map<Integer, Integer> dNodeReflect;
/** Graph edges */
// Maximum index of graph edges
@@ -77,14 +75,12 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
private int minimumCost = 0;
public MinCostFlowLeaderBalancer() {
- this.databaseRegionGroupMap = new TreeMap<>();
- this.regionReplicaSetMap = new TreeMap<>();
- this.regionLeaderMap = new TreeMap<>();
- this.disabledDataNodeSet = new TreeSet<>();
- this.rNodeMap = new TreeMap<>();
- this.sDNodeMap = new TreeMap<>();
- this.sDNodeReflect = new TreeMap<>();
- this.tDNodeMap = new TreeMap<>();
+ this.regionReplicaSetMap = new HashMap<>();
+ this.regionLeaderMap = new HashMap<>();
+ this.disabledDataNodeSet = new HashSet<>();
+ this.rNodeMap = new HashMap<>();
+ this.dNodeMap = new HashMap<>();
+ this.dNodeReflect = new HashMap<>();
this.minCostFlowEdges = new ArrayList<>();
}
@@ -95,7 +91,7 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
Map<TConsensusGroupId, Integer> regionLeaderMap,
Set<Integer> disabledDataNodeSet) {
- initialize(databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap,
disabledDataNodeSet);
+ initialize(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet);
Map<TConsensusGroupId, Integer> result;
constructMCFGraph();
@@ -107,26 +103,21 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
}
private void initialize(
- Map<String, List<TConsensusGroupId>> databaseRegionGroupMap,
Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap,
Map<TConsensusGroupId, Integer> regionLeaderMap,
Set<Integer> disabledDataNodeSet) {
- this.databaseRegionGroupMap.putAll(databaseRegionGroupMap);
this.regionReplicaSetMap.putAll(regionReplicaSetMap);
this.regionLeaderMap.putAll(regionLeaderMap);
this.disabledDataNodeSet.addAll(disabledDataNodeSet);
}
private void clear() {
- this.databaseRegionGroupMap.clear();
this.regionReplicaSetMap.clear();
this.regionLeaderMap.clear();
this.disabledDataNodeSet.clear();
-
this.rNodeMap.clear();
- this.sDNodeMap.clear();
- this.sDNodeReflect.clear();
- this.tDNodeMap.clear();
+ this.dNodeMap.clear();
+ this.dNodeReflect.clear();
this.minCostFlowEdges.clear();
this.nodeHeadEdge = null;
@@ -143,30 +134,13 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
this.minimumCost = 0;
/* Indicate nodes in mcf */
- for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
- databaseRegionGroupMap.entrySet()) {
- String database = databaseEntry.getKey();
- sDNodeMap.put(database, new TreeMap<>());
- sDNodeReflect.put(database, new TreeMap<>());
- List<TConsensusGroupId> regionGroupIds = databaseEntry.getValue();
- for (TConsensusGroupId regionGroupId : regionGroupIds) {
- rNodeMap.put(regionGroupId, maxNode++);
- for (TDataNodeLocation dataNodeLocation :
- regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
- int dataNodeId = dataNodeLocation.getDataNodeId();
- if (disabledDataNodeSet.contains(dataNodeId)) {
- // Skip disabled DataNode
- continue;
- }
- if (!sDNodeMap.get(database).containsKey(dataNodeId)) {
- sDNodeMap.get(database).put(dataNodeId, maxNode);
- sDNodeReflect.get(database).put(maxNode, dataNodeId);
- maxNode += 1;
- }
- if (!tDNodeMap.containsKey(dataNodeId)) {
- tDNodeMap.put(dataNodeId, maxNode);
- maxNode += 1;
- }
+ for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
+ rNodeMap.put(regionReplicaSet.getRegionId(), maxNode++);
+ for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
+ if (!dNodeMap.containsKey(dataNodeLocation.getDataNodeId())) {
+ dNodeMap.put(dataNodeLocation.getDataNodeId(), maxNode);
+ dNodeReflect.put(maxNode, dataNodeLocation.getDataNodeId());
+ maxNode += 1;
}
}
}
@@ -180,74 +154,57 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
/* Construct edges: sNode -> rNodes */
for (int rNode : rNodeMap.values()) {
- // Capacity: 1, Cost: 0, each RegionGroup should elect exactly 1 leader
+ // Cost: 0
addAdjacentEdges(S_NODE, rNode, 1, 0);
}
- /* Construct edges: rNodes -> sdNodes */
- for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
- databaseRegionGroupMap.entrySet()) {
- String database = databaseEntry.getKey();
- for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
- int rNode = rNodeMap.get(regionGroupId);
- for (TDataNodeLocation dataNodeLocation :
- regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
- int dataNodeId = dataNodeLocation.getDataNodeId();
- if (disabledDataNodeSet.contains(dataNodeId)) {
- // Skip disabled DataNode
- continue;
- }
- int sDNode = sDNodeMap.get(database).get(dataNodeId);
- // Capacity: 1, Cost: 1 if sDNode is the current leader of the
rNode, 0 otherwise.
- // Therefore, the RegionGroup will keep the leader as constant as
possible.
- int cost = regionLeaderMap.getOrDefault(regionGroupId, -1) ==
dataNodeId ? 0 : 1;
- addAdjacentEdges(rNode, sDNode, 1, cost);
- }
+ /* Construct edges: rNodes -> dNodes */
+ for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
+ int rNode = rNodeMap.get(regionReplicaSet.getRegionId());
+ for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
+ int dNode = dNodeMap.get(dataNodeLocation.getDataNodeId());
+ // Cost: 0 if the dNode corresponds to the current leader of the rNode,
+ // 1 otherwise.
+ // Therefore, the RegionGroup will keep the leader as constant as possible.
+ int cost =
+ regionLeaderMap.getOrDefault(regionReplicaSet.getRegionId(), -1)
+ == dataNodeLocation.getDataNodeId()
+ ? 0
+ : 1;
+ addAdjacentEdges(rNode, dNode, 1, cost);
}
}
- /* Construct edges: sDNodes -> tDNodes */
- for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
- databaseRegionGroupMap.entrySet()) {
- String database = databaseEntry.getKey();
- // Map<DataNodeId, leader number>
- Map<Integer, Integer> leaderCounter = new TreeMap<>();
- for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
- for (TDataNodeLocation dataNodeLocation :
- regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
- int dataNodeId = dataNodeLocation.getDataNodeId();
- if (disabledDataNodeSet.contains(dataNodeId)) {
- // Skip disabled DataNode
- continue;
- }
- int sDNode = sDNodeMap.get(database).get(dataNodeId);
- int tDNode = tDNodeMap.get(dataNodeId);
- int leaderCount = leaderCounter.merge(dataNodeId, 1, Integer::sum);
- // Capacity: 1, Cost: x^2 for the x-th edge at the current sDNode.
- // Thus, the leader distribution will be as balance as possible
within each Database
- // based on the Jensen's-Inequality.
- addAdjacentEdges(sDNode, tDNode, 1, leaderCount * leaderCount);
- }
+ /* Construct edges: dNodes -> tNode */
+ // Count the possible maximum number of leader in each DataNode
+ Map<Integer, AtomicInteger> maxLeaderCounter = new ConcurrentHashMap<>();
+ regionReplicaSetMap
+ .values()
+ .forEach(
+ regionReplicaSet ->
+ regionReplicaSet
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation ->
+ maxLeaderCounter
+ .computeIfAbsent(
+ dataNodeLocation.getDataNodeId(), empty ->
new AtomicInteger(0))
+ .getAndIncrement()));
+
+ for (Map.Entry<Integer, Integer> dNodeEntry : dNodeMap.entrySet()) {
+ int dataNodeId = dNodeEntry.getKey();
+ int dNode = dNodeEntry.getValue();
+
+ if (disabledDataNodeSet.contains(dataNodeId)) {
+ // Skip disabled DataNode
+ continue;
}
- }
- /* Construct edges: tDNodes -> tNode */
- // Map<DataNodeId, possible max leader> Count the possible maximum number
of leader in each
- // DataNode
- Map<Integer, Integer> maxLeaderCounter = new TreeMap<>();
- for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
- for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
- int dataNodeId = dataNodeLocation.getDataNodeId();
- if (disabledDataNodeSet.contains(dataNodeId)) {
- // Skip disabled DataNode
- continue;
- }
- int tDNode = tDNodeMap.get(dataNodeId);
- int leaderCount = maxLeaderCounter.merge(dataNodeId, 1, Integer::sum);
+ int maxLeaderCount = maxLeaderCounter.get(dataNodeId).get();
+ for (int extraEdge = 1; extraEdge <= maxLeaderCount; extraEdge++) {
// Cost: x^2 for the x-th edge at the current dNode.
- // Thus, the leader distribution will be as balance as possible within
the cluster
- // Based on the Jensen's-Inequality.
- addAdjacentEdges(tDNode, T_NODE, 1, leaderCount * leaderCount);
+ // Thus, the leader distribution will be as balance as possible.
+ addAdjacentEdges(dNode, T_NODE, 1, extraEdge * extraEdge);
}
}
}
@@ -354,24 +311,22 @@ public class MinCostFlowLeaderBalancer implements
ILeaderBalancer {
private Map<TConsensusGroupId, Integer> collectLeaderDistribution() {
Map<TConsensusGroupId, Integer> result = new ConcurrentHashMap<>();
- databaseRegionGroupMap.forEach(
- (database, regionGroupIds) ->
- regionGroupIds.forEach(
- regionGroupId -> {
- boolean matchLeader = false;
- for (int currentEdge =
nodeHeadEdge[rNodeMap.get(regionGroupId)];
- currentEdge >= 0;
- currentEdge =
minCostFlowEdges.get(currentEdge).nextEdge) {
- MinCostFlowEdge edge = minCostFlowEdges.get(currentEdge);
- if (edge.destNode != S_NODE && edge.capacity == 0) {
- matchLeader = true;
- result.put(regionGroupId,
sDNodeReflect.get(database).get(edge.destNode));
- }
- }
- if (!matchLeader) {
- result.put(regionGroupId,
regionLeaderMap.getOrDefault(regionGroupId, -1));
- }
- }));
+ rNodeMap.forEach(
+ (regionGroupId, rNode) -> {
+ boolean matchLeader = false;
+ for (int currentEdge = nodeHeadEdge[rNode];
+ currentEdge >= 0;
+ currentEdge = minCostFlowEdges.get(currentEdge).nextEdge) {
+ MinCostFlowEdge edge = minCostFlowEdges.get(currentEdge);
+ if (edge.destNode != S_NODE && edge.capacity == 0) {
+ matchLeader = true;
+ result.put(regionGroupId, dNodeReflect.get(edge.destNode));
+ }
+ }
+ if (!matchLeader) {
+ result.put(regionGroupId,
regionLeaderMap.getOrDefault(regionGroupId, -1));
+ }
+ });
return result;
}