This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 51bad1ec88d [remove datanode] Accelerate GCR load balancing implement (#15535)
51bad1ec88d is described below

commit 51bad1ec88d5d20aa4e84f3ccd6841fbfba75390
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Fri Jun 6 13:02:39 2025 +0800

    [remove datanode] Accelerate GCR load balancing implement (#15535)
---
 .../region/GreedyCopySetRegionGroupAllocator.java  | 253 ++++++++++-----------
 .../GreedyCopySetRemoveNodeReplicaSelectTest.java  |  29 ++-
 2 files changed, 148 insertions(+), 134 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
index da1205c10ba..bc71bd3996e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
@@ -29,7 +29,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -42,7 +41,7 @@ import static java.util.Map.Entry.comparingByValue;
 public class GreedyCopySetRegionGroupAllocator implements 
IRegionGroupAllocator {
 
   private static final Random RANDOM = new Random();
-  private static final int GCR_MAX_OPTIMAL_PLAN_NUM = 100;
+  private static final int GCR_MAX_OPTIMAL_PLAN_NUM = 10;
 
   private int replicationFactor;
   // Sorted available DataNodeIds
@@ -57,14 +56,31 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
   private Map<String, int[]> initialDbLoad;
 
   // First Key: the sum of Regions at the DataNodes in the allocation result 
is minimal
-  int optimalRegionSum;
+  private int optimalRegionSum;
   // Second Key: the sum of Regions at the DataNodes within the same Database
   // in the allocation result is minimal
-  int optimalDatabaseRegionSum;
+  private int optimalDatabaseRegionSum;
   // Third Key: the sum of overlapped 2-Region combination Regions with
   // other allocated RegionGroups is minimal
-  int optimalCombinationSum;
-  List<int[]> optimalReplicaSets;
+  private int optimalCombinationSum;
+  private List<int[]> optimalReplicaSets;
+
+  // Pre-calculation, scatterDelta[i][j] means the scatter increment between 
region i and the old
+  // replica set when region i is placed on node j
+  private int[][] scatterDelta;
+  // For each region, the allowed candidate destination node IDs.
+  private Map<TConsensusGroupId, List<Integer>> allowedCandidatesMap;
+  // A list of regions that need to be migrated.
+  private List<TConsensusGroupId> dfsRegionKeys;
+  // A mapping from each region identifier to its corresponding database name.
+  private Map<TConsensusGroupId, String> regionDatabaseMap;
+  // Buffer holding best assignment arrays.
+  private int[] bestAssignment;
+  // An int array holding the best metrics found so far: [maxGlobalLoad, 
maxDatabaseLoad,
+  // scatterValue].
+  private int[] bestMetrics;
+  // dfsRemoveNodeReplica batch size
+  private static final int BATCH_SIZE = 12;
 
   private static class DataNodeEntry {
 
@@ -146,15 +162,14 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
       // availableDataNodeMap
       // excluding those already in the remain replica set.
       List<TConsensusGroupId> regionKeys = new 
ArrayList<>(remainReplicasMap.keySet());
-      Map<TConsensusGroupId, List<Integer>> allowedCandidatesMap = new 
HashMap<>();
+      allowedCandidatesMap = new HashMap<>();
+      this.regionDatabaseMap = regionDatabaseMap;
       for (TConsensusGroupId regionId : regionKeys) {
         TRegionReplicaSet remainReplicaSet = remainReplicasMap.get(regionId);
-        Set<Integer> notAllowedNodes = new HashSet<>();
-
-        // Exclude nodes already present in the remain replica set
-        for (TDataNodeLocation location : 
remainReplicaSet.getDataNodeLocations()) {
-          notAllowedNodes.add(location.getDataNodeId());
-        }
+        Set<Integer> notAllowedNodes =
+            remainReplicaSet.getDataNodeLocations().stream()
+                .map(TDataNodeLocation::getDataNodeId)
+                .collect(Collectors.toSet());
 
         // Allowed candidates are the nodes not in the exclusion set
         List<Integer> candidates =
@@ -163,12 +178,12 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
                 .sorted(
                     (a, b) -> {
                       int cmp = Integer.compare(regionCounter[a], 
regionCounter[b]);
-                      if (cmp == 0) {
-                        cmp = Integer.compare(databaseRegionCounter[a], 
databaseRegionCounter[b]);
-                      }
-                      return cmp;
+                      return (cmp != 0)
+                          ? cmp
+                          : Integer.compare(databaseRegionCounter[a], 
databaseRegionCounter[b]);
                     })
                 .collect(Collectors.toList());
+        Collections.shuffle(candidates);
 
         // Sort candidates in ascending order of current global load 
(regionCounter)
         allowedCandidatesMap.put(regionId, candidates);
@@ -178,44 +193,63 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
       // first)
       regionKeys.sort(Comparator.comparingInt(id -> 
allowedCandidatesMap.get(id).size()));
 
-      int n = regionKeys.size();
-      // Each element holds the candidate nodeId chosen for the region at that 
index
-      int[] currentAssignment = new int[n];
-      // additionalLoad holds the number of extra regions assigned to each 
node in this migration
-      // solution.
-      int[] additionalLoad = new int[regionCounter.length];
+      // 3. Batch DFS
+      Map<TConsensusGroupId, TDataNodeConfiguration> result = new HashMap<>();
 
-      // 3. Create a buffer for candidate solutions
-      List<int[]> optimalAssignments = new ArrayList<>();
-      // bestMetrics holds the best found metrics: [maxGlobalLoad, 
maxDatabaseLoad, scatterValue].
-      // Initialize to high values.
-      int[] bestMetrics = new int[] {Integer.MAX_VALUE, Integer.MAX_VALUE, 
Integer.MAX_VALUE};
+      for (int start = 0; start < regionKeys.size(); start += BATCH_SIZE) {
+        dfsRegionKeys = regionKeys.subList(start, Math.min(start + BATCH_SIZE, 
regionKeys.size()));
+        int batchSize = dfsRegionKeys.size();
+
+        // Initialize buffer
+        bestAssignment = new int[batchSize];
+        // bestMetrics holds the best found metrics: [maxGlobalLoad, 
maxDatabaseLoad, scatterValue].
+        bestMetrics = new int[] {Integer.MAX_VALUE, Integer.MAX_VALUE, 
Integer.MAX_VALUE};
+        // currentAssignment holds the candidate nodeId chosen for the region 
at that index
+        int[] currentAssignment = new int[batchSize];
+        // additionalLoad holds the number of extra regions assigned to each 
node in this migration
+        // solution.
+        int[] additionalLoad = new int[regionCounter.length];
+
+        scatterDelta = new int[batchSize][regionCounter.length];
+        for (int r = 0; r < batchSize; r++) {
+          TConsensusGroupId regionId = dfsRegionKeys.get(r);
+          for (int nodeId : allowedCandidatesMap.get(regionId)) {
+            int inc = 0;
+            for (TDataNodeLocation loc : 
remainReplicasMap.get(regionId).getDataNodeLocations()) {
+              inc += combinationCounter[nodeId][loc.getDataNodeId()];
+            }
+            scatterDelta[r][nodeId] = inc;
+          }
+        }
 
-      dfsRemoveNodeReplica(
-          0,
-          regionKeys,
-          allowedCandidatesMap,
-          currentAssignment,
-          additionalLoad,
-          optimalAssignments,
-          bestMetrics,
-          remainReplicasMap,
-          regionDatabaseMap);
-
-      // 4. Randomly select one solution from the candidate buffer
-      if (optimalAssignments.isEmpty()) {
-        // This should not happen if there is at least one valid assignment
-        return Collections.emptyMap();
-      }
-      Collections.shuffle(optimalAssignments);
-      int[] bestAssignment = optimalAssignments.get(0);
+        int currentMaxGlobalLoad = 0;
+        for (int nodeId = 0; nodeId < regionCounter.length; nodeId++) {
+          currentMaxGlobalLoad = Math.max(currentMaxGlobalLoad, 
regionCounter[nodeId]);
+        }
 
-      // 5. Build and return the result mapping: region -> chosen destination 
TDataNodeConfiguration
-      Map<TConsensusGroupId, TDataNodeConfiguration> result = new HashMap<>();
-      for (int i = 0; i < n; i++) {
-        TConsensusGroupId regionId = regionKeys.get(i);
-        int chosenNodeId = bestAssignment[i];
-        result.put(regionId, availableDataNodeMap.get(chosenNodeId));
+        dfsRemoveNodeReplica(0, currentMaxGlobalLoad, 0, currentAssignment, 
additionalLoad);
+
+        if (bestMetrics[0] == Integer.MAX_VALUE) {
+          // This should not happen if there is at least one valid assignment
+          return Collections.emptyMap();
+        }
+
+        for (int i = 0; i < batchSize; i++) {
+          TConsensusGroupId regionId = dfsRegionKeys.get(i);
+          int chosenNodeId = bestAssignment[i];
+          result.put(regionId, availableDataNodeMap.get(chosenNodeId));
+
+          regionCounter[chosenNodeId]++;
+          String db = regionDatabaseMap.get(regionId);
+          if (db != null) {
+            int[] dbLoad = initialDbLoad.computeIfAbsent(db, k -> new 
int[regionCounter.length]);
+            dbLoad[chosenNodeId]++;
+          }
+          for (TDataNodeLocation loc : 
remainReplicasMap.get(regionId).getDataNodeLocations()) {
+            combinationCounter[chosenNodeId][loc.getDataNodeId()]++;
+            combinationCounter[loc.getDataNodeId()][chosenNodeId]++;
+          }
+        }
       }
       return result;
     } finally {
@@ -244,99 +278,64 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
    * <p>DFS search is pruned if the optimalAssignments buffer reaches CAPACITY.
    *
    * @param index Current DFS level, corresponding to regionKeys.get(index)
-   * @param regionKeys A list of regions that need to be migrated.
-   * @param allowedCandidatesMap For each region, the allowed candidate 
destination node IDs.
+   * @param currentMaxGlobalLoad The maximum global load across all data nodes.
+   * @param currentScatter The scatter value for the complete assignment.
    * @param currentAssignment Current partial assignment; its length equals 
the number of regions.
    * @param additionalLoad Extra load currently assigned to each node.
-   * @param optimalAssignments Buffer holding candidate assignment arrays.
-   * @param bestMetrics An int array holding the best metrics found so far: 
[maxGlobalLoad,
-   *     maxDatabaseLoad, scatterValue].
-   * @param remainReplicasMap Mapping from region to its current remain 
replica set.
    */
   private void dfsRemoveNodeReplica(
       int index,
-      List<TConsensusGroupId> regionKeys,
-      Map<TConsensusGroupId, List<Integer>> allowedCandidatesMap,
+      int currentMaxGlobalLoad,
+      int currentScatter,
       int[] currentAssignment,
-      int[] additionalLoad,
-      List<int[]> optimalAssignments,
-      int[] bestMetrics,
-      Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap,
-      Map<TConsensusGroupId, String> regionDatabaseMap) {
-    int n = regionKeys.size();
-    if (index == n) {
-      // A complete assignment has been generated.
-      // Compute metrics for this complete migration assignment.
-
-      // Compute the scatter value for the complete assignment.
-      int currentScatter = 0;
-      // For each region, calculate the scatter based on the 
combinationCounter among all nodes
-      // in the full replica set (which includes the nodes in the remain 
replica and the new
-      // candidate).
-      for (int r = 0; r < n; r++) {
-        TConsensusGroupId regionId = regionKeys.get(r);
-        for (TDataNodeLocation location : 
remainReplicasMap.get(regionId).getDataNodeLocations()) {
-          int nodeA = currentAssignment[r];
-          int nodeB = location.getDataNodeId();
-          currentScatter += combinationCounter[nodeA][nodeB];
-        }
+      int[] additionalLoad) {
+    // Compute the maximum global load and maximum database load among all 
nodes that received
+    // additional load.
+    int[] currentMetrics = getCurrentMetrics(additionalLoad, currentScatter, 
currentAssignment);
+    // Lexicographically compare currentMetrics with bestMetrics.
+    // If currentMetrics is better than bestMetrics, update bestMetrics and 
clear the candidate
+    // buffer.
+    boolean isBetter = false;
+    boolean isEqual = true;
+    for (int i = 0; i < 3; i++) {
+      if (currentMetrics[i] < bestMetrics[i]) {
+        isBetter = true;
+        isEqual = false;
+        break;
+      } else if (currentMetrics[i] > bestMetrics[i]) {
+        isEqual = false;
+        break;
       }
+    }
+    if (!isBetter && !isEqual) {
+      return;
+    }
 
-      // Compute the maximum global load and maximum database load among all 
nodes that received
-      // additional load.
-      int[] currentMetrics =
-          getCurrentMetrics(
-              additionalLoad, currentScatter, regionKeys, regionDatabaseMap, 
currentAssignment);
-
-      // Lexicographically compare currentMetrics with bestMetrics.
-      // If currentMetrics is better than bestMetrics, update bestMetrics and 
clear the candidate
-      // buffer.
-      boolean isBetter = false;
-      boolean isEqual = true;
-      for (int i = 0; i < 3; i++) {
-        if (currentMetrics[i] < bestMetrics[i]) {
-          isBetter = true;
-          isEqual = false;
-          break;
-        } else if (currentMetrics[i] > bestMetrics[i]) {
-          isEqual = false;
-          break;
-        }
-      }
+    if (index == dfsRegionKeys.size()) {
       if (isBetter) {
         bestMetrics[0] = currentMetrics[0];
         bestMetrics[1] = currentMetrics[1];
         bestMetrics[2] = currentMetrics[2];
-        optimalAssignments.clear();
-        optimalAssignments.add(Arrays.copyOf(currentAssignment, n));
-      } else if (isEqual) {
-        optimalAssignments.add(Arrays.copyOf(currentAssignment, n));
-        // Prune search if we already have enough candidate solutions
-        if (optimalAssignments.size() >= GCR_MAX_OPTIMAL_PLAN_NUM) {
-          return;
-        }
+        System.arraycopy(currentAssignment, 0, bestAssignment, 0, 
dfsRegionKeys.size());
       }
       return;
     }
 
     // Process the region at the current index.
-    TConsensusGroupId regionId = regionKeys.get(index);
+    TConsensusGroupId regionId = dfsRegionKeys.get(index);
     List<Integer> candidates = allowedCandidatesMap.get(regionId);
     for (Integer candidate : candidates) {
       currentAssignment[index] = candidate;
+      currentScatter += scatterDelta[index][currentAssignment[index]];
       additionalLoad[candidate]++;
+      int nextMaxGlobalLoad =
+          Math.max(currentMaxGlobalLoad, regionCounter[candidate] + 
additionalLoad[candidate]);
+
       dfsRemoveNodeReplica(
-          index + 1,
-          regionKeys,
-          allowedCandidatesMap,
-          currentAssignment,
-          additionalLoad,
-          optimalAssignments,
-          bestMetrics,
-          remainReplicasMap,
-          regionDatabaseMap);
+          index + 1, nextMaxGlobalLoad, currentScatter, currentAssignment, 
additionalLoad);
       // Backtrack
       additionalLoad[candidate]--;
+      currentScatter -= scatterDelta[index][currentAssignment[index]];
     }
   }
 
@@ -411,20 +410,12 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
    * @param additionalLoad an array representing the additional load assigned 
to each node during
    *     migration.
    * @param currentScatter the current scatter value metric.
-   * @param regionKeys a list of region identifiers (TConsensusGroupId) for 
which migration is being
-   *     computed.
-   * @param regionDatabaseMap a mapping from each region identifier to its 
corresponding database
-   *     name.
    * @param currentAssignment an array where each element is the nodeId 
assigned for the
    *     corresponding region in {@code regionKeys}.
    * @return an integer array of size 3: [maxGlobalLoad, 
databaseLoadSquaredSum, scatterValue].
    */
   private int[] getCurrentMetrics(
-      int[] additionalLoad,
-      int currentScatter,
-      List<TConsensusGroupId> regionKeys,
-      Map<TConsensusGroupId, String> regionDatabaseMap,
-      int[] currentAssignment) {
+      int[] additionalLoad, int currentScatter, int[] currentAssignment) {
     int currentMaxGlobalLoad = 0;
     // Calculate the maximum global load across all data nodes.
     for (int nodeId = 0; nodeId < additionalLoad.length; nodeId++) {
@@ -433,7 +424,7 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator
     }
     // Compute the database load squared sum using the helper method.
     int dbLoadSquaredSum =
-        computeDatabaseLoadSquaredSum(currentAssignment, regionKeys, 
regionDatabaseMap);
+        computeDatabaseLoadSquaredSum(currentAssignment, dfsRegionKeys, 
regionDatabaseMap);
     // Build current metrics in order [maxGlobalLoad, maxDatabaseLoad, 
scatterValue]
     return new int[] {currentMaxGlobalLoad, dbLoadSquaredSum, currentScatter};
   }
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java
index 557fba8ba7a..0dd73b77f16 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java
@@ -54,7 +54,7 @@ public class GreedyCopySetRemoveNodeReplicaSelectTest {
 
   private static final int TEST_DATA_NODE_NUM = 5;
 
-  private static final int DATA_REGION_PER_DATA_NODE = 4;
+  private static final int DATA_REGION_PER_DATA_NODE = 30;
 
   private static final int DATA_REPLICATION_FACTOR = 2;
 
@@ -128,6 +128,14 @@ public class GreedyCopySetRemoveNodeReplicaSelectTest {
               PGPRegionCounter.put(nodeId, 0);
             });
 
+    for (TRegionReplicaSet replicaSet : allocateResult) {
+      for (TDataNodeLocation loc : replicaSet.getDataNodeLocations()) {
+        randomRegionCounter.put(
+            loc.getDataNodeId(), randomRegionCounter.get(loc.getDataNodeId()) 
+ 1);
+        PGPRegionCounter.put(loc.getDataNodeId(), 
PGPRegionCounter.get(loc.getDataNodeId()) + 1);
+      }
+    }
+
     for (TRegionReplicaSet remainReplicaSet : remainReplicas) {
       TDataNodeLocation selectedNode =
           
randomSelectNodeForRegion(remainReplicaSet.getDataNodeLocations()).get();
@@ -181,22 +189,31 @@ public class GreedyCopySetRemoveNodeReplicaSelectTest {
           PGPRegionCounter.get(selectedNode.getLocation().getDataNodeId()) + 
1);
     }
 
+    LOGGER.info("randomRegionCount:");
+
     for (Integer i : randomRegionCounter.keySet()) {
       Integer value = randomRegionCounter.get(i);
       randomMaxRegionCount = Math.max(randomMaxRegionCount, value);
       randomMinRegionCount = Math.min(randomMinRegionCount, value);
+      LOGGER.info("{} : {}", i, value);
     }
 
+    LOGGER.info("PGPRegionCount:");
+
     for (Integer i : PGPRegionCounter.keySet()) {
       Integer value = PGPRegionCounter.get(i);
       PGPMaxRegionCount = Math.max(PGPMaxRegionCount, value);
       PGPMinRegionCount = Math.min(PGPMinRegionCount, value);
+      LOGGER.info("{} : {}", i, value);
     }
 
+    LOGGER.info("PGPSelectedNodeIds size: {}", PGPSelectedNodeIds.size());
     Assert.assertEquals(TEST_DATA_NODE_NUM - 1, PGPSelectedNodeIds.size());
+    LOGGER.info("randomSelectedNodeIds size: {}", 
randomSelectedNodeIds.size());
     Assert.assertTrue(PGPSelectedNodeIds.size() >= 
randomSelectedNodeIds.size());
+    LOGGER.info(
+        "randomMaxRegionCount: {}, PGPMaxRegionCount: {}", 
randomMaxRegionCount, PGPMaxRegionCount);
     Assert.assertTrue(randomMaxRegionCount >= PGPMaxRegionCount);
-    Assert.assertTrue(randomMinRegionCount <= PGPMinRegionCount);
   }
 
   @Test
@@ -274,6 +291,13 @@ public class GreedyCopySetRemoveNodeReplicaSelectTest {
               planCount.put(n, 0);
             });
 
+    for (TRegionReplicaSet replicaSet : globalAllocatedList) {
+      for (TDataNodeLocation loc : replicaSet.getDataNodeLocations()) {
+        rndCount.merge(loc.getDataNodeId(), 1, Integer::sum);
+        planCount.merge(loc.getDataNodeId(), 1, Integer::sum);
+      }
+    }
+
     for (TRegionReplicaSet r : remainReplicas) {
       TDataNodeLocation pick = 
randomSelectNodeForRegion(r.getDataNodeLocations()).get();
       LOGGER.info("Random Selected DataNode {} for Region {}", 
pick.getDataNodeId(), r.regionId);
@@ -325,7 +349,6 @@ public class GreedyCopySetRemoveNodeReplicaSelectTest {
     Assert.assertEquals(TEST_DATA_NODE_NUM - 1, planNodes.size());
     Assert.assertTrue(planNodes.size() >= rndNodes.size());
     Assert.assertTrue(rndMax >= planMax);
-    Assert.assertTrue(rndMin <= planMin);
   }
 
   private Optional<TDataNodeLocation> randomSelectNodeForRegion(

Reply via email to