This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 38b7ae2eb1 Enhance rebalancer to also log segments removed (#14180)
38b7ae2eb1 is described below
commit 38b7ae2eb1ca8edb0b33269afd015229a6a3addc
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Oct 8 16:11:57 2024 -0700
Enhance rebalancer to also log segments removed (#14180)
---
.../segment/OfflineSegmentAssignment.java | 30 ++--
.../segment/RealtimeSegmentAssignment.java | 23 ++-
.../assignment/segment/SegmentAssignmentUtils.java | 56 +++++--
.../helix/core/rebalance/TableRebalancer.java | 6 +-
.../segment/SegmentAssignmentUtilsTest.java | 172 +++++++++++----------
.../TableRebalancerClusterStatelessTest.java | 23 ++-
6 files changed, 178 insertions(+), 132 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
index d589d279e6..5e407564f7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
@@ -45,13 +45,14 @@ public class OfflineSegmentAssignment extends
BaseSegmentAssignment {
Preconditions.checkState(instancePartitions != null, "Failed to find
OFFLINE instance partitions for table: %s",
_tableNameWithType);
// Gets Segment assignment strategy for instance partitions
- SegmentAssignmentStrategy segmentAssignmentStrategy =
SegmentAssignmentStrategyFactory
- .getSegmentAssignmentStrategy(_helixManager, _tableConfig,
InstancePartitionsType.OFFLINE.toString(),
- instancePartitions);
+ SegmentAssignmentStrategy segmentAssignmentStrategy =
+
SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(_helixManager,
_tableConfig,
+ InstancePartitionsType.OFFLINE.toString(), instancePartitions);
_logger.info("Assigning segment: {} with instance partitions: {} for
table: {}", segmentName, instancePartitions,
_tableNameWithType);
- List<String> instancesAssigned = segmentAssignmentStrategy
- .assignSegment(segmentName, currentAssignment, instancePartitions,
InstancePartitionsType.OFFLINE);
+ List<String> instancesAssigned =
+ segmentAssignmentStrategy.assignSegment(segmentName,
currentAssignment, instancePartitions,
+ InstancePartitionsType.OFFLINE);
_logger.info("Assigned segment: {} to instances: {} for table: {}",
segmentName, instancesAssigned,
_tableNameWithType);
return instancesAssigned;
@@ -62,20 +63,19 @@ public class OfflineSegmentAssignment extends
BaseSegmentAssignment {
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
@Nullable List<Tier> sortedTiers,
@Nullable Map<String, InstancePartitions> tierInstancePartitionsMap,
RebalanceConfig config) {
InstancePartitions offlineInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
- Preconditions
- .checkState(offlineInstancePartitions != null, "Failed to find OFFLINE
instance partitions for table: %s",
- _tableNameWithType);
+ Preconditions.checkState(offlineInstancePartitions != null,
+ "Failed to find OFFLINE instance partitions for table: %s",
_tableNameWithType);
// Gets Segment assignment strategy for instance partitions
- SegmentAssignmentStrategy segmentAssignmentStrategy =
SegmentAssignmentStrategyFactory
- .getSegmentAssignmentStrategy(_helixManager, _tableConfig,
InstancePartitionsType.OFFLINE.toString(),
- offlineInstancePartitions);
+ SegmentAssignmentStrategy segmentAssignmentStrategy =
+
SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(_helixManager,
_tableConfig,
+ InstancePartitionsType.OFFLINE.toString(),
offlineInstancePartitions);
// TODO: Right now as per tier assignment, different instances will be
picked up for different tiers which
// would produce incorrect results for Dim tables. In future, we need some
preconditions to check if
// tierPartitionMap has single tier for Dim tables and remove below check
// See https://github.com/apache/pinot/issues/9047
if (segmentAssignmentStrategy instanceof
AllServersSegmentAssignmentStrategy) {
- return segmentAssignmentStrategy
- .reassignSegments(currentAssignment, offlineInstancePartitions,
InstancePartitionsType.OFFLINE);
+ return segmentAssignmentStrategy.reassignSegments(currentAssignment,
offlineInstancePartitions,
+ InstancePartitionsType.OFFLINE);
}
boolean bootstrap = config.isBootstrap();
// Rebalance tiers first
@@ -96,8 +96,8 @@ public class OfflineSegmentAssignment extends
BaseSegmentAssignment {
newTierAssignments.forEach(newAssignment::putAll);
}
- _logger.info("Rebalanced table: {}, number of segments to be moved to each
instance: {}", _tableNameWithType,
-
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment,
newAssignment));
+ _logger.info("Rebalanced table: {}, number of segments to be added/removed
for each instance: {}",
+ _tableNameWithType,
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment,
newAssignment));
return newAssignment;
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index e114bf6e1d..492afed1a7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -88,11 +88,11 @@ public class RealtimeSegmentAssignment extends
BaseSegmentAssignment {
List<String> instancesAssigned;
if (instancePartitionsType == InstancePartitionsType.COMPLETED) {
// Gets Segment assignment strategy for instance partitions
- SegmentAssignmentStrategy segmentAssignmentStrategy =
SegmentAssignmentStrategyFactory
- .getSegmentAssignmentStrategy(_helixManager, _tableConfig,
instancePartitionsType.toString(),
- instancePartitions);
- instancesAssigned = segmentAssignmentStrategy
- .assignSegment(segmentName, currentAssignment, instancePartitions,
InstancePartitionsType.COMPLETED);
+ SegmentAssignmentStrategy segmentAssignmentStrategy =
+
SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(_helixManager,
_tableConfig,
+ instancePartitionsType.toString(), instancePartitions);
+ instancesAssigned = segmentAssignmentStrategy.assignSegment(segmentName,
currentAssignment, instancePartitions,
+ InstancePartitionsType.COMPLETED);
} else {
instancesAssigned = assignConsumingSegment(segmentName,
instancePartitions);
}
@@ -176,9 +176,8 @@ public class RealtimeSegmentAssignment extends
BaseSegmentAssignment {
@Nullable Map<String, InstancePartitions> tierInstancePartitionsMap,
RebalanceConfig config) {
InstancePartitions completedInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
InstancePartitions consumingInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
- Preconditions
- .checkState(consumingInstancePartitions != null, "Failed to find
CONSUMING instance partitions for table: %s",
- _tableNameWithType);
+ Preconditions.checkState(consumingInstancePartitions != null,
+ "Failed to find CONSUMING instance partitions for table: %s",
_tableNameWithType);
boolean includeConsuming = config.isIncludeConsuming();
boolean bootstrap = config.isBootstrap();
// Rebalance tiers first
@@ -231,8 +230,8 @@ public class RealtimeSegmentAssignment extends
BaseSegmentAssignment {
Map<String, Map<String, String>> consumingSegmentAssignment =
completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
if (includeConsuming) {
- _logger
- .info("Reassigning CONSUMING segments with CONSUMING instance
partitions for table: {}", _tableNameWithType);
+ _logger.info("Reassigning CONSUMING segments with CONSUMING instance
partitions for table: {}",
+ _tableNameWithType);
for (String segmentName : consumingSegmentAssignment.keySet()) {
List<String> instancesAssigned = assignConsumingSegment(segmentName,
consumingInstancePartitions);
@@ -253,8 +252,8 @@ public class RealtimeSegmentAssignment extends
BaseSegmentAssignment {
newTierAssignments.forEach(newAssignment::putAll);
}
- _logger.info("Rebalanced table: {}, number of segments to be moved to each
instance: {}", _tableNameWithType,
-
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment,
newAssignment));
+ _logger.info("Rebalanced table: {}, number of segments to be added/removed
for each instance: {}",
+ _tableNameWithType,
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment,
newAssignment));
return newAssignment;
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 52f736a555..727bdbdfda 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -19,6 +19,8 @@
package org.apache.pinot.controller.helix.core.assignment.segment;
import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.ints.IntIntMutablePair;
+import it.unimi.dsi.fastutil.ints.IntIntPair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -81,10 +83,10 @@ public class SegmentAssignmentUtils {
*/
public static List<String>
getInstancesForNonReplicaGroupBasedAssignment(InstancePartitions
instancePartitions,
int replication) {
- Preconditions
- .checkState(instancePartitions.getNumReplicaGroups() == 1 &&
instancePartitions.getNumPartitions() == 1,
- "Instance partitions: %s should contain 1 replica and 1 partition
for non-replica-group based assignment",
- instancePartitions.getInstancePartitionsName());
+ Preconditions.checkState(
+ instancePartitions.getNumReplicaGroups() == 1 &&
instancePartitions.getNumPartitions() == 1,
+ "Instance partitions: %s should contain 1 replica and 1 partition for
non-replica-group based assignment",
+ instancePartitions.getInstancePartitionsName());
List<String> instances = instancePartitions.getInstances(0, 0);
int numInstances = instances.size();
Preconditions.checkState(numInstances >= replication,
@@ -134,8 +136,8 @@ public class SegmentAssignmentUtils {
int numReplicaGroups = instancePartitions.getNumReplicaGroups();
List<String> instancesAssigned = new ArrayList<>(numReplicaGroups);
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups;
replicaGroupId++) {
- instancesAssigned
- .add(instancePartitions.getInstances(partitionId,
replicaGroupId).get(instanceIdWithLeastSegmentsAssigned));
+ instancesAssigned.add(
+ instancePartitions.getInstances(partitionId,
replicaGroupId).get(instanceIdWithLeastSegmentsAssigned));
}
return instancesAssigned;
}
@@ -219,8 +221,8 @@ public class SegmentAssignmentUtils {
for (String instanceName : currentAssignment.get(segmentName).keySet()) {
Integer instanceId = instanceNameToIdMap.get(instanceName);
if (instanceId != null && numSegmentsAssignedPerInstance[instanceId] <
targetNumSegmentsPerInstance) {
- newAssignment
- .put(segmentName,
getReplicaGroupBasedInstanceStateMap(instancePartitions, partitionId,
instanceId));
+ newAssignment.put(segmentName,
+ getReplicaGroupBasedInstanceStateMap(instancePartitions,
partitionId, instanceId));
numSegmentsAssignedPerInstance[instanceId]++;
segmentAssigned = true;
break;
@@ -255,8 +257,8 @@ public class SegmentAssignmentUtils {
Map<String, String> instanceStateMap = new TreeMap<>();
int numReplicaGroups = instancePartitions.getNumReplicaGroups();
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups;
replicaGroupId++) {
- instanceStateMap
- .put(instancePartitions.getInstances(partitionId,
replicaGroupId).get(instanceId), SegmentStateModel.ONLINE);
+ instanceStateMap.put(instancePartitions.getInstances(partitionId,
replicaGroupId).get(instanceId),
+ SegmentStateModel.ONLINE);
}
return instanceStateMap;
}
@@ -275,6 +277,7 @@ public class SegmentAssignmentUtils {
/**
* Returns a map from instance name to number of segments to be moved to it.
*/
+ @Deprecated
public static Map<String, Integer>
getNumSegmentsToBeMovedPerInstance(Map<String, Map<String, String>>
oldAssignment,
Map<String, Map<String, String>> newAssignment) {
Map<String, Integer> numSegmentsToBeMovedPerInstance = new TreeMap<>();
@@ -292,6 +295,34 @@ public class SegmentAssignmentUtils {
return numSegmentsToBeMovedPerInstance;
}
+ /**
+ * Returns a map from instance name to number of segments to be
added/removed.
+ */
+ public static Map<String, IntIntPair>
getNumSegmentsToMovePerInstance(Map<String, Map<String, String>> oldAssignment,
+ Map<String, Map<String, String>> newAssignment) {
+ Map<String, IntIntPair> numSegmentsToMovePerInstance = new TreeMap<>();
+ for (Map.Entry<String, Map<String, String>> entry :
newAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ Set<String> newInstancesAssigned = entry.getValue().keySet();
+ Set<String> oldInstancesAssigned =
oldAssignment.get(segmentName).keySet();
+ // For each new assigned instance, check if the segment needs to be
added to it
+ for (String instanceName : newInstancesAssigned) {
+ if (!oldInstancesAssigned.contains(instanceName)) {
+ numSegmentsToMovePerInstance.compute(instanceName,
+ (k, v) -> v == null ? new IntIntMutablePair(1, 0) :
v.left(v.leftInt() + 1));
+ }
+ }
+ // For each old assigned instance, check if the segment needs to be
removed from it
+ for (String instanceName : oldInstancesAssigned) {
+ if (!newInstancesAssigned.contains(instanceName)) {
+ numSegmentsToMovePerInstance.compute(instanceName,
+ (k, v) -> v == null ? new IntIntMutablePair(0, 1) :
v.right(v.rightInt() + 1));
+ }
+ }
+ }
+ return numSegmentsToMovePerInstance;
+ }
+
public static List<String> getSegmentsToMove(Map<String, Map<String,
String>> oldAssignment,
Map<String, Map<String, String>> newAssignment) {
List<String> segmentsToMove = new ArrayList<>();
@@ -415,9 +446,8 @@ public class SegmentAssignmentUtils {
@Nullable String partitionColumn) {
SegmentZKMetadata segmentZKMetadata =
ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(),
offlineTableName, segmentName);
- Preconditions
- .checkState(segmentZKMetadata != null, "Failed to find segment ZK
metadata for segment: %s of table: %s",
- segmentName, offlineTableName);
+ Preconditions.checkState(segmentZKMetadata != null,
+ "Failed to find segment ZK metadata for segment: %s of table: %s",
segmentName, offlineTableName);
return getPartitionId(segmentZKMetadata, offlineTableName,
partitionColumn);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 293ada1da5..395edf0827 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -499,9 +499,9 @@ public class TableRebalancer {
Map<String, Map<String, String>> nextAssignment =
getNextAssignment(currentAssignment, targetAssignment,
minAvailableReplicas, enableStrictReplicaGroup,
lowDiskMode);
- LOGGER.info("For rebalanceId: {}, got the next assignment for table: {}
with number of segments to be moved to "
- + "each instance: {}", rebalanceJobId, tableNameWithType,
-
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment,
nextAssignment));
+ LOGGER.info("For rebalanceId: {}, got the next assignment for table: {}
with number of segments to be "
+ + "added/removed for each instance: {}", rebalanceJobId,
tableNameWithType,
+
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment,
nextAssignment));
// Reuse current IdealState to update the IdealState in cluster
idealStateRecord.setMapFields(nextAssignment);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
index e9301c1f7c..0f43b7869d 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.helix.core.assignment.segment;
+import it.unimi.dsi.fastutil.ints.IntIntPair;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -44,13 +45,10 @@ public class SegmentAssignmentUtilsTest {
int numInstances = 10;
List<String> instances =
SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, numInstances);
- // Uniformly spray segments to the instances:
- // [instance_0, instance_1, instance_2, instance_3, instance_4,
instance_5, instance_6, instance_7,
- // instance_8, instance_9]
- // segment_0(r0) segment_0(r1) segment_0(r2) segment_1(r0) segment_1(r1)
segment_1(r2) segment_2(r0) segment_2
- // (r1) segment_2(r2) segment_3(r0)
- // segment_3(r1) segment_3(r2) segment_4(r0) segment_4(r1) segment_4(r2)
segment_5(r0) segment_5(r1) segment_5
- // (r2) segment_6(r0) segment_6(r1)
+ // Uniformly spray segments to the instances (i0 represents instance_0, s0
represents segment_0)
+ // [ i0, i1, i2, i3, i4, i5, i6, i7, i8, i9]
+ // s0(r0) s0(r1) s0(r2) s1(r0) s1(r1) s1(r2) s2(r0) s2(r1) s2(r2) s3(r0)
+ // s3(r1) s3(r2) s4(r0) s4(r1) s4(r2) s5(r0) s5(r1) s5(r2) s6(r0) s6(r1)
// ...
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
int assignedInstanceId = 0;
@@ -60,8 +58,8 @@ public class SegmentAssignmentUtilsTest {
instancesAssigned.add(instances.get(assignedInstanceId));
assignedInstanceId = (assignedInstanceId + 1) % numInstances;
}
- currentAssignment
- .put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.ONLINE));
+ currentAssignment.put(segmentName,
+ SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.ONLINE));
}
// There should be 100 segments assigned
@@ -83,15 +81,12 @@ public class SegmentAssignmentUtilsTest {
currentAssignment);
// Replace instance_0 with instance_10
- // {
- // 0_0=[instance_10, instance_1, instance_2, instance_3, instance_4,
instance_5, instance_6, instance_7,
- // instance_8, instance_9]
- // }
List<String> newInstances = new ArrayList<>(instances);
String newInstanceName = INSTANCE_NAME_PREFIX + 10;
newInstances.set(0, newInstanceName);
- Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils
- .rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
newInstances, NUM_REPLICAS);
+ Map<String, Map<String, String>> newAssignment =
+
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
newInstances,
+ NUM_REPLICAS);
// There should be 100 segments assigned
assertEquals(currentAssignment.size(), numSegments);
// Each segment should have 3 replicas
@@ -103,11 +98,12 @@ public class SegmentAssignmentUtilsTest {
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment,
newInstances);
assertEquals(numSegmentsAssignedPerInstance,
expectedNumSegmentsAssignedPerInstance);
// All segments on instance_0 should be moved to instance_10
- Map<String, Integer> numSegmentsToBeMovedPerInstance =
-
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment,
newAssignment);
- assertEquals(numSegmentsToBeMovedPerInstance.size(), 1);
- assertEquals((int) numSegmentsToBeMovedPerInstance.get(newInstanceName),
numSegmentsPerInstance);
+ Map<String, IntIntPair> numSegmentsToMovePerInstance =
+
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment,
newAssignment);
+ assertEquals(numSegmentsToMovePerInstance.size(), 2);
+ assertEquals(numSegmentsToMovePerInstance.get(newInstanceName),
IntIntPair.of(30, 0));
String oldInstanceName = INSTANCE_NAME_PREFIX + 0;
+ assertEquals(numSegmentsToMovePerInstance.get(oldInstanceName),
IntIntPair.of(0, 30));
for (String segmentName : segments) {
if (currentAssignment.get(segmentName).containsKey(oldInstanceName)) {
assertTrue(newAssignment.get(segmentName).containsKey(newInstanceName));
@@ -120,8 +116,8 @@ public class SegmentAssignmentUtilsTest {
// }
int newNumInstances = numInstances - 5;
newInstances =
SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, newNumInstances);
- newAssignment = SegmentAssignmentUtils
- .rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
newInstances, NUM_REPLICAS);
+ newAssignment =
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
newInstances,
+ NUM_REPLICAS);
// There should be 100 segments assigned
assertEquals(newAssignment.size(), numSegments);
// Each segment should have 3 replicas
@@ -136,26 +132,26 @@ public class SegmentAssignmentUtilsTest {
assertEquals(numSegmentsAssignedPerInstance[2], 60);
assertEquals(numSegmentsAssignedPerInstance[3], 60);
assertEquals(numSegmentsAssignedPerInstance[4], 64);
- numSegmentsToBeMovedPerInstance =
-
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment,
newAssignment);
- assertEquals(numSegmentsToBeMovedPerInstance.size(), newNumInstances);
- assertEquals((int)
numSegmentsToBeMovedPerInstance.get(newInstances.get(0)), 26);
- assertEquals((int)
numSegmentsToBeMovedPerInstance.get(newInstances.get(1)), 30);
- assertEquals((int)
numSegmentsToBeMovedPerInstance.get(newInstances.get(2)), 30);
- assertEquals((int)
numSegmentsToBeMovedPerInstance.get(newInstances.get(3)), 30);
- assertEquals((int)
numSegmentsToBeMovedPerInstance.get(newInstances.get(4)), 34);
+ numSegmentsToMovePerInstance =
+
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment,
newAssignment);
+ assertEquals(numSegmentsToMovePerInstance.size(), numInstances);
+ assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 0),
IntIntPair.of(26, 0));
+ assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 1),
IntIntPair.of(30, 0));
+ assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 2),
IntIntPair.of(30, 0));
+ assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 3),
IntIntPair.of(30, 0));
+ assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 4),
IntIntPair.of(34, 0));
+ for (int i = 5; i < 10; i++) {
+ assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + i),
IntIntPair.of(0, 30));
+ }
- // Add 5 instances
+ // Add 5 instances (i0 represents instance_0)
// {
- // 0_0=[
- // instance_0, instance_1, instance_2, instance_3, instance_4,
instance_5, instance_6, instance_7,
- // instance_8, instance_9,
- // instance_10, instance_11, instance_12, instance_13, instance_14]
+ // 0_0=[i0, i1, i2, i3, i4, i5, i6, i7, i8, i9, i10, i11, i12, i13, i14]
// }
newNumInstances = numInstances + 5;
newInstances =
SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, newNumInstances);
- newAssignment = SegmentAssignmentUtils
- .rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
newInstances, NUM_REPLICAS);
+ newAssignment =
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
newInstances,
+ NUM_REPLICAS);
// There should be 100 segments assigned
assertEquals(newAssignment.size(), numSegments);
// Each segment should have 3 replicas
@@ -170,11 +166,14 @@ public class SegmentAssignmentUtilsTest {
Arrays.fill(expectedNumSegmentsAssignedPerInstance,
newNumSegmentsPerInstance);
assertEquals(numSegmentsAssignedPerInstance,
expectedNumSegmentsAssignedPerInstance);
// Each new added instance should have 20 segments to be moved to it
- numSegmentsToBeMovedPerInstance =
-
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment,
newAssignment);
- assertEquals(numSegmentsToBeMovedPerInstance.size(), 5);
- for (int instanceId = numInstances; instanceId < newNumInstances;
instanceId++) {
- assertEquals((int)
numSegmentsToBeMovedPerInstance.get(newInstances.get(instanceId)),
newNumSegmentsPerInstance);
+ numSegmentsToMovePerInstance =
+
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment,
newAssignment);
+ assertEquals(numSegmentsToMovePerInstance.size(), newNumInstances);
+ for (int i = 0; i < numInstances; i++) {
+ assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + i),
IntIntPair.of(0, 10));
+ }
+ for (int i = numInstances; i < newNumInstances; i++) {
+ assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + i),
IntIntPair.of(20, 0));
}
// Change all instances
@@ -183,8 +182,8 @@ public class SegmentAssignmentUtilsTest {
// }
String newInstanceNamePrefix = "i_";
newInstances =
SegmentAssignmentTestUtils.getNameList(newInstanceNamePrefix, numInstances);
- newAssignment = SegmentAssignmentUtils
- .rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
newInstances, NUM_REPLICAS);
+ newAssignment =
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
newInstances,
+ NUM_REPLICAS);
// There should be 100 segments assigned
assertEquals(newAssignment.size(), numSegments);
// Each segment should have 3 replicas
@@ -198,11 +197,12 @@ public class SegmentAssignmentUtilsTest {
Arrays.fill(expectedNumSegmentsAssignedPerInstance,
numSegmentsPerInstance);
assertEquals(numSegmentsAssignedPerInstance,
expectedNumSegmentsAssignedPerInstance);
// Each instance should have 30 segments to be moved to it
- numSegmentsToBeMovedPerInstance =
-
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment,
newAssignment);
- assertEquals(numSegmentsToBeMovedPerInstance.size(), numInstances);
- for (String instanceName : newInstances) {
- assertEquals((int) numSegmentsToBeMovedPerInstance.get(instanceName),
numSegmentsPerInstance);
+ numSegmentsToMovePerInstance =
+
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment,
newAssignment);
+ assertEquals(numSegmentsToMovePerInstance.size(), 2 * numInstances);
+ for (int i = 0; i < numInstances; i++) {
+ assertEquals(numSegmentsToMovePerInstance.get(newInstanceNamePrefix +
i), IntIntPair.of(30, 0));
+ assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + i),
IntIntPair.of(0, 30));
}
}
@@ -264,9 +264,8 @@ public class SegmentAssignmentUtilsTest {
Arrays.fill(expectedNumSegmentsAssignedPerInstance,
numSegmentsPerInstance);
assertEquals(numSegmentsAssignedPerInstance,
expectedNumSegmentsAssignedPerInstance);
// Current assignment should already be balanced
- assertEquals(SegmentAssignmentUtils
- .rebalanceReplicaGroupBasedTable(currentAssignment,
instancePartitions, partitionIdToSegmentsMap),
- currentAssignment);
+
assertEquals(SegmentAssignmentUtils.rebalanceReplicaGroupBasedTable(currentAssignment,
instancePartitions,
+ partitionIdToSegmentsMap), currentAssignment);
// Replace instance_0 with instance_9, instance_4 with instance_10
// {
@@ -289,8 +288,9 @@ public class SegmentAssignmentUtilsTest {
newInstancePartitions.setInstances(0, 0, newReplicaGroup0Instances);
newInstancePartitions.setInstances(0, 1, newReplicaGroup1Instances);
newInstancePartitions.setInstances(0, 2, newReplicaGroup2Instances);
- Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils
- .rebalanceReplicaGroupBasedTable(currentAssignment,
newInstancePartitions, partitionIdToSegmentsMap);
+ Map<String, Map<String, String>> newAssignment =
+
SegmentAssignmentUtils.rebalanceReplicaGroupBasedTable(currentAssignment,
newInstancePartitions,
+ partitionIdToSegmentsMap);
// There should be 90 segments assigned
assertEquals(newAssignment.size(), numSegments);
// Each segment should have 3 replicas
@@ -303,13 +303,15 @@ public class SegmentAssignmentUtilsTest {
assertEquals(numSegmentsAssignedPerInstance,
expectedNumSegmentsAssignedPerInstance);
// All segments on instance_0 should be moved to instance_9, all segments
on instance_4 should be moved to
// instance_10
- Map<String, Integer> numSegmentsToBeMovedPerInstance =
-
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment,
newAssignment);
- assertEquals(numSegmentsToBeMovedPerInstance.size(), 2);
- assertEquals((int)
numSegmentsToBeMovedPerInstance.get(newReplicaGroup0Instance),
numSegmentsPerInstance);
- assertEquals((int)
numSegmentsToBeMovedPerInstance.get(newReplicaGroup1Instance),
numSegmentsPerInstance);
+ Map<String, IntIntPair> numSegmentsToMovePerInstance =
+
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment,
newAssignment);
+ assertEquals(numSegmentsToMovePerInstance.size(), 4);
+ assertEquals(numSegmentsToMovePerInstance.get(newReplicaGroup0Instance),
IntIntPair.of(30, 0));
+ assertEquals(numSegmentsToMovePerInstance.get(newReplicaGroup1Instance),
IntIntPair.of(30, 0));
String oldReplicaGroup0Instance = INSTANCE_NAME_PREFIX + 0;
String oldReplicaGroup1Instance = INSTANCE_NAME_PREFIX + 4;
+ assertEquals(numSegmentsToMovePerInstance.get(oldReplicaGroup0Instance),
IntIntPair.of(0, 30));
+ assertEquals(numSegmentsToMovePerInstance.get(oldReplicaGroup0Instance),
IntIntPair.of(0, 30));
for (String segmentName : segments) {
Map<String, String> oldInstanceStateMap =
currentAssignment.get(segmentName);
if (oldInstanceStateMap.containsKey(oldReplicaGroup0Instance)) {
@@ -335,8 +337,8 @@ public class SegmentAssignmentUtilsTest {
newInstancePartitions.setInstances(0, replicaGroupId,
newInstancesForReplicaGroup);
newInstances.addAll(newInstancesForReplicaGroup);
}
- newAssignment = SegmentAssignmentUtils
- .rebalanceReplicaGroupBasedTable(currentAssignment,
newInstancePartitions, partitionIdToSegmentsMap);
+ newAssignment =
SegmentAssignmentUtils.rebalanceReplicaGroupBasedTable(currentAssignment,
newInstancePartitions,
+ partitionIdToSegmentsMap);
// There should be 90 segments assigned
assertEquals(newAssignment.size(), numSegments);
// Each segment should have 3 replicas
@@ -351,12 +353,15 @@ public class SegmentAssignmentUtilsTest {
Arrays.fill(expectedNumSegmentsAssignedPerInstance,
newNumSegmentsPerInstance);
assertEquals(numSegmentsAssignedPerInstance,
expectedNumSegmentsAssignedPerInstance);
// Each instance should have 15 segments to be moved to it
- numSegmentsToBeMovedPerInstance =
-
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment,
newAssignment);
- assertEquals(numSegmentsToBeMovedPerInstance.size(), newNumInstances);
- for (String instanceName : newInstances) {
- assertEquals((int) numSegmentsToBeMovedPerInstance.get(instanceName),
- newNumSegmentsPerInstance - numSegmentsPerInstance);
+ numSegmentsToMovePerInstance =
+
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment,
newAssignment);
+ assertEquals(numSegmentsToMovePerInstance.size(), numInstances);
+ for (String instanceName : instances) {
+ if (newInstances.contains(instanceName)) {
+ assertEquals(numSegmentsToMovePerInstance.get(instanceName),
IntIntPair.of(15, 0));
+ } else {
+ assertEquals(numSegmentsToMovePerInstance.get(instanceName),
IntIntPair.of(0, 30));
+ }
}
// Add 6 instances (2 to each replica-group)
@@ -376,8 +381,8 @@ public class SegmentAssignmentUtilsTest {
}
newInstancePartitions.setInstances(0, replicaGroupId,
newInstancesForReplicaGroup);
}
- newAssignment = SegmentAssignmentUtils
- .rebalanceReplicaGroupBasedTable(currentAssignment,
newInstancePartitions, partitionIdToSegmentsMap);
+ newAssignment =
SegmentAssignmentUtils.rebalanceReplicaGroupBasedTable(currentAssignment,
newInstancePartitions,
+ partitionIdToSegmentsMap);
// There should be 90 segments assigned
assertEquals(newAssignment.size(), numSegments);
// Each segment should have 3 replicas
@@ -392,11 +397,14 @@ public class SegmentAssignmentUtilsTest {
Arrays.fill(expectedNumSegmentsAssignedPerInstance,
newNumSegmentsPerInstance);
assertEquals(numSegmentsAssignedPerInstance,
expectedNumSegmentsAssignedPerInstance);
// Each new added instance should have 18 segments to be moved to it
- numSegmentsToBeMovedPerInstance =
-
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment,
newAssignment);
- assertEquals(numSegmentsToBeMovedPerInstance.size(), 6);
- for (int instanceId = numInstances; instanceId < newNumInstances;
instanceId++) {
- assertEquals((int)
numSegmentsToBeMovedPerInstance.get(newInstances.get(instanceId)),
newNumSegmentsPerInstance);
+ numSegmentsToMovePerInstance =
+
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment,
newAssignment);
+ assertEquals(numSegmentsToMovePerInstance.size(), newNumInstances);
+ for (int i = 0; i < numInstances; i++) {
+ assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + i),
IntIntPair.of(0, 12));
+ }
+ for (int i = numInstances; i < newNumInstances; i++) {
+ assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + i),
IntIntPair.of(18, 0));
}
// Change all instances
@@ -405,7 +413,8 @@ public class SegmentAssignmentUtilsTest {
// 0_1=[i_3, i_4, i_5],
// 0_2=[i_6, i_7, i_8]
// }
- newInstances = SegmentAssignmentTestUtils.getNameList("i_", numInstances);
+ String newInstanceNamePrefix = "i_";
+ newInstances =
SegmentAssignmentTestUtils.getNameList(newInstanceNamePrefix, numInstances);
instanceIdToAdd = 0;
for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS;
replicaGroupId++) {
List<String> instancesForReplicaGroup = new
ArrayList<>(numInstancesPerReplicaGroup);
@@ -414,8 +423,8 @@ public class SegmentAssignmentUtilsTest {
}
newInstancePartitions.setInstances(0, replicaGroupId,
instancesForReplicaGroup);
}
- newAssignment = SegmentAssignmentUtils
- .rebalanceReplicaGroupBasedTable(currentAssignment,
newInstancePartitions, partitionIdToSegmentsMap);
+ newAssignment =
SegmentAssignmentUtils.rebalanceReplicaGroupBasedTable(currentAssignment,
newInstancePartitions,
+ partitionIdToSegmentsMap);
// There should be 90 segments assigned
assertEquals(newAssignment.size(), numSegments);
// Each segment should have 3 replicas
@@ -429,11 +438,12 @@ public class SegmentAssignmentUtilsTest {
Arrays.fill(expectedNumSegmentsAssignedPerInstance,
numSegmentsPerInstance);
assertEquals(numSegmentsAssignedPerInstance,
expectedNumSegmentsAssignedPerInstance);
// Each instance should have 30 segments to be moved to it
- numSegmentsToBeMovedPerInstance =
-
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment,
newAssignment);
- assertEquals(numSegmentsToBeMovedPerInstance.size(), numInstances);
- for (String instanceName : newInstances) {
- assertEquals((int) numSegmentsToBeMovedPerInstance.get(instanceName),
numSegmentsPerInstance);
+ numSegmentsToMovePerInstance =
+
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment,
newAssignment);
+ assertEquals(numSegmentsToMovePerInstance.size(), 2 * numInstances);
+ for (int i = 0; i < numInstances; i++) {
+ assertEquals(numSegmentsToMovePerInstance.get(newInstanceNamePrefix +
i), IntIntPair.of(30, 0));
+ assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + i),
IntIntPair.of(0, 30));
}
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 1df7109ef2..c5ac9e82bd 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.controller.helix.core.rebalance;
import com.google.common.collect.Lists;
+import it.unimi.dsi.fastutil.ints.IntIntPair;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -47,10 +48,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
@Test(groups = "stateless")
@@ -155,11 +153,20 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
// Segments should be moved to the new added servers
Map<String, Map<String, String>> newSegmentAssignment =
rebalanceResult.getSegmentAssignment();
- Map<String, Integer> instanceToNumSegmentsToBeMovedMap =
-
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(oldSegmentAssignment,
newSegmentAssignment);
- assertEquals(instanceToNumSegmentsToBeMovedMap.size(), numServersToAdd);
+ Map<String, IntIntPair> instanceToNumSegmentsToMoveMap =
+
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(oldSegmentAssignment,
newSegmentAssignment);
+ assertEquals(instanceToNumSegmentsToMoveMap.size(), numServers +
numServersToAdd);
for (int i = 0; i < numServersToAdd; i++) {
-
assertTrue(instanceToNumSegmentsToBeMovedMap.containsKey(SERVER_INSTANCE_ID_PREFIX
+ (numServers + i)));
+ IntIntPair numSegmentsToMove =
instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + (numServers +
i));
+ assertNotNull(numSegmentsToMove);
+ assertTrue(numSegmentsToMove.leftInt() > 0);
+ assertEquals(numSegmentsToMove.rightInt(), 0);
+ }
+ for (int i = 0; i < numServers; i++) {
+ IntIntPair numSegmentsToMove =
instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + i);
+ assertNotNull(numSegmentsToMove);
+ assertEquals(numSegmentsToMove.leftInt(), 0);
+ assertTrue(numSegmentsToMove.rightInt() > 0);
}
// Dry-run mode should not change the IdealState
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]