This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 38b7ae2eb1 Enhance rebalancer to also log segments removed (#14180)
38b7ae2eb1 is described below

commit 38b7ae2eb1ca8edb0b33269afd015229a6a3addc
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Oct 8 16:11:57 2024 -0700

    Enhance rebalancer to also log segments removed (#14180)
---
 .../segment/OfflineSegmentAssignment.java          |  30 ++--
 .../segment/RealtimeSegmentAssignment.java         |  23 ++-
 .../assignment/segment/SegmentAssignmentUtils.java |  56 +++++--
 .../helix/core/rebalance/TableRebalancer.java      |   6 +-
 .../segment/SegmentAssignmentUtilsTest.java        | 172 +++++++++++----------
 .../TableRebalancerClusterStatelessTest.java       |  23 ++-
 6 files changed, 178 insertions(+), 132 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
index d589d279e6..5e407564f7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
@@ -45,13 +45,14 @@ public class OfflineSegmentAssignment extends 
BaseSegmentAssignment {
     Preconditions.checkState(instancePartitions != null, "Failed to find 
OFFLINE instance partitions for table: %s",
         _tableNameWithType);
     // Gets Segment assignment strategy for instance partitions
-    SegmentAssignmentStrategy segmentAssignmentStrategy = 
SegmentAssignmentStrategyFactory
-        .getSegmentAssignmentStrategy(_helixManager, _tableConfig, 
InstancePartitionsType.OFFLINE.toString(),
-            instancePartitions);
+    SegmentAssignmentStrategy segmentAssignmentStrategy =
+        
SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(_helixManager, 
_tableConfig,
+            InstancePartitionsType.OFFLINE.toString(), instancePartitions);
     _logger.info("Assigning segment: {} with instance partitions: {} for 
table: {}", segmentName, instancePartitions,
         _tableNameWithType);
-    List<String> instancesAssigned = segmentAssignmentStrategy
-        .assignSegment(segmentName, currentAssignment, instancePartitions, 
InstancePartitionsType.OFFLINE);
+    List<String> instancesAssigned =
+        segmentAssignmentStrategy.assignSegment(segmentName, 
currentAssignment, instancePartitions,
+            InstancePartitionsType.OFFLINE);
     _logger.info("Assigned segment: {} to instances: {} for table: {}", 
segmentName, instancesAssigned,
         _tableNameWithType);
     return instancesAssigned;
@@ -62,20 +63,19 @@ public class OfflineSegmentAssignment extends 
BaseSegmentAssignment {
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
@Nullable List<Tier> sortedTiers,
       @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, 
RebalanceConfig config) {
     InstancePartitions offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
-    Preconditions
-        .checkState(offlineInstancePartitions != null, "Failed to find OFFLINE 
instance partitions for table: %s",
-            _tableNameWithType);
+    Preconditions.checkState(offlineInstancePartitions != null,
+        "Failed to find OFFLINE instance partitions for table: %s", 
_tableNameWithType);
     // Gets Segment assignment strategy for instance partitions
-    SegmentAssignmentStrategy segmentAssignmentStrategy = 
SegmentAssignmentStrategyFactory
-        .getSegmentAssignmentStrategy(_helixManager, _tableConfig, 
InstancePartitionsType.OFFLINE.toString(),
-            offlineInstancePartitions);
+    SegmentAssignmentStrategy segmentAssignmentStrategy =
+        
SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(_helixManager, 
_tableConfig,
+            InstancePartitionsType.OFFLINE.toString(), 
offlineInstancePartitions);
     // TODO: Right now as per tier assignment, different instances will be 
picked up for different tiers which
     // would produce incorrect results for Dim tables. In future, we need some 
preconditions to check if
     // tierPartitionMap has single tier for Dim tables and remove below check
     // See https://github.com/apache/pinot/issues/9047
     if (segmentAssignmentStrategy instanceof 
AllServersSegmentAssignmentStrategy) {
-      return segmentAssignmentStrategy
-          .reassignSegments(currentAssignment, offlineInstancePartitions, 
InstancePartitionsType.OFFLINE);
+      return segmentAssignmentStrategy.reassignSegments(currentAssignment, 
offlineInstancePartitions,
+          InstancePartitionsType.OFFLINE);
     }
     boolean bootstrap = config.isBootstrap();
     // Rebalance tiers first
@@ -96,8 +96,8 @@ public class OfflineSegmentAssignment extends 
BaseSegmentAssignment {
       newTierAssignments.forEach(newAssignment::putAll);
     }
 
-    _logger.info("Rebalanced table: {}, number of segments to be moved to each 
instance: {}", _tableNameWithType,
-        
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
newAssignment));
+    _logger.info("Rebalanced table: {}, number of segments to be added/removed 
for each instance: {}",
+        _tableNameWithType, 
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, 
newAssignment));
     return newAssignment;
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index e114bf6e1d..492afed1a7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -88,11 +88,11 @@ public class RealtimeSegmentAssignment extends 
BaseSegmentAssignment {
     List<String> instancesAssigned;
     if (instancePartitionsType == InstancePartitionsType.COMPLETED) {
       // Gets Segment assignment strategy for instance partitions
-      SegmentAssignmentStrategy segmentAssignmentStrategy = 
SegmentAssignmentStrategyFactory
-          .getSegmentAssignmentStrategy(_helixManager, _tableConfig, 
instancePartitionsType.toString(),
-              instancePartitions);
-      instancesAssigned = segmentAssignmentStrategy
-          .assignSegment(segmentName, currentAssignment, instancePartitions, 
InstancePartitionsType.COMPLETED);
+      SegmentAssignmentStrategy segmentAssignmentStrategy =
+          
SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(_helixManager, 
_tableConfig,
+              instancePartitionsType.toString(), instancePartitions);
+      instancesAssigned = segmentAssignmentStrategy.assignSegment(segmentName, 
currentAssignment, instancePartitions,
+          InstancePartitionsType.COMPLETED);
     } else {
       instancesAssigned = assignConsumingSegment(segmentName, 
instancePartitions);
     }
@@ -176,9 +176,8 @@ public class RealtimeSegmentAssignment extends 
BaseSegmentAssignment {
       @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, 
RebalanceConfig config) {
     InstancePartitions completedInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
     InstancePartitions consumingInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
-    Preconditions
-        .checkState(consumingInstancePartitions != null, "Failed to find 
CONSUMING instance partitions for table: %s",
-            _tableNameWithType);
+    Preconditions.checkState(consumingInstancePartitions != null,
+        "Failed to find CONSUMING instance partitions for table: %s", 
_tableNameWithType);
     boolean includeConsuming = config.isIncludeConsuming();
     boolean bootstrap = config.isBootstrap();
     // Rebalance tiers first
@@ -231,8 +230,8 @@ public class RealtimeSegmentAssignment extends 
BaseSegmentAssignment {
     Map<String, Map<String, String>> consumingSegmentAssignment =
         
completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
     if (includeConsuming) {
-      _logger
-          .info("Reassigning CONSUMING segments with CONSUMING instance 
partitions for table: {}", _tableNameWithType);
+      _logger.info("Reassigning CONSUMING segments with CONSUMING instance 
partitions for table: {}",
+          _tableNameWithType);
 
       for (String segmentName : consumingSegmentAssignment.keySet()) {
         List<String> instancesAssigned = assignConsumingSegment(segmentName, 
consumingInstancePartitions);
@@ -253,8 +252,8 @@ public class RealtimeSegmentAssignment extends 
BaseSegmentAssignment {
       newTierAssignments.forEach(newAssignment::putAll);
     }
 
-    _logger.info("Rebalanced table: {}, number of segments to be moved to each 
instance: {}", _tableNameWithType,
-        
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
newAssignment));
+    _logger.info("Rebalanced table: {}, number of segments to be added/removed 
for each instance: {}",
+        _tableNameWithType, 
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, 
newAssignment));
     return newAssignment;
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 52f736a555..727bdbdfda 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -19,6 +19,8 @@
 package org.apache.pinot.controller.helix.core.assignment.segment;
 
 import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.ints.IntIntMutablePair;
+import it.unimi.dsi.fastutil.ints.IntIntPair;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -81,10 +83,10 @@ public class SegmentAssignmentUtils {
    */
   public static List<String> 
getInstancesForNonReplicaGroupBasedAssignment(InstancePartitions 
instancePartitions,
       int replication) {
-    Preconditions
-        .checkState(instancePartitions.getNumReplicaGroups() == 1 && 
instancePartitions.getNumPartitions() == 1,
-            "Instance partitions: %s should contain 1 replica and 1 partition 
for non-replica-group based assignment",
-            instancePartitions.getInstancePartitionsName());
+    Preconditions.checkState(
+        instancePartitions.getNumReplicaGroups() == 1 && 
instancePartitions.getNumPartitions() == 1,
+        "Instance partitions: %s should contain 1 replica and 1 partition for 
non-replica-group based assignment",
+        instancePartitions.getInstancePartitionsName());
     List<String> instances = instancePartitions.getInstances(0, 0);
     int numInstances = instances.size();
     Preconditions.checkState(numInstances >= replication,
@@ -134,8 +136,8 @@ public class SegmentAssignmentUtils {
     int numReplicaGroups = instancePartitions.getNumReplicaGroups();
     List<String> instancesAssigned = new ArrayList<>(numReplicaGroups);
     for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
-      instancesAssigned
-          .add(instancePartitions.getInstances(partitionId, 
replicaGroupId).get(instanceIdWithLeastSegmentsAssigned));
+      instancesAssigned.add(
+          instancePartitions.getInstances(partitionId, 
replicaGroupId).get(instanceIdWithLeastSegmentsAssigned));
     }
     return instancesAssigned;
   }
@@ -219,8 +221,8 @@ public class SegmentAssignmentUtils {
       for (String instanceName : currentAssignment.get(segmentName).keySet()) {
         Integer instanceId = instanceNameToIdMap.get(instanceName);
         if (instanceId != null && numSegmentsAssignedPerInstance[instanceId] < 
targetNumSegmentsPerInstance) {
-          newAssignment
-              .put(segmentName, 
getReplicaGroupBasedInstanceStateMap(instancePartitions, partitionId, 
instanceId));
+          newAssignment.put(segmentName,
+              getReplicaGroupBasedInstanceStateMap(instancePartitions, 
partitionId, instanceId));
           numSegmentsAssignedPerInstance[instanceId]++;
           segmentAssigned = true;
           break;
@@ -255,8 +257,8 @@ public class SegmentAssignmentUtils {
     Map<String, String> instanceStateMap = new TreeMap<>();
     int numReplicaGroups = instancePartitions.getNumReplicaGroups();
     for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
-      instanceStateMap
-          .put(instancePartitions.getInstances(partitionId, 
replicaGroupId).get(instanceId), SegmentStateModel.ONLINE);
+      instanceStateMap.put(instancePartitions.getInstances(partitionId, 
replicaGroupId).get(instanceId),
+          SegmentStateModel.ONLINE);
     }
     return instanceStateMap;
   }
@@ -275,6 +277,7 @@ public class SegmentAssignmentUtils {
   /**
    * Returns a map from instance name to number of segments to be moved to it.
    */
+  @Deprecated
   public static Map<String, Integer> 
getNumSegmentsToBeMovedPerInstance(Map<String, Map<String, String>> 
oldAssignment,
       Map<String, Map<String, String>> newAssignment) {
     Map<String, Integer> numSegmentsToBeMovedPerInstance = new TreeMap<>();
@@ -292,6 +295,34 @@ public class SegmentAssignmentUtils {
     return numSegmentsToBeMovedPerInstance;
   }
 
+  /**
+   * Returns a map from instance name to number of segments to be 
added/removed.
+   */
+  public static Map<String, IntIntPair> 
getNumSegmentsToMovePerInstance(Map<String, Map<String, String>> oldAssignment,
+      Map<String, Map<String, String>> newAssignment) {
+    Map<String, IntIntPair> numSegmentsToMovePerInstance = new TreeMap<>();
+    for (Map.Entry<String, Map<String, String>> entry : 
newAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Set<String> newInstancesAssigned = entry.getValue().keySet();
+      Set<String> oldInstancesAssigned = 
oldAssignment.get(segmentName).keySet();
+      // For each new assigned instance, check if the segment needs to be 
added to it
+      for (String instanceName : newInstancesAssigned) {
+        if (!oldInstancesAssigned.contains(instanceName)) {
+          numSegmentsToMovePerInstance.compute(instanceName,
+              (k, v) -> v == null ? new IntIntMutablePair(1, 0) : 
v.left(v.leftInt() + 1));
+        }
+      }
+      // For each old assigned instance, check if the segment needs to be 
removed from it
+      for (String instanceName : oldInstancesAssigned) {
+        if (!newInstancesAssigned.contains(instanceName)) {
+          numSegmentsToMovePerInstance.compute(instanceName,
+              (k, v) -> v == null ? new IntIntMutablePair(0, 1) : 
v.right(v.rightInt() + 1));
+        }
+      }
+    }
+    return numSegmentsToMovePerInstance;
+  }
+
   public static List<String> getSegmentsToMove(Map<String, Map<String, 
String>> oldAssignment,
       Map<String, Map<String, String>> newAssignment) {
     List<String> segmentsToMove = new ArrayList<>();
@@ -415,9 +446,8 @@ public class SegmentAssignmentUtils {
       @Nullable String partitionColumn) {
     SegmentZKMetadata segmentZKMetadata =
         
ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(), 
offlineTableName, segmentName);
-    Preconditions
-        .checkState(segmentZKMetadata != null, "Failed to find segment ZK 
metadata for segment: %s of table: %s",
-            segmentName, offlineTableName);
+    Preconditions.checkState(segmentZKMetadata != null,
+        "Failed to find segment ZK metadata for segment: %s of table: %s", 
segmentName, offlineTableName);
     return getPartitionId(segmentZKMetadata, offlineTableName, 
partitionColumn);
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 293ada1da5..395edf0827 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -499,9 +499,9 @@ public class TableRebalancer {
       Map<String, Map<String, String>> nextAssignment =
           getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
               lowDiskMode);
-      LOGGER.info("For rebalanceId: {}, got the next assignment for table: {} 
with number of segments to be moved to "
-              + "each instance: {}", rebalanceJobId, tableNameWithType,
-          
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
nextAssignment));
+      LOGGER.info("For rebalanceId: {}, got the next assignment for table: {} 
with number of segments to be "
+              + "added/removed for each instance: {}", rebalanceJobId, 
tableNameWithType,
+          
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, 
nextAssignment));
 
       // Reuse current IdealState to update the IdealState in cluster
       idealStateRecord.setMapFields(nextAssignment);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
index e9301c1f7c..0f43b7869d 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.helix.core.assignment.segment;
 
+import it.unimi.dsi.fastutil.ints.IntIntPair;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -44,13 +45,10 @@ public class SegmentAssignmentUtilsTest {
     int numInstances = 10;
     List<String> instances = 
SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, numInstances);
 
-    // Uniformly spray segments to the instances:
-    // [instance_0,   instance_1,   instance_2,   instance_3,   instance_4,   
instance_5,   instance_6,   instance_7,
-    // instance_8,   instance_9]
-    //  segment_0(r0) segment_0(r1) segment_0(r2) segment_1(r0) segment_1(r1) 
segment_1(r2) segment_2(r0) segment_2
-    //  (r1) segment_2(r2) segment_3(r0)
-    //  segment_3(r1) segment_3(r2) segment_4(r0) segment_4(r1) segment_4(r2) 
segment_5(r0) segment_5(r1) segment_5
-    //  (r2) segment_6(r0) segment_6(r1)
+    // Uniformly spray segments to the instances (i0 represents instance_0, s0 
represents segment_0)
+    // [    i0,    i1,    i2,    i3,    i4,    i5,    i6,    i7,    i8,    i9]
+    //  s0(r0) s0(r1) s0(r2) s1(r0) s1(r1) s1(r2) s2(r0) s2(r1) s2(r2) s3(r0)
+    //  s3(r1) s3(r2) s4(r0) s4(r1) s4(r2) s5(r0) s5(r1) s5(r2) s6(r0) s6(r1)
     //  ...
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     int assignedInstanceId = 0;
@@ -60,8 +58,8 @@ public class SegmentAssignmentUtilsTest {
         instancesAssigned.add(instances.get(assignedInstanceId));
         assignedInstanceId = (assignedInstanceId + 1) % numInstances;
       }
-      currentAssignment
-          .put(segmentName, 
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.ONLINE));
+      currentAssignment.put(segmentName,
+          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.ONLINE));
     }
 
     // There should be 100 segments assigned
@@ -83,15 +81,12 @@ public class SegmentAssignmentUtilsTest {
         currentAssignment);
 
     // Replace instance_0 with instance_10
-    // {
-    //   0_0=[instance_10, instance_1, instance_2, instance_3, instance_4, 
instance_5, instance_6, instance_7,
-    //   instance_8, instance_9]
-    // }
     List<String> newInstances = new ArrayList<>(instances);
     String newInstanceName = INSTANCE_NAME_PREFIX + 10;
     newInstances.set(0, newInstanceName);
-    Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils
-        .rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, 
newInstances, NUM_REPLICAS);
+    Map<String, Map<String, String>> newAssignment =
+        
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
 newInstances,
+            NUM_REPLICAS);
     // There should be 100 segments assigned
     assertEquals(currentAssignment.size(), numSegments);
     // Each segment should have 3 replicas
@@ -103,11 +98,12 @@ public class SegmentAssignmentUtilsTest {
         
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, 
newInstances);
     assertEquals(numSegmentsAssignedPerInstance, 
expectedNumSegmentsAssignedPerInstance);
     // All segments on instance_0 should be moved to instance_10
-    Map<String, Integer> numSegmentsToBeMovedPerInstance =
-        
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
newAssignment);
-    assertEquals(numSegmentsToBeMovedPerInstance.size(), 1);
-    assertEquals((int) numSegmentsToBeMovedPerInstance.get(newInstanceName), 
numSegmentsPerInstance);
+    Map<String, IntIntPair> numSegmentsToMovePerInstance =
+        
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, 
newAssignment);
+    assertEquals(numSegmentsToMovePerInstance.size(), 2);
+    assertEquals(numSegmentsToMovePerInstance.get(newInstanceName), 
IntIntPair.of(30, 0));
     String oldInstanceName = INSTANCE_NAME_PREFIX + 0;
+    assertEquals(numSegmentsToMovePerInstance.get(oldInstanceName), 
IntIntPair.of(0, 30));
     for (String segmentName : segments) {
       if (currentAssignment.get(segmentName).containsKey(oldInstanceName)) {
         
assertTrue(newAssignment.get(segmentName).containsKey(newInstanceName));
@@ -120,8 +116,8 @@ public class SegmentAssignmentUtilsTest {
     // }
     int newNumInstances = numInstances - 5;
     newInstances = 
SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, newNumInstances);
-    newAssignment = SegmentAssignmentUtils
-        .rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, 
newInstances, NUM_REPLICAS);
+    newAssignment = 
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
 newInstances,
+        NUM_REPLICAS);
     // There should be 100 segments assigned
     assertEquals(newAssignment.size(), numSegments);
     // Each segment should have 3 replicas
@@ -136,26 +132,26 @@ public class SegmentAssignmentUtilsTest {
     assertEquals(numSegmentsAssignedPerInstance[2], 60);
     assertEquals(numSegmentsAssignedPerInstance[3], 60);
     assertEquals(numSegmentsAssignedPerInstance[4], 64);
-    numSegmentsToBeMovedPerInstance =
-        
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
newAssignment);
-    assertEquals(numSegmentsToBeMovedPerInstance.size(), newNumInstances);
-    assertEquals((int) 
numSegmentsToBeMovedPerInstance.get(newInstances.get(0)), 26);
-    assertEquals((int) 
numSegmentsToBeMovedPerInstance.get(newInstances.get(1)), 30);
-    assertEquals((int) 
numSegmentsToBeMovedPerInstance.get(newInstances.get(2)), 30);
-    assertEquals((int) 
numSegmentsToBeMovedPerInstance.get(newInstances.get(3)), 30);
-    assertEquals((int) 
numSegmentsToBeMovedPerInstance.get(newInstances.get(4)), 34);
+    numSegmentsToMovePerInstance =
+        
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, 
newAssignment);
+    assertEquals(numSegmentsToMovePerInstance.size(), numInstances);
+    assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 0), 
IntIntPair.of(26, 0));
+    assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 1), 
IntIntPair.of(30, 0));
+    assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 2), 
IntIntPair.of(30, 0));
+    assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 3), 
IntIntPair.of(30, 0));
+    assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 4), 
IntIntPair.of(34, 0));
+    for (int i = 5; i < 10; i++) {
+      assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + i), 
IntIntPair.of(0, 30));
+    }
 
-    // Add 5 instances
+    // Add 5 instances (i0 represents instance_0)
     // {
-    //   0_0=[
-    //       instance_0, instance_1, instance_2, instance_3, instance_4, 
instance_5, instance_6, instance_7,
-    //       instance_8, instance_9,
-    //       instance_10, instance_11, instance_12, instance_13, instance_14]
+    //   0_0=[i0, i1, i2, i3, i4, i5, i6, i7, i8, i9, i10, i11, i12, i13, i14]
     // }
     newNumInstances = numInstances + 5;
     newInstances = 
SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, newNumInstances);
-    newAssignment = SegmentAssignmentUtils
-        .rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, 
newInstances, NUM_REPLICAS);
+    newAssignment = 
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
 newInstances,
+        NUM_REPLICAS);
     // There should be 100 segments assigned
     assertEquals(newAssignment.size(), numSegments);
     // Each segment should have 3 replicas
@@ -170,11 +166,14 @@ public class SegmentAssignmentUtilsTest {
     Arrays.fill(expectedNumSegmentsAssignedPerInstance, 
newNumSegmentsPerInstance);
     assertEquals(numSegmentsAssignedPerInstance, 
expectedNumSegmentsAssignedPerInstance);
     // Each new added instance should have 20 segments to be moved to it
-    numSegmentsToBeMovedPerInstance =
-        
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
newAssignment);
-    assertEquals(numSegmentsToBeMovedPerInstance.size(), 5);
-    for (int instanceId = numInstances; instanceId < newNumInstances; 
instanceId++) {
-      assertEquals((int) 
numSegmentsToBeMovedPerInstance.get(newInstances.get(instanceId)), 
newNumSegmentsPerInstance);
+    numSegmentsToMovePerInstance =
+        
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, 
newAssignment);
+    assertEquals(numSegmentsToMovePerInstance.size(), newNumInstances);
+    for (int i = 0; i < numInstances; i++) {
+      assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + i), 
IntIntPair.of(0, 10));
+    }
+    for (int i = numInstances; i < newNumInstances; i++) {
+      assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + i), 
IntIntPair.of(20, 0));
     }
 
     // Change all instances
@@ -183,8 +182,8 @@ public class SegmentAssignmentUtilsTest {
     // }
     String newInstanceNamePrefix = "i_";
     newInstances = 
SegmentAssignmentTestUtils.getNameList(newInstanceNamePrefix, numInstances);
-    newAssignment = SegmentAssignmentUtils
-        .rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, 
newInstances, NUM_REPLICAS);
+    newAssignment = 
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
 newInstances,
+        NUM_REPLICAS);
     // There should be 100 segments assigned
     assertEquals(newAssignment.size(), numSegments);
     // Each segment should have 3 replicas
@@ -198,11 +197,12 @@ public class SegmentAssignmentUtilsTest {
     Arrays.fill(expectedNumSegmentsAssignedPerInstance, 
numSegmentsPerInstance);
     assertEquals(numSegmentsAssignedPerInstance, 
expectedNumSegmentsAssignedPerInstance);
     // Each instance should have 30 segments to be moved to it
-    numSegmentsToBeMovedPerInstance =
-        
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
newAssignment);
-    assertEquals(numSegmentsToBeMovedPerInstance.size(), numInstances);
-    for (String instanceName : newInstances) {
-      assertEquals((int) numSegmentsToBeMovedPerInstance.get(instanceName), 
numSegmentsPerInstance);
+    numSegmentsToMovePerInstance =
+        
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, 
newAssignment);
+    assertEquals(numSegmentsToMovePerInstance.size(), 2 * numInstances);
+    for (int i = 0; i < numInstances; i++) {
+      assertEquals(numSegmentsToMovePerInstance.get(newInstanceNamePrefix + 
i), IntIntPair.of(30, 0));
+      assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + i), 
IntIntPair.of(0, 30));
     }
   }
 
@@ -264,9 +264,8 @@ public class SegmentAssignmentUtilsTest {
     Arrays.fill(expectedNumSegmentsAssignedPerInstance, 
numSegmentsPerInstance);
     assertEquals(numSegmentsAssignedPerInstance, 
expectedNumSegmentsAssignedPerInstance);
     // Current assignment should already be balanced
-    assertEquals(SegmentAssignmentUtils
-            .rebalanceReplicaGroupBasedTable(currentAssignment, 
instancePartitions, partitionIdToSegmentsMap),
-        currentAssignment);
+    
assertEquals(SegmentAssignmentUtils.rebalanceReplicaGroupBasedTable(currentAssignment,
 instancePartitions,
+        partitionIdToSegmentsMap), currentAssignment);
 
     // Replace instance_0 with instance_9, instance_4 with instance_10
     // {
@@ -289,8 +288,9 @@ public class SegmentAssignmentUtilsTest {
     newInstancePartitions.setInstances(0, 0, newReplicaGroup0Instances);
     newInstancePartitions.setInstances(0, 1, newReplicaGroup1Instances);
     newInstancePartitions.setInstances(0, 2, newReplicaGroup2Instances);
-    Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils
-        .rebalanceReplicaGroupBasedTable(currentAssignment, 
newInstancePartitions, partitionIdToSegmentsMap);
+    Map<String, Map<String, String>> newAssignment =
+        
SegmentAssignmentUtils.rebalanceReplicaGroupBasedTable(currentAssignment, 
newInstancePartitions,
+            partitionIdToSegmentsMap);
     // There should be 90 segments assigned
     assertEquals(newAssignment.size(), numSegments);
     // Each segment should have 3 replicas
@@ -303,13 +303,15 @@ public class SegmentAssignmentUtilsTest {
     assertEquals(numSegmentsAssignedPerInstance, 
expectedNumSegmentsAssignedPerInstance);
     // All segments on instance_0 should be moved to instance_9, all segments 
on instance_4 should be moved to
     // instance_10
-    Map<String, Integer> numSegmentsToBeMovedPerInstance =
-        
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
newAssignment);
-    assertEquals(numSegmentsToBeMovedPerInstance.size(), 2);
-    assertEquals((int) 
numSegmentsToBeMovedPerInstance.get(newReplicaGroup0Instance), 
numSegmentsPerInstance);
-    assertEquals((int) 
numSegmentsToBeMovedPerInstance.get(newReplicaGroup1Instance), 
numSegmentsPerInstance);
+    Map<String, IntIntPair> numSegmentsToMovePerInstance =
+        
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, 
newAssignment);
+    assertEquals(numSegmentsToMovePerInstance.size(), 4);
+    assertEquals(numSegmentsToMovePerInstance.get(newReplicaGroup0Instance), 
IntIntPair.of(30, 0));
+    assertEquals(numSegmentsToMovePerInstance.get(newReplicaGroup1Instance), 
IntIntPair.of(30, 0));
     String oldReplicaGroup0Instance = INSTANCE_NAME_PREFIX + 0;
     String oldReplicaGroup1Instance = INSTANCE_NAME_PREFIX + 4;
+    assertEquals(numSegmentsToMovePerInstance.get(oldReplicaGroup0Instance), 
IntIntPair.of(0, 30));
+    assertEquals(numSegmentsToMovePerInstance.get(oldReplicaGroup0Instance), 
IntIntPair.of(0, 30));
     for (String segmentName : segments) {
       Map<String, String> oldInstanceStateMap = 
currentAssignment.get(segmentName);
       if (oldInstanceStateMap.containsKey(oldReplicaGroup0Instance)) {
@@ -335,8 +337,8 @@ public class SegmentAssignmentUtilsTest {
       newInstancePartitions.setInstances(0, replicaGroupId, 
newInstancesForReplicaGroup);
       newInstances.addAll(newInstancesForReplicaGroup);
     }
-    newAssignment = SegmentAssignmentUtils
-        .rebalanceReplicaGroupBasedTable(currentAssignment, 
newInstancePartitions, partitionIdToSegmentsMap);
+    newAssignment = 
SegmentAssignmentUtils.rebalanceReplicaGroupBasedTable(currentAssignment, 
newInstancePartitions,
+        partitionIdToSegmentsMap);
     // There should be 90 segments assigned
     assertEquals(newAssignment.size(), numSegments);
     // Each segment should have 3 replicas
@@ -351,12 +353,15 @@ public class SegmentAssignmentUtilsTest {
     Arrays.fill(expectedNumSegmentsAssignedPerInstance, 
newNumSegmentsPerInstance);
     assertEquals(numSegmentsAssignedPerInstance, 
expectedNumSegmentsAssignedPerInstance);
     // Each instance should have 15 segments to be moved to it
-    numSegmentsToBeMovedPerInstance =
-        
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
newAssignment);
-    assertEquals(numSegmentsToBeMovedPerInstance.size(), newNumInstances);
-    for (String instanceName : newInstances) {
-      assertEquals((int) numSegmentsToBeMovedPerInstance.get(instanceName),
-          newNumSegmentsPerInstance - numSegmentsPerInstance);
+    numSegmentsToMovePerInstance =
+        
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, 
newAssignment);
+    assertEquals(numSegmentsToMovePerInstance.size(), numInstances);
+    for (String instanceName : instances) {
+      if (newInstances.contains(instanceName)) {
+        assertEquals(numSegmentsToMovePerInstance.get(instanceName), 
IntIntPair.of(15, 0));
+      } else {
+        assertEquals(numSegmentsToMovePerInstance.get(instanceName), 
IntIntPair.of(0, 30));
+      }
     }
 
     // Add 6 instances (2 to each replica-group)
@@ -376,8 +381,8 @@ public class SegmentAssignmentUtilsTest {
       }
       newInstancePartitions.setInstances(0, replicaGroupId, 
newInstancesForReplicaGroup);
     }
-    newAssignment = SegmentAssignmentUtils
-        .rebalanceReplicaGroupBasedTable(currentAssignment, 
newInstancePartitions, partitionIdToSegmentsMap);
+    newAssignment = 
SegmentAssignmentUtils.rebalanceReplicaGroupBasedTable(currentAssignment, 
newInstancePartitions,
+        partitionIdToSegmentsMap);
     // There should be 90 segments assigned
     assertEquals(newAssignment.size(), numSegments);
     // Each segment should have 3 replicas
@@ -392,11 +397,14 @@ public class SegmentAssignmentUtilsTest {
     Arrays.fill(expectedNumSegmentsAssignedPerInstance, 
newNumSegmentsPerInstance);
     assertEquals(numSegmentsAssignedPerInstance, 
expectedNumSegmentsAssignedPerInstance);
     // Each new added instance should have 18 segments to be moved to it
-    numSegmentsToBeMovedPerInstance =
-        
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
newAssignment);
-    assertEquals(numSegmentsToBeMovedPerInstance.size(), 6);
-    for (int instanceId = numInstances; instanceId < newNumInstances; 
instanceId++) {
-      assertEquals((int) 
numSegmentsToBeMovedPerInstance.get(newInstances.get(instanceId)), 
newNumSegmentsPerInstance);
+    numSegmentsToMovePerInstance =
+        
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, 
newAssignment);
+    assertEquals(numSegmentsToMovePerInstance.size(), newNumInstances);
+    for (int i = 0; i < numInstances; i++) {
+      assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + i), 
IntIntPair.of(0, 12));
+    }
+    for (int i = numInstances; i < newNumInstances; i++) {
+      assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + i), 
IntIntPair.of(18, 0));
     }
 
     // Change all instances
@@ -405,7 +413,8 @@ public class SegmentAssignmentUtilsTest {
     //   0_1=[i_3, i_4, i_5],
     //   0_2=[i_6, i_7, i_8]
     // }
-    newInstances = SegmentAssignmentTestUtils.getNameList("i_", numInstances);
+    String newInstanceNamePrefix = "i_";
+    newInstances = 
SegmentAssignmentTestUtils.getNameList(newInstanceNamePrefix, numInstances);
     instanceIdToAdd = 0;
     for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; 
replicaGroupId++) {
       List<String> instancesForReplicaGroup = new 
ArrayList<>(numInstancesPerReplicaGroup);
@@ -414,8 +423,8 @@ public class SegmentAssignmentUtilsTest {
       }
       newInstancePartitions.setInstances(0, replicaGroupId, 
instancesForReplicaGroup);
     }
-    newAssignment = SegmentAssignmentUtils
-        .rebalanceReplicaGroupBasedTable(currentAssignment, 
newInstancePartitions, partitionIdToSegmentsMap);
+    newAssignment = 
SegmentAssignmentUtils.rebalanceReplicaGroupBasedTable(currentAssignment, 
newInstancePartitions,
+        partitionIdToSegmentsMap);
     // There should be 90 segments assigned
     assertEquals(newAssignment.size(), numSegments);
     // Each segment should have 3 replicas
@@ -429,11 +438,12 @@ public class SegmentAssignmentUtilsTest {
     Arrays.fill(expectedNumSegmentsAssignedPerInstance, 
numSegmentsPerInstance);
     assertEquals(numSegmentsAssignedPerInstance, 
expectedNumSegmentsAssignedPerInstance);
     // Each instance should have 30 segments to be moved to it
-    numSegmentsToBeMovedPerInstance =
-        
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
newAssignment);
-    assertEquals(numSegmentsToBeMovedPerInstance.size(), numInstances);
-    for (String instanceName : newInstances) {
-      assertEquals((int) numSegmentsToBeMovedPerInstance.get(instanceName), 
numSegmentsPerInstance);
+    numSegmentsToMovePerInstance =
+        
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, 
newAssignment);
+    assertEquals(numSegmentsToMovePerInstance.size(), 2 * numInstances);
+    for (int i = 0; i < numInstances; i++) {
+      assertEquals(numSegmentsToMovePerInstance.get(newInstanceNamePrefix + 
i), IntIntPair.of(30, 0));
+      assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + i), 
IntIntPair.of(0, 30));
     }
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 1df7109ef2..c5ac9e82bd 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller.helix.core.rebalance;
 
 import com.google.common.collect.Lists;
+import it.unimi.dsi.fastutil.ints.IntIntPair;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -47,10 +48,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
 
 
 @Test(groups = "stateless")
@@ -155,11 +153,20 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
 
     // Segments should be moved to the new added servers
     Map<String, Map<String, String>> newSegmentAssignment = 
rebalanceResult.getSegmentAssignment();
-    Map<String, Integer> instanceToNumSegmentsToBeMovedMap =
-        
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(oldSegmentAssignment, 
newSegmentAssignment);
-    assertEquals(instanceToNumSegmentsToBeMovedMap.size(), numServersToAdd);
+    Map<String, IntIntPair> instanceToNumSegmentsToMoveMap =
+        
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(oldSegmentAssignment, 
newSegmentAssignment);
+    assertEquals(instanceToNumSegmentsToMoveMap.size(), numServers + 
numServersToAdd);
     for (int i = 0; i < numServersToAdd; i++) {
-      
assertTrue(instanceToNumSegmentsToBeMovedMap.containsKey(SERVER_INSTANCE_ID_PREFIX
 + (numServers + i)));
+      IntIntPair numSegmentsToMove = 
instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + (numServers + 
i));
+      assertNotNull(numSegmentsToMove);
+      assertTrue(numSegmentsToMove.leftInt() > 0);
+      assertEquals(numSegmentsToMove.rightInt(), 0);
+    }
+    for (int i = 0; i < numServers; i++) {
+      IntIntPair numSegmentsToMove = 
instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + i);
+      assertNotNull(numSegmentsToMove);
+      assertEquals(numSegmentsToMove.leftInt(), 0);
+      assertTrue(numSegmentsToMove.rightInt() > 0);
     }
 
     // Dry-run mode should not change the IdealState


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to