This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 604cd16558 Add minimizeDataMovement to Rebalance API (#15110)
604cd16558 is described below
commit 604cd165586716ae04db22856371e50147e30459
Author: Jhow <[email protected]>
AuthorDate: Fri Mar 14 10:12:33 2025 -0700
Add minimizeDataMovement to Rebalance API (#15110)
* Add minimizeDataMovement param to RebalanceConfig
* Add minimizeDataMovement in RebalanceTableCommand
---
.../api/resources/PinotTableRestletResource.java | 4 +
.../instance/InstanceAssignmentDriver.java | 38 ++-
.../helix/core/rebalance/RebalanceConfig.java | 36 ++-
.../helix/core/rebalance/TableRebalancer.java | 69 ++++--
.../instance/InstanceAssignmentTest.java | 244 ++++++++++++++++--
.../TableRebalancerClusterStatelessTest.java | 275 ++++++++++++++++++++-
.../apache/pinot/tools/PinotTableRebalancer.java | 4 +-
.../tools/admin/command/RebalanceTableCommand.java | 9 +-
8 files changed, 620 insertions(+), 59 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 5de903eb9d..fed660e988 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -612,6 +612,9 @@ public class PinotTableRestletResource {
@QueryParam("reassignInstances") boolean reassignInstances,
@ApiParam(value = "Whether to reassign CONSUMING segments for real-time
table") @DefaultValue("true")
@QueryParam("includeConsuming") boolean includeConsuming,
+ @ApiParam(value = "Whether to enable minimize data movement on
rebalance, DEFAULT will use "
+ + "the minimizeDataMovement in table config") @DefaultValue("ENABLE")
+ @QueryParam("minimizeDataMovement")
RebalanceConfig.MinimizeDataMovementOptions minimizeDataMovement,
@ApiParam(value = "Whether to rebalance table in bootstrap mode
(regardless of minimum segment movement, "
+ "reassign all segments in a round-robin fashion as if adding new
segments to an empty table)")
@DefaultValue("false") @QueryParam("bootstrap") boolean bootstrap,
@@ -651,6 +654,7 @@ public class PinotTableRestletResource {
rebalanceConfig.setPreChecks(preChecks);
rebalanceConfig.setReassignInstances(reassignInstances);
rebalanceConfig.setIncludeConsuming(includeConsuming);
+ rebalanceConfig.setMinimizeDataMovement(minimizeDataMovement);
rebalanceConfig.setBootstrap(bootstrap);
rebalanceConfig.setDowntime(downtime);
rebalanceConfig.setMinAvailableReplicas(minAvailableReplicas);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index 09866c1ed7..5ee9258014 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -55,40 +55,66 @@ public class InstanceAssignmentDriver {
public InstancePartitions assignInstances(InstancePartitionsType
instancePartitionsType,
List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions
existingInstancePartitions) {
+ return assignInstances(instancePartitionsType, instanceConfigs,
existingInstancePartitions, (Boolean) null);
+ }
+
+ public InstancePartitions assignInstances(InstancePartitionsType
instancePartitionsType,
+ List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions
existingInstancePartitions,
+ @Nullable Boolean minimizeDataMovement) {
String tableNameWithType = _tableConfig.getTableName();
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig,
instancePartitionsType);
return getInstancePartitions(
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
- assignmentConfig, instanceConfigs, existingInstancePartitions, null);
+ assignmentConfig, instanceConfigs, existingInstancePartitions, null,
minimizeDataMovement);
}
public InstancePartitions assignInstances(InstancePartitionsType
instancePartitionsType,
List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions
existingInstancePartitions,
@Nullable InstancePartitions preConfiguredInstancePartitions) {
+ return assignInstances(instancePartitionsType, instanceConfigs,
existingInstancePartitions,
+ preConfiguredInstancePartitions, null);
+ }
+
+ public InstancePartitions assignInstances(InstancePartitionsType
instancePartitionsType,
+ List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions
existingInstancePartitions,
+ @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable
Boolean minimizeDataMovement) {
String tableNameWithType = _tableConfig.getTableName();
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig,
instancePartitionsType);
return getInstancePartitions(
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
- assignmentConfig, instanceConfigs, existingInstancePartitions,
preConfiguredInstancePartitions);
+ assignmentConfig, instanceConfigs, existingInstancePartitions,
preConfiguredInstancePartitions,
+ minimizeDataMovement);
}
public InstancePartitions assignInstances(String tierName,
List<InstanceConfig> instanceConfigs,
@Nullable InstancePartitions existingInstancePartitions,
InstanceAssignmentConfig instanceAssignmentConfig) {
+ return assignInstances(tierName, instanceConfigs,
existingInstancePartitions, instanceAssignmentConfig, null);
+ }
+
+ public InstancePartitions assignInstances(String tierName,
List<InstanceConfig> instanceConfigs,
+ @Nullable InstancePartitions existingInstancePartitions,
InstanceAssignmentConfig instanceAssignmentConfig,
+ @Nullable Boolean minimizeDataMovement) {
return getInstancePartitions(
InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(),
tierName),
- instanceAssignmentConfig, instanceConfigs, existingInstancePartitions,
null);
+ instanceAssignmentConfig, instanceConfigs, existingInstancePartitions,
null, minimizeDataMovement);
}
private InstancePartitions getInstancePartitions(String
instancePartitionsName,
InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig>
instanceConfigs,
@Nullable InstancePartitions existingInstancePartitions,
- @Nullable InstancePartitions preConfiguredInstancePartitions) {
+ @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable
Boolean minimizeDataMovementOverride) {
String tableNameWithType = _tableConfig.getTableName();
- LOGGER.info("Starting {} instance assignment for table {}",
instancePartitionsName, tableNameWithType);
- boolean minimizeDataMovement =
instanceAssignmentConfig.isMinimizeDataMovement();
+ // minimizeDataMovement might be set back to false within
InstanceTagPoolSelector and InstancePartitionSelector
+ // if existingInstancePartitions is null.
+ boolean minimizeDataMovement =
+ minimizeDataMovementOverride == null ?
instanceAssignmentConfig.isMinimizeDataMovement()
+ : minimizeDataMovementOverride;
+ LOGGER.info("Starting {} instance assignment for table {}, original
minimizeDataMovement: {}, "
+ + "overriding with minimizeDataMovementOverride: {}",
instancePartitionsName, tableNameWithType,
+ instanceAssignmentConfig.isMinimizeDataMovement(),
minimizeDataMovementOverride);
InstanceTagPoolSelector tagPoolSelector =
new
InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(),
tableNameWithType,
minimizeDataMovement, existingInstancePartitions);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
index f69f6be3d9..381a22fa3c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
@@ -83,6 +83,18 @@ public class RebalanceConfig {
@ApiModelProperty(example = "false")
private boolean _bestEfforts = false;
+ // Whether to run Minimal Data Movement Algorithm, overriding the
minimizeDataMovement flag in table config. If set
+ // to default, the minimizeDataMovement flag in table config will be used to
determine whether to run the Minimal
+ // Data Movement Algorithm.
+ @ApiModel
+ public enum MinimizeDataMovementOptions {
+ ENABLE, DISABLE, DEFAULT
+ }
+
+ @JsonProperty("minimizeDataMovement")
+ @ApiModelProperty(dataType = "string", allowableValues = "ENABLE, DISABLE,
DEFAULT", example = "ENABLE")
+ private MinimizeDataMovementOptions _minimizeDataMovement =
MinimizeDataMovementOptions.ENABLE;
+
// The check on external view can be very costly when the table has very
large ideal and external states, i.e. when
// having a huge number of segments. These two configs help reduce the cpu
load on controllers, e.g. by doing the
// check less frequently and bail out sooner to rebalance at best effort if
configured so.
@@ -244,16 +256,24 @@ public class RebalanceConfig {
_retryInitialDelayInMs = retryInitialDelayInMs;
}
+ public MinimizeDataMovementOptions getMinimizeDataMovement() {
+ return _minimizeDataMovement;
+ }
+
+ public void setMinimizeDataMovement(MinimizeDataMovementOptions
minimizeDataMovement) {
+ _minimizeDataMovement = minimizeDataMovement;
+ }
+
@Override
public String toString() {
- return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", preChecks=" +
_preChecks
- + ", _reassignInstances=" + _reassignInstances + ",
_includeConsuming=" + _includeConsuming + ", _bootstrap="
- + _bootstrap + ", _downtime=" + _downtime + ", _minAvailableReplicas="
+ _minAvailableReplicas
- + ", _bestEfforts=" + _bestEfforts + ",
_externalViewCheckIntervalInMs=" + _externalViewCheckIntervalInMs
- + ", _externalViewStabilizationTimeoutInMs=" +
_externalViewStabilizationTimeoutInMs + ", _updateTargetTier="
- + _updateTargetTier + ", _heartbeatIntervalInMs=" +
_heartbeatIntervalInMs + ", _heartbeatTimeoutInMs="
- + _heartbeatTimeoutInMs + ", _maxAttempts=" + _maxAttempts + ",
_retryInitialDelayInMs="
- + _retryInitialDelayInMs + '}';
+ return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", preChecks=" +
_preChecks + ", _reassignInstances="
+ + _reassignInstances + ", _includeConsuming=" + _includeConsuming + ",
_minimizeDataMovement="
+ + _minimizeDataMovement + ", _bootstrap=" + _bootstrap + ",
_downtime=" + _downtime + ", _minAvailableReplicas="
+ + _minAvailableReplicas + ", _bestEfforts=" + _bestEfforts + ",
_externalViewCheckIntervalInMs="
+ + _externalViewCheckIntervalInMs + ",
_externalViewStabilizationTimeoutInMs="
+ + _externalViewStabilizationTimeoutInMs + ", _updateTargetTier=" +
_updateTargetTier
+ + ", _heartbeatIntervalInMs=" + _heartbeatIntervalInMs + ",
_heartbeatTimeoutInMs=" + _heartbeatTimeoutInMs
+ + ", _maxAttempts=" + _maxAttempts + ", _retryInitialDelayInMs=" +
_retryInitialDelayInMs + '}';
}
public static RebalanceConfig copy(RebalanceConfig cfg) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 34cab7c29d..fd79e25b0a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -191,17 +191,32 @@ public class TableRebalancer {
boolean bestEfforts = rebalanceConfig.isBestEfforts();
long externalViewCheckIntervalInMs =
rebalanceConfig.getExternalViewCheckIntervalInMs();
long externalViewStabilizationTimeoutInMs =
rebalanceConfig.getExternalViewStabilizationTimeoutInMs();
+ Boolean minimizeDataMovement;
+ switch (rebalanceConfig.getMinimizeDataMovement()) {
+ case DEFAULT:
+ minimizeDataMovement = null;
+ break;
+ case ENABLE:
+ minimizeDataMovement = true;
+ break;
+ case DISABLE:
+ minimizeDataMovement = false;
+ break;
+ default:
+ throw new IllegalStateException(
+ "Invalid minimizeDataMovement option: " +
rebalanceConfig.getMinimizeDataMovement());
+ }
boolean enableStrictReplicaGroup = tableConfig.getRoutingConfig() != null
&&
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
tableConfig.getRoutingConfig().getInstanceSelectorType());
LOGGER.info(
- "Start rebalancing table: {} with dryRun: {}, preChecks: {},
reassignInstances: {}, includeConsuming: {},"
- + "bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime:
{}, enableStrictReplicaGroup: {},"
- + "lowDiskMode: {}, bestEfforts: {},
externalViewCheckIntervalInMs: {}, "
- + "externalViewStabilizationTimeoutInMs: {}",
+ "Start rebalancing table: {} with dryRun: {}, preChecks: {},
reassignInstances: {}, "
+ + "includeConsuming: {}, bootstrap: {}, downtime: {},
minReplicasToKeepUpForNoDowntime: {}, "
+ + "enableStrictReplicaGroup: {}, lowDiskMode: {}, bestEfforts: {},
externalViewCheckIntervalInMs: {}, "
+ + "externalViewStabilizationTimeoutInMs: {}, minimizeDataMovement:
{}",
tableNameWithType, dryRun, preChecks, reassignInstances,
includeConsuming, bootstrap, downtime,
minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup,
lowDiskMode, bestEfforts,
- externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs);
+ externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs,
minimizeDataMovement);
// Perform pre-checks if enabled
Map<String, String> preChecksResult = null;
@@ -253,7 +268,7 @@ public class TableRebalancer {
boolean instancePartitionsUnchanged;
try {
Pair<Map<InstancePartitionsType, InstancePartitions>, Boolean>
instancePartitionsMapAndUnchanged =
- getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap,
dryRun);
+ getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap,
dryRun, minimizeDataMovement);
instancePartitionsMap = instancePartitionsMapAndUnchanged.getLeft();
instancePartitionsUnchanged =
instancePartitionsMapAndUnchanged.getRight();
} catch (Exception e) {
@@ -272,7 +287,8 @@ public class TableRebalancer {
try {
sortedTiers = getSortedTiers(tableConfig, providedTierToSegmentsMap);
Pair<Map<String, InstancePartitions>, Boolean>
tierToInstancePartitionsMapAndUnchanged =
- getTierToInstancePartitionsMap(tableConfig, sortedTiers,
reassignInstances, bootstrap, dryRun);
+ getTierToInstancePartitionsMap(tableConfig, sortedTiers,
reassignInstances, bootstrap, dryRun,
+ minimizeDataMovement);
tierToInstancePartitionsMap =
tierToInstancePartitionsMapAndUnchanged.getLeft();
tierInstancePartitionsUnchanged =
tierToInstancePartitionsMapAndUnchanged.getRight();
} catch (Exception e) {
@@ -488,9 +504,11 @@ public class TableRebalancer {
try {
// Re-calculate the instance partitions in case the instance
configs changed during the rebalance
instancePartitionsMap =
- getInstancePartitionsMap(tableConfig, reassignInstances,
bootstrap, false).getLeft();
+ getInstancePartitionsMap(tableConfig, reassignInstances,
bootstrap, false,
+ minimizeDataMovement).getLeft();
tierToInstancePartitionsMap =
- getTierToInstancePartitionsMap(tableConfig, sortedTiers,
reassignInstances, bootstrap, false).getLeft();
+ getTierToInstancePartitionsMap(tableConfig, sortedTiers,
reassignInstances, bootstrap, false,
+ minimizeDataMovement).getLeft();
targetAssignment =
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap,
sortedTiers,
tierToInstancePartitionsMap, rebalanceConfig);
} catch (Exception e) {
@@ -727,22 +745,31 @@ public class TableRebalancer {
*/
public Pair<Map<InstancePartitionsType, InstancePartitions>, Boolean>
getInstancePartitionsMap(
TableConfig tableConfig, boolean reassignInstances, boolean bootstrap,
boolean dryRun) {
+ return getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap,
dryRun, false);
+ }
+
+ public Pair<Map<InstancePartitionsType, InstancePartitions>, Boolean>
getInstancePartitionsMap(
+ TableConfig tableConfig, boolean reassignInstances, boolean bootstrap,
boolean dryRun,
+ @Nullable Boolean minimizeDataMovement) {
boolean instancePartitionsUnchanged;
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
new TreeMap<>();
if (tableConfig.getTableType() == TableType.OFFLINE) {
Pair<InstancePartitions, Boolean> partitionAndUnchangedForOffline =
- getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE,
reassignInstances, bootstrap, dryRun);
+ getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE,
reassignInstances, bootstrap, dryRun,
+ minimizeDataMovement);
instancePartitionsMap.put(InstancePartitionsType.OFFLINE,
partitionAndUnchangedForOffline.getLeft());
instancePartitionsUnchanged = partitionAndUnchangedForOffline.getRight();
} else {
Pair<InstancePartitions, Boolean> partitionAndUnchangedForConsuming =
- getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING,
reassignInstances, bootstrap, dryRun);
+ getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING,
reassignInstances, bootstrap, dryRun,
+ minimizeDataMovement);
instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
partitionAndUnchangedForConsuming.getLeft());
instancePartitionsUnchanged =
partitionAndUnchangedForConsuming.getRight();
String tableNameWithType = tableConfig.getTableName();
if
(InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) {
Pair<InstancePartitions, Boolean> partitionAndUnchangedForCompleted =
- getInstancePartitions(tableConfig,
InstancePartitionsType.COMPLETED, reassignInstances, bootstrap, dryRun);
+ getInstancePartitions(tableConfig,
InstancePartitionsType.COMPLETED, reassignInstances, bootstrap, dryRun,
+ minimizeDataMovement);
LOGGER.info(
"COMPLETED segments should be relocated, fetching/computing
COMPLETED instance partitions for table: {}",
tableNameWithType);
@@ -768,7 +795,8 @@ public class TableRebalancer {
* Fetches/computes the instance partitions and also returns a boolean for
whether they are unchanged
*/
private Pair<InstancePartitions, Boolean> getInstancePartitions(TableConfig
tableConfig,
- InstancePartitionsType instancePartitionsType, boolean
reassignInstances, boolean bootstrap, boolean dryRun) {
+ InstancePartitionsType instancePartitionsType, boolean
reassignInstances, boolean bootstrap, boolean dryRun,
+ @Nullable Boolean minimizeDataMovement) {
String tableNameWithType = tableConfig.getTableName();
String instancePartitionsName =
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType,
instancePartitionsType.toString());
@@ -790,7 +818,7 @@ public class TableRebalancer {
// instance partition map can be fully recalculated.
instancePartitions =
instanceAssignmentDriver.assignInstances(instancePartitionsType,
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(),
true),
- bootstrap ? null : existingInstancePartitions);
+ bootstrap ? null : existingInstancePartitions,
minimizeDataMovement);
instancePartitionsUnchanged =
instancePartitions.equals(existingInstancePartitions);
if (!dryRun && !instancePartitionsUnchanged) {
LOGGER.info("Persisting instance partitions: {} to ZK",
instancePartitions);
@@ -805,7 +833,8 @@ public class TableRebalancer {
referenceInstancePartitionsName, instancePartitionsName);
instancePartitions =
instanceAssignmentDriver.assignInstances(instancePartitionsType,
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(),
true),
- bootstrap ? null : existingInstancePartitions,
preConfiguredInstancePartitions);
+ bootstrap ? null : existingInstancePartitions,
preConfiguredInstancePartitions,
+ minimizeDataMovement);
instancePartitionsUnchanged =
instancePartitions.equals(existingInstancePartitions);
if (!dryRun && !instancePartitionsUnchanged) {
LOGGER.info("Persisting instance partitions: {} (based on {})",
instancePartitions,
@@ -868,7 +897,8 @@ public class TableRebalancer {
* instance partitions are unchanged.
*/
private Pair<Map<String, InstancePartitions>, Boolean>
getTierToInstancePartitionsMap(TableConfig tableConfig,
- @Nullable List<Tier> sortedTiers, boolean reassignInstances, boolean
bootstrap, boolean dryRun) {
+ @Nullable List<Tier> sortedTiers, boolean reassignInstances, boolean
bootstrap, boolean dryRun,
+ @Nullable Boolean minimizeDataMovement) {
if (sortedTiers == null) {
return Pair.of(null, true);
}
@@ -878,7 +908,8 @@ public class TableRebalancer {
LOGGER.info("Fetching/computing instance partitions for tier: {} of
table: {}", tier.getName(),
tableConfig.getTableName());
Pair<InstancePartitions, Boolean> partitionsAndUnchanged =
- getInstancePartitionsForTier(tableConfig, tier, reassignInstances,
bootstrap, dryRun);
+ getInstancePartitionsForTier(tableConfig, tier, reassignInstances,
bootstrap, dryRun,
+ minimizeDataMovement);
tierToInstancePartitionsMap.put(tier.getName(),
partitionsAndUnchanged.getLeft());
instancePartitionsUnchanged = instancePartitionsUnchanged &&
partitionsAndUnchanged.getRight();
}
@@ -891,7 +922,7 @@ public class TableRebalancer {
* a boolean for whether the instance partition is unchanged.
*/
private Pair<InstancePartitions, Boolean>
getInstancePartitionsForTier(TableConfig tableConfig, Tier tier,
- boolean reassignInstances, boolean bootstrap, boolean dryRun) {
+ boolean reassignInstances, boolean bootstrap, boolean dryRun, @Nullable
Boolean minimizeDataMovement) {
String tableNameWithType = tableConfig.getTableName();
String tierName = tier.getName();
String instancePartitionsName =
@@ -924,7 +955,7 @@ public class TableRebalancer {
// partition map can be fully recalculated.
InstancePartitions instancePartitions =
instanceAssignmentDriver.assignInstances(tierName,
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(),
true),
- bootstrap ? null : existingInstancePartitions,
instanceAssignmentConfig);
+ bootstrap ? null : existingInstancePartitions,
instanceAssignmentConfig, minimizeDataMovement);
boolean instancePartitionsUnchanged =
instancePartitions.equals(existingInstancePartitions);
if (!dryRun && !instancePartitionsUnchanged) {
LOGGER.info("Persisting instance partitions: {} to ZK",
instancePartitions);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 39aef7f35a..055ce297a1 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -53,6 +53,8 @@ import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -500,11 +502,224 @@ public class InstanceAssignmentTest {
assertEquals(instancePartitions.getInstances(9, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 7));
}
- public void testMirrorServerSetBasedRandom() throws FileNotFoundException {
+ public void testMirrorServerSetBasedRandom()
+ throws FileNotFoundException {
testMirrorServerSetBasedRandomInner(10000000);
}
- public void testMirrorServerSetBasedRandomInner(int loopCount) throws
FileNotFoundException {
+ @Test
+ public void testForceMinimizeDataMovement() {
+ // This test case is using the same instance rebalance plot as
testMinimizeDataMovement, and test whether
+ // forceMinimizeDataMovement flag in InstanceAssignmentDriver works as the
minimizeDataMovement flag in
+ // TableConfig does.
+ int numReplicas = 3;
+ int numPartitions = 2;
+ int numInstancesPerPartition = 2;
+ String partitionColumn = "partition";
+
+ // Configs and driver that minimize data movement
+ InstanceAssignmentConfig instanceAssignmentConfig = new
InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, false,
+ partitionColumn), null, true);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(numReplicas)
+ .setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfig))
+ .build();
+
assertTrue(InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig,
InstancePartitionsType.OFFLINE)
+ .isMinimizeDataMovement());
+
+ InstanceAssignmentDriver driver = new
InstanceAssignmentDriver(tableConfig);
+ // Configs and driver that DO NOT minimize data movement
+ InstanceAssignmentConfig instanceAssignmentConfigNotMinimized = new
InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, false,
+ partitionColumn), null, false);
+
+ TableConfig tableConfigNotMinimized = new TableConfig(tableConfig);
+ tableConfigNotMinimized.setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfigNotMinimized));
+
assertFalse(InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfigNotMinimized,
+ InstancePartitionsType.OFFLINE).isMinimizeDataMovement());
+ InstanceAssignmentDriver driverNotMinimized = new
InstanceAssignmentDriver(tableConfigNotMinimized);
+
+ int numInstances = 10;
+ List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances);
+ for (int i = 0; i < numInstances; i++) {
+ InstanceConfig instanceConfig = new
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+ instanceConfig.addTag(OFFLINE_TAG);
+ instanceConfigs.add(instanceConfig);
+ }
+
+ // Instances should be assigned to 3 replica-groups with a round-robin
fashion, each with 2 instances
+ InstancePartitions instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, null, (Boolean) null);
+
+ InstancePartitions instancePartitionsForcedMinimize =
+ driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, null, true);
+
+ InstancePartitions instancePartitionsNotMinimize =
+ driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, null, false);
+
+ // Initial assignment should be the same for all scenarios
+ assertEquals(instancePartitionsForcedMinimize, instancePartitions);
+ assertEquals(instancePartitionsForcedMinimize,
instancePartitionsNotMinimize);
+
+ // Remove two instances (i2, i6) and add two new instances (i10, i11).
+ instanceConfigs.remove(6);
+ instanceConfigs.remove(2);
+ for (int i = numInstances; i < numInstances + 2; i++) {
+ InstanceConfig instanceConfig = new
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+ instanceConfig.addTag(OFFLINE_TAG);
+ instanceConfigs.add(instanceConfig);
+ }
+
+ // Instances should be assigned to 3 replica-groups with a round-robin
fashion, each with 3 instances, then these 3
+ // instances should be assigned to 2 partitions, each with 2 instances
+ // Leverage the latest instancePartitions from last computation as the
parameter.
+ // Data movement is minimized so that: i2 -> i10, i6 -> i11
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, instancePartitions, (Boolean) null);
+
+ instancePartitionsForcedMinimize =
+ driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs,
+ instancePartitionsForcedMinimize, true);
+
+ // Data movement here is not minimized
+ instancePartitionsNotMinimize =
+ driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs,
+ instancePartitionsNotMinimize, false);
+
+ // Forced minimized data movement should be the same as minimized data
movement
+ assertEquals(instancePartitionsForcedMinimize, instancePartitions);
+ // Without minimizeDataMovement set to true, the data movement is not
minimized and should be different
+ assertNotEquals(instancePartitionsNotMinimize, instancePartitions);
+
+ // Add 2 more instances to the ZK and increase the number of instances per
partition from 2 to 3.
+ for (int i = numInstances + 2; i < numInstances + 4; i++) {
+ InstanceConfig instanceConfig = new
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+ instanceConfig.addTag(OFFLINE_TAG);
+ instanceConfigs.add(instanceConfig);
+ }
+ numInstancesPerPartition = 3;
+ instanceAssignmentConfig = new InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, false,
+ partitionColumn), null, true);
+ tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfig));
+
+ instanceAssignmentConfigNotMinimized = new InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, false,
+ partitionColumn), null, false);
+ tableConfigNotMinimized.setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfigNotMinimized));
+
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, instancePartitions, (Boolean) null);
+
+ instancePartitionsForcedMinimize =
+ driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs,
+ instancePartitionsForcedMinimize, true);
+
+ instancePartitionsNotMinimize =
+ driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs,
+ instancePartitionsNotMinimize, false);
+
+ assertEquals(instancePartitionsForcedMinimize, instancePartitions);
+ assertNotEquals(instancePartitionsNotMinimize, instancePartitions);
+
+ // Reduce the number of instances per partition from 3 to 2.
+ numInstancesPerPartition = 2;
+ instanceAssignmentConfig = new InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, false,
+ partitionColumn), null, true);
+ tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfig));
+
+ instanceAssignmentConfigNotMinimized = new InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, false,
+ partitionColumn), null, false);
+ tableConfigNotMinimized.setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfigNotMinimized));
+
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, instancePartitions, (Boolean) null);
+
+ instancePartitionsForcedMinimize =
+ driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs,
+ instancePartitionsForcedMinimize, true);
+
+ instancePartitionsNotMinimize =
+ driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs,
+ instancePartitionsNotMinimize, false);
+
+ assertEquals(instancePartitionsForcedMinimize, instancePartitions);
+ assertNotEquals(instancePartitionsNotMinimize, instancePartitions);
+
+ // Add one more replica group (from 3 to 4).
+ numReplicas = 4;
+
tableConfig.getValidationConfig().setReplication(Integer.toString(numReplicas));
+
tableConfigNotMinimized.getValidationConfig().setReplication(Integer.toString(numReplicas));
+
+ instanceAssignmentConfig = new InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, false,
+ partitionColumn), null, true);
+ tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfig));
+
+ instanceAssignmentConfigNotMinimized = new InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, false,
+ partitionColumn), null, false);
+ tableConfigNotMinimized.setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfigNotMinimized));
+
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, instancePartitions, (Boolean) null);
+
+ instancePartitionsForcedMinimize =
+ driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs,
+ instancePartitionsForcedMinimize, true);
+
+ instancePartitionsNotMinimize =
+ driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs,
+ instancePartitionsNotMinimize, false);
+
+ assertEquals(instancePartitionsForcedMinimize, instancePartitions);
+ assertNotEquals(instancePartitionsNotMinimize, instancePartitions);
+
+ // Remove one replica group (from 4 to 3).
+ numReplicas = 3;
+
tableConfig.getValidationConfig().setReplication(Integer.toString(numReplicas));
+
tableConfigNotMinimized.getValidationConfig().setReplication(Integer.toString(numReplicas));
+
+ instanceAssignmentConfig = new InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, false,
+ partitionColumn), null, true);
+ tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfig));
+
+ instanceAssignmentConfigNotMinimized = new InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0,
numPartitions, numInstancesPerPartition, false,
+ partitionColumn), null, false);
+ tableConfigNotMinimized.setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfigNotMinimized));
+
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, instancePartitions, (Boolean) null);
+
+ instancePartitionsForcedMinimize =
+ driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs,
+ instancePartitionsForcedMinimize, true);
+
+ instancePartitionsNotMinimize =
+ driverNotMinimized.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs,
+ instancePartitionsNotMinimize, false);
+
+ assertEquals(instancePartitionsForcedMinimize, instancePartitions);
+ assertNotEquals(instancePartitionsNotMinimize, instancePartitions);
+ }
+
+ public void testMirrorServerSetBasedRandomInner(int loopCount)
+ throws FileNotFoundException {
PrintStream o = new PrintStream("output.txt");
System.setOut(o);
for (int iter = 0; iter < loopCount; iter++) {
@@ -1212,7 +1427,6 @@ public class InstanceAssignmentTest {
SERVER_INSTANCE_ID_PREFIX + 20,
SERVER_INSTANCE_ID_PREFIX + 23));
-
// upscale 3*3 to 3*5
numPartitions = 0;
numInstancesPerPartition = 0;
@@ -2209,9 +2423,9 @@ public class InstanceAssignmentTest {
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup,
0, 0, false, null);
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
-
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
false)))
+ Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+ new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
+
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
false)))
.build();
driver = new InstanceAssignmentDriver(tableConfig);
try {
@@ -2243,9 +2457,9 @@ public class InstanceAssignmentTest {
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup, 0, 0, false, null);
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
-
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
false)))
+ Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+ new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
+
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
false)))
.build();
driver = new InstanceAssignmentDriver(tableConfig);
try {
@@ -2285,9 +2499,9 @@ public class InstanceAssignmentTest {
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup, 0, 0, false, null);
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
-
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
false)))
+ Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+ new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
+
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
false)))
.build();
driver = new InstanceAssignmentDriver(tableConfig);
try {
@@ -2397,9 +2611,9 @@ public class InstanceAssignmentTest {
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup, numPartitions,
numInstancesPerPartition, true, null);
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
-
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
true)))
+ Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+ new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
+
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
true)))
.build();
driver = new InstanceAssignmentDriver(tableConfig);
// existingInstancePartitions = instancePartitions
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 5b99e56cc3..cab18467f0 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -95,8 +95,8 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
ExecutorService executorService = Executors.newFixedThreadPool(10);
DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
preChecker.init(_helixResourceManager, executorService);
- TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null,
null, preChecker,
- _helixResourceManager.getTableSizeReader());
+ TableRebalancer tableRebalancer =
+ new TableRebalancer(_helixManager, null, null, preChecker,
_helixResourceManager.getTableSizeReader());
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
@@ -513,6 +513,13 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertNull(rebalanceResult.getPreChecksResult());
_helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
+
+ for (int i = 0; i < numServers; i++) {
+ stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ }
+ for (int i = 0; i < numServersToAdd; i++) {
+ stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + (numServers + i));
+ }
executorService.shutdown();
}
@@ -532,9 +539,10 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
}
_helixResourceManager.createServerTenant(new Tenant(TenantRole.SERVER,
NO_TIER_NAME, numServers, numServers, 0));
- TableConfig tableConfig =
- new
TableConfigBuilder(TableType.OFFLINE).setTableName(TIERED_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
- .setServerTenant(NO_TIER_NAME).build();
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TIERED_TABLE_NAME)
+ .setNumReplicas(NUM_REPLICAS)
+ .setServerTenant(NO_TIER_NAME)
+ .build();
// Create the table
addDummySchema(TIERED_TABLE_NAME);
_helixResourceManager.addTable(tableConfig);
@@ -675,9 +683,10 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
_helixResourceManager.createServerTenant(
new Tenant(TenantRole.SERVER, "replicaAssignment" + NO_TIER_NAME,
numServers, numServers, 0));
- TableConfig tableConfig =
- new
TableConfigBuilder(TableType.OFFLINE).setTableName(TIERED_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
- .setServerTenant("replicaAssignment" + NO_TIER_NAME).build();
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TIERED_TABLE_NAME)
+ .setNumReplicas(NUM_REPLICAS)
+ .setServerTenant("replicaAssignment" + NO_TIER_NAME)
+ .build();
// Create the table
addDummySchema(TIERED_TABLE_NAME);
_helixResourceManager.addTable(tableConfig);
@@ -858,6 +867,256 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
_helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
}
+ @Test
+ public void testRebalanceWithMinimizeDataMovementBalanced()
+ throws Exception {
+ int numServers = 6;
+ for (int i = 0; i < numServers; i++) {
+
addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_balance_" +
SERVER_INSTANCE_ID_PREFIX + i,
+ true);
+ }
+
+ // Create the table with default balanced segment assignment
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
+
+ addDummySchema(RAW_TABLE_NAME);
+ _helixResourceManager.addTable(tableConfig);
+
+ // Add the segments
+ int numSegments = 10;
+ long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+
+ for (int i = 0; i < numSegments; i++) {
+ _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+
SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME,
SEGMENT_NAME_PREFIX + i,
+ nowInDays), null);
+ }
+
+ Map<String, Map<String, String>> oldSegmentAssignment =
+
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields();
+
+ TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
+
+ // Try dry-run summary mode
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setReassignInstances(true);
+
rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE);
+ RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
rebalanceConfig, null);
+
+ RebalanceSummaryResult rebalanceSummaryResult =
rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ RebalanceSummaryResult.ServerInfo rebalanceServerInfo =
rebalanceSummaryResult.getServerInfo();
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+
assertEquals(rebalanceServerInfo.getNumServers().getExpectedValueAfterRebalance(),
numServers);
+
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig(), null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ // Segment assignment should not change
+ assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+ // add one server instance
+
addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_balance_" +
SERVER_INSTANCE_ID_PREFIX, true);
+
+ // A table without instance assignment config should still work fine (the flag is ignored) with
the minimizeDataMovement flag set
+ // Try dry-run summary mode
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setReassignInstances(true);
+
rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+
+ rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ rebalanceServerInfo = rebalanceSummaryResult.getServerInfo();
+ // Should see the added server
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
assertEquals(rebalanceServerInfo.getNumServers().getValueBeforeRebalance(),
numServers);
+
assertEquals(rebalanceServerInfo.getNumServers().getExpectedValueAfterRebalance(),
numServers + 1);
+
+ // Check if the instance assignment is the same as the one without
minimizeDataMovement flag set
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setReassignInstances(true);
+
rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DEFAULT);
+ RebalanceResult rebalanceResultWithoutMinimized =
tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+
+ assertEquals(rebalanceResult.getInstanceAssignment(),
rebalanceResultWithoutMinimized.getInstanceAssignment());
+
+ // Rebalance
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setReassignInstances(true);
+
rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ // Should see the added server in the instance assignment
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
assertEquals(rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE).getInstances(0,
0).size(),
+ numServers + 1);
+
+ _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
+ for (int i = 0; i < numServers; i++) {
+ stopAndDropFakeInstance("minimizeDataMovement_balance_" +
SERVER_INSTANCE_ID_PREFIX + i);
+ }
+ }
+
+ @Test
+ public void testRebalanceWithMinimizeDataMovementInstanceAssignments()
+ throws Exception {
+ int numServers = 6;
+ for (int i = 0; i < numServers; i++) {
+ addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_" +
SERVER_INSTANCE_ID_PREFIX + i, true);
+ }
+
+ // One instance per replica group, no partition
+ InstanceAssignmentConfig instanceAssignmentConfig = new
InstanceAssignmentConfig(
+ new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null),
false, 0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 1, 0,
0, false, null), null, false);
+
+ // Create the table
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(NUM_REPLICAS)
+ .setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfig))
+ .build();
+
+ addDummySchema(RAW_TABLE_NAME);
+ _helixResourceManager.addTable(tableConfig);
+
+ // Add the segments
+ int numSegments = 10;
+ long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+
+ for (int i = 0; i < numSegments; i++) {
+ _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+
SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME,
SEGMENT_NAME_PREFIX + i,
+ nowInDays), null);
+ }
+
+ Map<String, Map<String, String>> oldSegmentAssignment =
+
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields();
+
+ TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
+
+ // Try dry-run summary mode
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setReassignInstances(true);
+
rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE);
+ RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
rebalanceConfig, null);
+
+ RebalanceSummaryResult rebalanceSummaryResult =
rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ RebalanceSummaryResult.ServerInfo rebalanceServerInfo =
rebalanceSummaryResult.getServerInfo();
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+
assertEquals(rebalanceServerInfo.getNumServers().getExpectedValueAfterRebalance(),
NUM_REPLICAS);
+
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig(), null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ // Segment assignment should not change
+ assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+ // add one server instance
+ addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_" +
SERVER_INSTANCE_ID_PREFIX + numServers, true);
+
+ // increase the number of replica groups by 1
+ instanceAssignmentConfig = new InstanceAssignmentConfig(
+ new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null),
false, 0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS + 1, 1,
0, 0, false, null), null, false);
+
+ tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfig));
+
+ // Try dry-run summary mode
+
+ // without minimize data movement, it's supposed to add more than one
server
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setReassignInstances(true);
+
rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DISABLE);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ rebalanceServerInfo =
rebalanceResult.getRebalanceSummaryResult().getServerInfo();
+
+ // note: this assertion may fail if the instance assignment algorithm
changes in the future.
+ // right now, rebalance without minimizing data movement adds more than
one server and removes some servers in a
+ // test setup like this.
+ assertTrue(rebalanceServerInfo.getServersAdded().size() > 1);
+ assertEquals(rebalanceServerInfo.getServersAdded().size() -
rebalanceServerInfo.getServersRemoved().size(), 1);
+
+ // use the table config's own minimizeDataMovement flag (DEFAULT), should be
equivalent to rebalancing without minimize data movement
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setReassignInstances(true);
+
rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DEFAULT);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ rebalanceServerInfo =
rebalanceResult.getRebalanceSummaryResult().getServerInfo();
+
+ assertTrue(rebalanceServerInfo.getServersAdded().size() > 1);
+ assertEquals(rebalanceServerInfo.getServersAdded().size() -
rebalanceServerInfo.getServersRemoved().size(), 1);
+
+ // with minimize data movement, we should add 1 server only
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setReassignInstances(true);
+
rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ rebalanceServerInfo =
rebalanceResult.getRebalanceSummaryResult().getServerInfo();
+
+ assertEquals(rebalanceServerInfo.getServersAdded().size(), 1);
+ assertEquals(rebalanceServerInfo.getServersRemoved().size(), 0);
+
+ // rebalance without dry-run
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setReassignInstances(true);
+
rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
assertEquals(rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE).getNumReplicaGroups(),
+ NUM_REPLICAS + 1);
+
+ // add one server instance
+ addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_" +
SERVER_INSTANCE_ID_PREFIX + (numServers + 1),
+ true);
+
+ // decrease the number of replica groups by 1
+ instanceAssignmentConfig = new InstanceAssignmentConfig(
+ new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null),
false, 0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 1, 0,
0, false, null), null, false);
+
+ tableConfig.setInstanceAssignmentConfigMap(Map.of("OFFLINE",
instanceAssignmentConfig));
+ _helixResourceManager.updateTableConfig(tableConfig);
+
+ // Try dry-run summary mode
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setReassignInstances(true);
+
rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ rebalanceServerInfo =
rebalanceResult.getRebalanceSummaryResult().getServerInfo();
+
+ // with minimize data movement, we should remove 1 server only
+ assertEquals(rebalanceServerInfo.getServersAdded().size(), 0);
+ assertEquals(rebalanceServerInfo.getServersRemoved().size(), 1);
+
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setReassignInstances(true);
+
rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
assertEquals(rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE).getNumReplicaGroups(),
+ NUM_REPLICAS);
+
+ _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
+ for (int i = 0; i < numServers; i++) {
+ stopAndDropFakeInstance("minimizeDataMovement_" +
SERVER_INSTANCE_ID_PREFIX + i);
+ }
+ }
+
@AfterClass
public void tearDown() {
stopFakeInstances();
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
index f1165bc9d4..7b08af32c0 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
@@ -33,7 +33,8 @@ public class PinotTableRebalancer extends PinotZKChanger {
private final RebalanceConfig _rebalanceConfig = new RebalanceConfig();
public PinotTableRebalancer(String zkAddress, String clusterName, boolean
dryRun, boolean preChecks,
- boolean reassignInstances, boolean includeConsuming, boolean bootstrap,
boolean downtime,
+ boolean reassignInstances, boolean includeConsuming,
+ RebalanceConfig.MinimizeDataMovementOptions minimizeDataMovement,
boolean bootstrap, boolean downtime,
int minReplicasToKeepUpForNoDowntime, boolean lowDiskMode, boolean
bestEffort, long externalViewCheckIntervalInMs,
long externalViewStabilizationTimeoutInMs) {
super(zkAddress, clusterName);
@@ -41,6 +42,7 @@ public class PinotTableRebalancer extends PinotZKChanger {
_rebalanceConfig.setPreChecks(preChecks);
_rebalanceConfig.setReassignInstances(reassignInstances);
_rebalanceConfig.setIncludeConsuming(includeConsuming);
+ _rebalanceConfig.setMinimizeDataMovement(minimizeDataMovement);
_rebalanceConfig.setBootstrap(bootstrap);
_rebalanceConfig.setDowntime(downtime);
_rebalanceConfig.setMinAvailableReplicas(minReplicasToKeepUpForNoDowntime);
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
index 96ed26994a..e94e7b2773 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
@@ -64,6 +64,11 @@ public class RebalanceTableCommand extends
AbstractBaseAdminCommand implements C
description = "Whether to reassign CONSUMING segments for real-time
table (true by default)")
private boolean _includeConsuming = true;
+ @CommandLine.Option(names = {"-minimizeDataMovement"}, description =
"Whether to enable, disable minimize data "
+ + "movement algorithm, or use table's default config")
+ private RebalanceConfig.MinimizeDataMovementOptions _minimizeDataMovement =
+ RebalanceConfig.MinimizeDataMovementOptions.ENABLE;
+
@CommandLine.Option(names = {"-bootstrap"},
description = "Whether to rebalance table in bootstrap mode (regardless
of minimum segment movement, reassign"
+ " all segments in a round-robin fashion as if adding new segments
to an empty table, false by default)")
@@ -110,8 +115,8 @@ public class RebalanceTableCommand extends
AbstractBaseAdminCommand implements C
throws Exception {
PinotTableRebalancer tableRebalancer =
new PinotTableRebalancer(_zkAddress, _clusterName, _dryRun,
_preChecks, _reassignInstances, _includeConsuming,
- _bootstrap, _downtime, _minAvailableReplicas, _lowDiskMode,
_bestEfforts, _externalViewCheckIntervalInMs,
- _externalViewStabilizationTimeoutInMs);
+ _minimizeDataMovement, _bootstrap, _downtime,
_minAvailableReplicas, _lowDiskMode, _bestEfforts,
+ _externalViewCheckIntervalInMs,
_externalViewStabilizationTimeoutInMs);
RebalanceResult rebalanceResult =
tableRebalancer.rebalance(_tableNameWithType);
LOGGER
.info("Got rebalance result: {} for table: {}",
JsonUtils.objectToString(rebalanceResult), _tableNameWithType);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]