This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 51bad1ec88d [remove datanode] Accelerate GCR load balancing implement (#15535)
51bad1ec88d is described below
commit 51bad1ec88d5d20aa4e84f3ccd6841fbfba75390
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Fri Jun 6 13:02:39 2025 +0800
[remove datanode] Accelerate GCR load balancing implement (#15535)
---
.../region/GreedyCopySetRegionGroupAllocator.java | 253 ++++++++++-----------
.../GreedyCopySetRemoveNodeReplicaSelectTest.java | 29 ++-
2 files changed, 148 insertions(+), 134 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
index da1205c10ba..bc71bd3996e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
@@ -29,7 +29,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -42,7 +41,7 @@ import static java.util.Map.Entry.comparingByValue;
public class GreedyCopySetRegionGroupAllocator implements
IRegionGroupAllocator {
private static final Random RANDOM = new Random();
- private static final int GCR_MAX_OPTIMAL_PLAN_NUM = 100;
+ private static final int GCR_MAX_OPTIMAL_PLAN_NUM = 10;
private int replicationFactor;
// Sorted available DataNodeIds
@@ -57,14 +56,31 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
private Map<String, int[]> initialDbLoad;
// First Key: the sum of Regions at the DataNodes in the allocation result
is minimal
- int optimalRegionSum;
+ private int optimalRegionSum;
// Second Key: the sum of Regions at the DataNodes within the same Database
// in the allocation result is minimal
- int optimalDatabaseRegionSum;
+ private int optimalDatabaseRegionSum;
// Third Key: the sum of overlapped 2-Region combination Regions with
// other allocated RegionGroups is minimal
- int optimalCombinationSum;
- List<int[]> optimalReplicaSets;
+ private int optimalCombinationSum;
+ private List<int[]> optimalReplicaSets;
+
+ // Pre-calculation, scatterDelta[i][j] means the scatter increment between
region i and the old
+ // replica set when region i is placed on node j
+ private int[][] scatterDelta;
+ // For each region, the allowed candidate destination node IDs.
+ private Map<TConsensusGroupId, List<Integer>> allowedCandidatesMap;
+ // A list of regions that need to be migrated.
+ private List<TConsensusGroupId> dfsRegionKeys;
+ // A mapping from each region identifier to its corresponding database name.
+ private Map<TConsensusGroupId, String> regionDatabaseMap;
+ // Buffer holding best assignment arrays.
+ private int[] bestAssignment;
+ // An int array holding the best metrics found so far: [maxGlobalLoad,
maxDatabaseLoad,
+ // scatterValue].
+ private int[] bestMetrics;
+ // dfsRemoveNodeReplica batch size
+ private static final int BATCH_SIZE = 12;
private static class DataNodeEntry {
@@ -146,15 +162,14 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
// availableDataNodeMap
// excluding those already in the remain replica set.
List<TConsensusGroupId> regionKeys = new
ArrayList<>(remainReplicasMap.keySet());
- Map<TConsensusGroupId, List<Integer>> allowedCandidatesMap = new
HashMap<>();
+ allowedCandidatesMap = new HashMap<>();
+ this.regionDatabaseMap = regionDatabaseMap;
for (TConsensusGroupId regionId : regionKeys) {
TRegionReplicaSet remainReplicaSet = remainReplicasMap.get(regionId);
- Set<Integer> notAllowedNodes = new HashSet<>();
-
- // Exclude nodes already present in the remain replica set
- for (TDataNodeLocation location :
remainReplicaSet.getDataNodeLocations()) {
- notAllowedNodes.add(location.getDataNodeId());
- }
+ Set<Integer> notAllowedNodes =
+ remainReplicaSet.getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toSet());
// Allowed candidates are the nodes not in the exclusion set
List<Integer> candidates =
@@ -163,12 +178,12 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
.sorted(
(a, b) -> {
int cmp = Integer.compare(regionCounter[a],
regionCounter[b]);
- if (cmp == 0) {
- cmp = Integer.compare(databaseRegionCounter[a],
databaseRegionCounter[b]);
- }
- return cmp;
+ return (cmp != 0)
+ ? cmp
+ : Integer.compare(databaseRegionCounter[a],
databaseRegionCounter[b]);
})
.collect(Collectors.toList());
+ Collections.shuffle(candidates);
// Sort candidates in ascending order of current global load
(regionCounter)
allowedCandidatesMap.put(regionId, candidates);
@@ -178,44 +193,63 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
// first)
regionKeys.sort(Comparator.comparingInt(id ->
allowedCandidatesMap.get(id).size()));
- int n = regionKeys.size();
- // Each element holds the candidate nodeId chosen for the region at that
index
- int[] currentAssignment = new int[n];
- // additionalLoad holds the number of extra regions assigned to each
node in this migration
- // solution.
- int[] additionalLoad = new int[regionCounter.length];
+ // 3. Batch DFS
+ Map<TConsensusGroupId, TDataNodeConfiguration> result = new HashMap<>();
- // 3. Create a buffer for candidate solutions
- List<int[]> optimalAssignments = new ArrayList<>();
- // bestMetrics holds the best found metrics: [maxGlobalLoad,
maxDatabaseLoad, scatterValue].
- // Initialize to high values.
- int[] bestMetrics = new int[] {Integer.MAX_VALUE, Integer.MAX_VALUE,
Integer.MAX_VALUE};
+ for (int start = 0; start < regionKeys.size(); start += BATCH_SIZE) {
+ dfsRegionKeys = regionKeys.subList(start, Math.min(start + BATCH_SIZE,
regionKeys.size()));
+ int batchSize = dfsRegionKeys.size();
+
+ // Initialize buffer
+ bestAssignment = new int[batchSize];
+ // bestMetrics holds the best found metrics: [maxGlobalLoad,
maxDatabaseLoad, scatterValue].
+ bestMetrics = new int[] {Integer.MAX_VALUE, Integer.MAX_VALUE,
Integer.MAX_VALUE};
+ // currentAssignment holds the candidate nodeId chosen for the region
at that index
+ int[] currentAssignment = new int[batchSize];
+ // additionalLoad holds the number of extra regions assigned to each
node in this migration
+ // solution.
+ int[] additionalLoad = new int[regionCounter.length];
+
+ scatterDelta = new int[batchSize][regionCounter.length];
+ for (int r = 0; r < batchSize; r++) {
+ TConsensusGroupId regionId = dfsRegionKeys.get(r);
+ for (int nodeId : allowedCandidatesMap.get(regionId)) {
+ int inc = 0;
+ for (TDataNodeLocation loc :
remainReplicasMap.get(regionId).getDataNodeLocations()) {
+ inc += combinationCounter[nodeId][loc.getDataNodeId()];
+ }
+ scatterDelta[r][nodeId] = inc;
+ }
+ }
- dfsRemoveNodeReplica(
- 0,
- regionKeys,
- allowedCandidatesMap,
- currentAssignment,
- additionalLoad,
- optimalAssignments,
- bestMetrics,
- remainReplicasMap,
- regionDatabaseMap);
-
- // 4. Randomly select one solution from the candidate buffer
- if (optimalAssignments.isEmpty()) {
- // This should not happen if there is at least one valid assignment
- return Collections.emptyMap();
- }
- Collections.shuffle(optimalAssignments);
- int[] bestAssignment = optimalAssignments.get(0);
+ int currentMaxGlobalLoad = 0;
+ for (int nodeId = 0; nodeId < regionCounter.length; nodeId++) {
+ currentMaxGlobalLoad = Math.max(currentMaxGlobalLoad,
regionCounter[nodeId]);
+ }
- // 5. Build and return the result mapping: region -> chosen destination
TDataNodeConfiguration
- Map<TConsensusGroupId, TDataNodeConfiguration> result = new HashMap<>();
- for (int i = 0; i < n; i++) {
- TConsensusGroupId regionId = regionKeys.get(i);
- int chosenNodeId = bestAssignment[i];
- result.put(regionId, availableDataNodeMap.get(chosenNodeId));
+ dfsRemoveNodeReplica(0, currentMaxGlobalLoad, 0, currentAssignment,
additionalLoad);
+
+ if (bestMetrics[0] == Integer.MAX_VALUE) {
+ // This should not happen if there is at least one valid assignment
+ return Collections.emptyMap();
+ }
+
+ for (int i = 0; i < batchSize; i++) {
+ TConsensusGroupId regionId = dfsRegionKeys.get(i);
+ int chosenNodeId = bestAssignment[i];
+ result.put(regionId, availableDataNodeMap.get(chosenNodeId));
+
+ regionCounter[chosenNodeId]++;
+ String db = regionDatabaseMap.get(regionId);
+ if (db != null) {
+ int[] dbLoad = initialDbLoad.computeIfAbsent(db, k -> new
int[regionCounter.length]);
+ dbLoad[chosenNodeId]++;
+ }
+ for (TDataNodeLocation loc :
remainReplicasMap.get(regionId).getDataNodeLocations()) {
+ combinationCounter[chosenNodeId][loc.getDataNodeId()]++;
+ combinationCounter[loc.getDataNodeId()][chosenNodeId]++;
+ }
+ }
}
return result;
} finally {
@@ -244,99 +278,64 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
* <p>DFS search is pruned if the optimalAssignments buffer reaches CAPACITY.
*
* @param index Current DFS level, corresponding to regionKeys.get(index)
- * @param regionKeys A list of regions that need to be migrated.
- * @param allowedCandidatesMap For each region, the allowed candidate
destination node IDs.
+ * @param currentMaxGlobalLoad The maximum global load across all data nodes.
+ * @param currentScatter The scatter value for the complete assignment.
* @param currentAssignment Current partial assignment; its length equals
the number of regions.
* @param additionalLoad Extra load currently assigned to each node.
- * @param optimalAssignments Buffer holding candidate assignment arrays.
- * @param bestMetrics An int array holding the best metrics found so far:
[maxGlobalLoad,
- * maxDatabaseLoad, scatterValue].
- * @param remainReplicasMap Mapping from region to its current remain
replica set.
*/
private void dfsRemoveNodeReplica(
int index,
- List<TConsensusGroupId> regionKeys,
- Map<TConsensusGroupId, List<Integer>> allowedCandidatesMap,
+ int currentMaxGlobalLoad,
+ int currentScatter,
int[] currentAssignment,
- int[] additionalLoad,
- List<int[]> optimalAssignments,
- int[] bestMetrics,
- Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap,
- Map<TConsensusGroupId, String> regionDatabaseMap) {
- int n = regionKeys.size();
- if (index == n) {
- // A complete assignment has been generated.
- // Compute metrics for this complete migration assignment.
-
- // Compute the scatter value for the complete assignment.
- int currentScatter = 0;
- // For each region, calculate the scatter based on the
combinationCounter among all nodes
- // in the full replica set (which includes the nodes in the remain
replica and the new
- // candidate).
- for (int r = 0; r < n; r++) {
- TConsensusGroupId regionId = regionKeys.get(r);
- for (TDataNodeLocation location :
remainReplicasMap.get(regionId).getDataNodeLocations()) {
- int nodeA = currentAssignment[r];
- int nodeB = location.getDataNodeId();
- currentScatter += combinationCounter[nodeA][nodeB];
- }
+ int[] additionalLoad) {
+ // Compute the maximum global load and maximum database load among all
nodes that received
+ // additional load.
+ int[] currentMetrics = getCurrentMetrics(additionalLoad, currentScatter,
currentAssignment);
+ // Lexicographically compare currentMetrics with bestMetrics.
+ // If currentMetrics is better than bestMetrics, update bestMetrics and
clear the candidate
+ // buffer.
+ boolean isBetter = false;
+ boolean isEqual = true;
+ for (int i = 0; i < 3; i++) {
+ if (currentMetrics[i] < bestMetrics[i]) {
+ isBetter = true;
+ isEqual = false;
+ break;
+ } else if (currentMetrics[i] > bestMetrics[i]) {
+ isEqual = false;
+ break;
}
+ }
+ if (!isBetter && !isEqual) {
+ return;
+ }
- // Compute the maximum global load and maximum database load among all
nodes that received
- // additional load.
- int[] currentMetrics =
- getCurrentMetrics(
- additionalLoad, currentScatter, regionKeys, regionDatabaseMap,
currentAssignment);
-
- // Lexicographically compare currentMetrics with bestMetrics.
- // If currentMetrics is better than bestMetrics, update bestMetrics and
clear the candidate
- // buffer.
- boolean isBetter = false;
- boolean isEqual = true;
- for (int i = 0; i < 3; i++) {
- if (currentMetrics[i] < bestMetrics[i]) {
- isBetter = true;
- isEqual = false;
- break;
- } else if (currentMetrics[i] > bestMetrics[i]) {
- isEqual = false;
- break;
- }
- }
+ if (index == dfsRegionKeys.size()) {
if (isBetter) {
bestMetrics[0] = currentMetrics[0];
bestMetrics[1] = currentMetrics[1];
bestMetrics[2] = currentMetrics[2];
- optimalAssignments.clear();
- optimalAssignments.add(Arrays.copyOf(currentAssignment, n));
- } else if (isEqual) {
- optimalAssignments.add(Arrays.copyOf(currentAssignment, n));
- // Prune search if we already have enough candidate solutions
- if (optimalAssignments.size() >= GCR_MAX_OPTIMAL_PLAN_NUM) {
- return;
- }
+ System.arraycopy(currentAssignment, 0, bestAssignment, 0,
dfsRegionKeys.size());
}
return;
}
// Process the region at the current index.
- TConsensusGroupId regionId = regionKeys.get(index);
+ TConsensusGroupId regionId = dfsRegionKeys.get(index);
List<Integer> candidates = allowedCandidatesMap.get(regionId);
for (Integer candidate : candidates) {
currentAssignment[index] = candidate;
+ currentScatter += scatterDelta[index][currentAssignment[index]];
additionalLoad[candidate]++;
+ int nextMaxGlobalLoad =
+ Math.max(currentMaxGlobalLoad, regionCounter[candidate] +
additionalLoad[candidate]);
+
dfsRemoveNodeReplica(
- index + 1,
- regionKeys,
- allowedCandidatesMap,
- currentAssignment,
- additionalLoad,
- optimalAssignments,
- bestMetrics,
- remainReplicasMap,
- regionDatabaseMap);
+ index + 1, nextMaxGlobalLoad, currentScatter, currentAssignment,
additionalLoad);
// Backtrack
additionalLoad[candidate]--;
+ currentScatter -= scatterDelta[index][currentAssignment[index]];
}
}
@@ -411,20 +410,12 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
* @param additionalLoad an array representing the additional load assigned
to each node during
* migration.
* @param currentScatter the current scatter value metric.
- * @param regionKeys a list of region identifiers (TConsensusGroupId) for
which migration is being
- * computed.
- * @param regionDatabaseMap a mapping from each region identifier to its
corresponding database
- * name.
* @param currentAssignment an array where each element is the nodeId
assigned for the
* corresponding region in {@code regionKeys}.
* @return an integer array of size 3: [maxGlobalLoad,
databaseLoadSquaredSum, scatterValue].
*/
private int[] getCurrentMetrics(
- int[] additionalLoad,
- int currentScatter,
- List<TConsensusGroupId> regionKeys,
- Map<TConsensusGroupId, String> regionDatabaseMap,
- int[] currentAssignment) {
+ int[] additionalLoad, int currentScatter, int[] currentAssignment) {
int currentMaxGlobalLoad = 0;
// Calculate the maximum global load across all data nodes.
for (int nodeId = 0; nodeId < additionalLoad.length; nodeId++) {
@@ -433,7 +424,7 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
}
// Compute the database load squared sum using the helper method.
int dbLoadSquaredSum =
- computeDatabaseLoadSquaredSum(currentAssignment, regionKeys,
regionDatabaseMap);
+ computeDatabaseLoadSquaredSum(currentAssignment, dfsRegionKeys,
regionDatabaseMap);
// Build current metrics in order [maxGlobalLoad, maxDatabaseLoad,
scatterValue]
return new int[] {currentMaxGlobalLoad, dbLoadSquaredSum, currentScatter};
}
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java
index 557fba8ba7a..0dd73b77f16 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java
@@ -54,7 +54,7 @@ public class GreedyCopySetRemoveNodeReplicaSelectTest {
private static final int TEST_DATA_NODE_NUM = 5;
- private static final int DATA_REGION_PER_DATA_NODE = 4;
+ private static final int DATA_REGION_PER_DATA_NODE = 30;
private static final int DATA_REPLICATION_FACTOR = 2;
@@ -128,6 +128,14 @@ public class GreedyCopySetRemoveNodeReplicaSelectTest {
PGPRegionCounter.put(nodeId, 0);
});
+ for (TRegionReplicaSet replicaSet : allocateResult) {
+ for (TDataNodeLocation loc : replicaSet.getDataNodeLocations()) {
+ randomRegionCounter.put(
+ loc.getDataNodeId(), randomRegionCounter.get(loc.getDataNodeId())
+ 1);
+ PGPRegionCounter.put(loc.getDataNodeId(),
PGPRegionCounter.get(loc.getDataNodeId()) + 1);
+ }
+ }
+
for (TRegionReplicaSet remainReplicaSet : remainReplicas) {
TDataNodeLocation selectedNode =
randomSelectNodeForRegion(remainReplicaSet.getDataNodeLocations()).get();
@@ -181,22 +189,31 @@ public class GreedyCopySetRemoveNodeReplicaSelectTest {
PGPRegionCounter.get(selectedNode.getLocation().getDataNodeId()) +
1);
}
+ LOGGER.info("randomRegionCount:");
+
for (Integer i : randomRegionCounter.keySet()) {
Integer value = randomRegionCounter.get(i);
randomMaxRegionCount = Math.max(randomMaxRegionCount, value);
randomMinRegionCount = Math.min(randomMinRegionCount, value);
+ LOGGER.info("{} : {}", i, value);
}
+ LOGGER.info("PGPRegionCount:");
+
for (Integer i : PGPRegionCounter.keySet()) {
Integer value = PGPRegionCounter.get(i);
PGPMaxRegionCount = Math.max(PGPMaxRegionCount, value);
PGPMinRegionCount = Math.min(PGPMinRegionCount, value);
+ LOGGER.info("{} : {}", i, value);
}
+ LOGGER.info("PGPSelectedNodeIds size: {}", PGPSelectedNodeIds.size());
Assert.assertEquals(TEST_DATA_NODE_NUM - 1, PGPSelectedNodeIds.size());
+ LOGGER.info("randomSelectedNodeIds size: {}",
randomSelectedNodeIds.size());
Assert.assertTrue(PGPSelectedNodeIds.size() >=
randomSelectedNodeIds.size());
+ LOGGER.info(
+ "randomMaxRegionCount: {}, PGPMaxRegionCount: {}",
randomMaxRegionCount, PGPMaxRegionCount);
Assert.assertTrue(randomMaxRegionCount >= PGPMaxRegionCount);
- Assert.assertTrue(randomMinRegionCount <= PGPMinRegionCount);
}
@Test
@@ -274,6 +291,13 @@ public class GreedyCopySetRemoveNodeReplicaSelectTest {
planCount.put(n, 0);
});
+ for (TRegionReplicaSet replicaSet : globalAllocatedList) {
+ for (TDataNodeLocation loc : replicaSet.getDataNodeLocations()) {
+ rndCount.merge(loc.getDataNodeId(), 1, Integer::sum);
+ planCount.merge(loc.getDataNodeId(), 1, Integer::sum);
+ }
+ }
+
for (TRegionReplicaSet r : remainReplicas) {
TDataNodeLocation pick =
randomSelectNodeForRegion(r.getDataNodeLocations()).get();
LOGGER.info("Random Selected DataNode {} for Region {}",
pick.getDataNodeId(), r.regionId);
@@ -325,7 +349,6 @@ public class GreedyCopySetRemoveNodeReplicaSelectTest {
Assert.assertEquals(TEST_DATA_NODE_NUM - 1, planNodes.size());
Assert.assertTrue(planNodes.size() >= rndNodes.size());
Assert.assertTrue(rndMax >= planMax);
- Assert.assertTrue(rndMin <= planMin);
}
private Optional<TDataNodeLocation> randomSelectNodeForRegion(