somandal commented on code in PR #15110:
URL: https://github.com/apache/pinot/pull/15110#discussion_r1985741776
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java:
##########
@@ -83,6 +83,13 @@ public class RebalanceConfig {
@ApiModelProperty(example = "false")
private boolean _bestEfforts = false;
+ // Whether to enforce Minimal Data Movement Algorithm (only effective if instance assignment config is set, and if
+ // bootstrap is false). If set to false, the minimizeDataMovement flag in the table config will be used to determine
+ // whether to run the Minimal Data Movement Algorithm.
+ @JsonProperty("minimizeDataMovement")
+ @ApiModelProperty(example = "TRUE")
+ private String _minimizeDataMovement = "TRUE";
Review Comment:
Can we keep this `Boolean` here?
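A minimal sketch of the tri-state the reviewer seems to be asking for. It assumes the field becomes a nullable `Boolean`, where `null` means "not specified, fall back to the table config". The class name `MinimizeDataMovementConfigSketch` is hypothetical, and the Jackson/Swagger annotations from the diff are left out so the snippet compiles on its own.

```java
// Hypothetical stand-in for RebalanceConfig, reduced to the one field under discussion.
public class MinimizeDataMovementConfigSketch {
  // null = caller did not specify a value; the table config's setting should be used instead.
  // TRUE/FALSE = explicit per-rebalance override.
  private Boolean _minimizeDataMovement = null;

  public Boolean getMinimizeDataMovement() {
    return _minimizeDataMovement;
  }

  public void setMinimizeDataMovement(Boolean minimizeDataMovement) {
    _minimizeDataMovement = minimizeDataMovement;
  }

  public static void main(String[] args) {
    MinimizeDataMovementConfigSketch config = new MinimizeDataMovementConfigSketch();
    System.out.println(config.getMinimizeDataMovement()); // null: not specified
    config.setMinimizeDataMovement(false);
    System.out.println(config.getMinimizeDataMovement()); // false: explicit override
  }
}
```

With a `Boolean` field, "DEFAULT" no longer has to travel as a magic string through the config object; it is simply the absence of a value.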
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java:
##########
@@ -55,40 +55,65 @@ public InstanceAssignmentDriver(TableConfig tableConfig) {
public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions) {
+ return assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions, (Boolean) null);
+ }
+
+ public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
+ List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions,
+ @Nullable Boolean minimizeDataMovement) {
String tableNameWithType = _tableConfig.getTableName();
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
return getInstancePartitions(
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
- assignmentConfig, instanceConfigs, existingInstancePartitions, null);
+ assignmentConfig, instanceConfigs, existingInstancePartitions, null, minimizeDataMovement);
}
public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions,
@Nullable InstancePartitions preConfiguredInstancePartitions) {
+ return assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions,
+ preConfiguredInstancePartitions, null);
+ }
+
+ public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
+ List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions,
+ @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable Boolean minimizeDataMovement) {
String tableNameWithType = _tableConfig.getTableName();
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
return getInstancePartitions(
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
- assignmentConfig, instanceConfigs, existingInstancePartitions, preConfiguredInstancePartitions);
+ assignmentConfig, instanceConfigs, existingInstancePartitions, preConfiguredInstancePartitions,
+ minimizeDataMovement);
}
public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs,
@Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig) {
+ return assignInstances(tierName, instanceConfigs, existingInstancePartitions, instanceAssignmentConfig, null);
+ }
+
+ public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs,
+ @Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig,
+ @Nullable Boolean minimizeDataMovement) {
return getInstancePartitions(
InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(), tierName),
- instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, null);
+ instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, null, minimizeDataMovement);
}
private InstancePartitions getInstancePartitions(String instancePartitionsName,
InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> instanceConfigs,
@Nullable InstancePartitions existingInstancePartitions,
- @Nullable InstancePartitions preConfiguredInstancePartitions) {
+ @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable Boolean minimizeDataMovementFlag) {
Review Comment:
nit: let's rename it to `minimizeDataMovementOverride`?
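For reference, the precedence rule this parameter encodes can be isolated as below. It mirrors the ternary in the diff (`minimizeDataMovementFlag == null ? instanceAssignmentConfig.isMinimizeDataMovement() : minimizeDataMovementFlag`) with the parameter renamed as suggested; the class and method names are hypothetical, and the table-config value is passed in as a plain `boolean` rather than read from `InstanceAssignmentConfig`.

```java
// Hypothetical helper isolating the "override wins, else table config" rule.
public final class MinimizeDataMovementResolution {
  private MinimizeDataMovementResolution() {
  }

  /**
   * Returns the effective flag: a non-null per-rebalance override takes precedence,
   * otherwise the value from the table's instance assignment config is used.
   */
  public static boolean resolve(boolean tableConfigValue, Boolean minimizeDataMovementOverride) {
    // Only the selected operand is unboxed, so a null override never causes an NPE here.
    return minimizeDataMovementOverride != null ? minimizeDataMovementOverride : tableConfigValue;
  }

  public static void main(String[] args) {
    System.out.println(resolve(true, null));  // true: falls back to the table config
    System.out.println(resolve(true, false)); // false: override wins
    System.out.println(resolve(false, true)); // true: override wins
  }
}
```

Naming the parameter `...Override` makes it clear at the call site that `null` is a meaningful "no override" value rather than a missing argument.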
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java:
##########
@@ -51,9 +51,7 @@
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.fail;
+import static org.testng.Assert.*;
Review Comment:
nit: I usually avoid "*" imports. Can you just add explicit static imports for the newly used functions here?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -651,6 +654,7 @@ public RebalanceResult rebalance(
rebalanceConfig.setPreChecks(preChecks);
rebalanceConfig.setReassignInstances(reassignInstances);
rebalanceConfig.setIncludeConsuming(includeConsuming);
+ rebalanceConfig.setMinimizeDataMovement(minimizeDataMovement);
Review Comment:
Can the conversion of this String to `Boolean` be done here itself, i.e. the parsing of TRUE/FALSE and DEFAULT?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java:
##########
@@ -55,40 +55,65 @@ public InstanceAssignmentDriver(TableConfig tableConfig) {
public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions) {
+ return assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions, (Boolean) null);
+ }
+
+ public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
+ List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions,
+ @Nullable Boolean minimizeDataMovement) {
String tableNameWithType = _tableConfig.getTableName();
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
return getInstancePartitions(
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
- assignmentConfig, instanceConfigs, existingInstancePartitions, null);
+ assignmentConfig, instanceConfigs, existingInstancePartitions, null, minimizeDataMovement);
}
public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions,
@Nullable InstancePartitions preConfiguredInstancePartitions) {
+ return assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions,
+ preConfiguredInstancePartitions, null);
+ }
+
+ public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
+ List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions,
+ @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable Boolean minimizeDataMovement) {
String tableNameWithType = _tableConfig.getTableName();
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
return getInstancePartitions(
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
- assignmentConfig, instanceConfigs, existingInstancePartitions, preConfiguredInstancePartitions);
+ assignmentConfig, instanceConfigs, existingInstancePartitions, preConfiguredInstancePartitions,
+ minimizeDataMovement);
}
public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs,
@Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig) {
+ return assignInstances(tierName, instanceConfigs, existingInstancePartitions, instanceAssignmentConfig, null);
+ }
+
+ public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs,
+ @Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig,
+ @Nullable Boolean minimizeDataMovement) {
return getInstancePartitions(
InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(), tierName),
- instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, null);
+ instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, null, minimizeDataMovement);
}
private InstancePartitions getInstancePartitions(String instancePartitionsName,
InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> instanceConfigs,
@Nullable InstancePartitions existingInstancePartitions,
- @Nullable InstancePartitions preConfiguredInstancePartitions) {
+ @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable Boolean minimizeDataMovementFlag) {
String tableNameWithType = _tableConfig.getTableName();
- LOGGER.info("Starting {} instance assignment for table {}", instancePartitionsName, tableNameWithType);
- boolean minimizeDataMovement = instanceAssignmentConfig.isMinimizeDataMovement();
+ // minimizeDataMovement might be set back to false within InstanceTagPoolSelector and InstancePartitionSelector
+ // if existingInstancePartitions is null.
+ boolean minimizeDataMovement =
+ minimizeDataMovementFlag == null ? instanceAssignmentConfig.isMinimizeDataMovement() : minimizeDataMovementFlag;
+ LOGGER.info("Starting {} instance assignment for table {}, instanceAssignmentConfig.isMinimizeDataMovement()={}, "
+ + "minimizeDataMovement={}", instancePartitionsName, tableNameWithType,
Review Comment:
nit: in the log message, change `minimizeDataMovement=` to `minimizeDataMovementOverride=`
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -191,17 +191,21 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
boolean bestEfforts = rebalanceConfig.isBestEfforts();
long externalViewCheckIntervalInMs = rebalanceConfig.getExternalViewCheckIntervalInMs();
long externalViewStabilizationTimeoutInMs = rebalanceConfig.getExternalViewStabilizationTimeoutInMs();
+ String minimizeDataMovementStr = rebalanceConfig.getMinimizeDataMovement();
+ Boolean minimizeDataMovement =
+ minimizeDataMovementStr.toLowerCase().matches("^(true|false)$") ? Boolean.valueOf(minimizeDataMovementStr)
Review Comment:
Let's move this to `PinotTableRestletResource`, and throw an exception there if the value is not one of "true", "false", or "default".
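A minimal sketch of the parsing the reviewer describes, assuming the REST parameter stays a string and is converted once at the API boundary. The class and method names are hypothetical, treating a missing (`null`) value the same as "default" is an assumption, and turning the `IllegalArgumentException` into an HTTP error response in the resource is left out.

```java
import java.util.Locale;

// Hypothetical parser for the minimizeDataMovement query parameter.
public final class MinimizeDataMovementParser {
  private MinimizeDataMovementParser() {
  }

  /**
   * Returns TRUE/FALSE for an explicit override, null for "default" (fall back to the
   * table config), and rejects any other value.
   */
  public static Boolean parse(String value) {
    if (value == null) {
      return null; // assumption: an absent parameter behaves like "default"
    }
    switch (value.trim().toLowerCase(Locale.ROOT)) {
      case "true":
        return Boolean.TRUE;
      case "false":
        return Boolean.FALSE;
      case "default":
        return null;
      default:
        throw new IllegalArgumentException(
            "minimizeDataMovement must be one of true, false or default, got: " + value);
    }
  }

  public static void main(String[] args) {
    System.out.println(parse("TRUE"));    // true
    System.out.println(parse("default")); // null
    try {
      parse("yes");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Using `Locale.ROOT` keeps the case-insensitive match independent of the JVM's default locale, which a bare `toLowerCase()` (as in the diff) does not guarantee.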
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -858,6 +867,227 @@ public void testRebalanceWithTiersAndInstanceAssignments()
_helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
}
+ @Test
+ public void testRebalanceWithMinimizeDataMovementBalanced()
+ throws Exception {
+ int numServers = 6;
+ for (int i = 0; i < numServers; i++) {
+ addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_balance_" + SERVER_INSTANCE_ID_PREFIX + i,
+ true);
+ }
+
+ // Create the table with default balanced segment assignment
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
+
+ addDummySchema(RAW_TABLE_NAME);
+ _helixResourceManager.addTable(tableConfig);
+
+ // Add the segments
+ int numSegments = 10;
+ long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+
+ for (int i = 0; i < numSegments; i++) {
+ _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+ SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, SEGMENT_NAME_PREFIX + i,
+ nowInDays), null);
+ }
+
+ Map<String, Map<String, String>> oldSegmentAssignment =
+ _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields();
+
+ TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
+
+ // Try dry-run summary mode
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setReassignInstances(true);
+ rebalanceConfig.setMinimizeDataMovement("true");
Review Comment:
Can you add a test for `setMinimizeDataMovement("false")` where the TableConfig indicates it is true?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java:
##########
@@ -55,40 +55,65 @@ public InstanceAssignmentDriver(TableConfig tableConfig) {
public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions) {
+ return assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions, (Boolean) null);
Review Comment:
nit: do you need to cast null to `Boolean` here?
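Judging from the overloads visible in the diff, the cast appears to be needed: `assignInstances(InstancePartitionsType, List<InstanceConfig>, InstancePartitions, InstancePartitions)` and the new `assignInstances(InstancePartitionsType, List<InstanceConfig>, InstancePartitions, Boolean)` both take four arguments, and an untyped `null` in the last position matches both, so javac reports an ambiguous reference. The sketch below reproduces the situation with hypothetical stand-in types (`StringBuilder` plays the role of `InstancePartitions`).

```java
import java.util.Collections;
import java.util.List;

// Hypothetical reproduction of the overload pair; not Pinot code.
public class NullOverloadDemo {
  // Stands in for assignInstances(..., InstancePartitions preConfiguredInstancePartitions).
  static String assign(String type, List<String> configs, String existing, StringBuilder preConfigured) {
    return "preConfigured overload";
  }

  // Stands in for assignInstances(..., Boolean minimizeDataMovement).
  static String assign(String type, List<String> configs, String existing, Boolean minimizeDataMovement) {
    return "Boolean overload";
  }

  public static void main(String[] args) {
    List<String> configs = Collections.emptyList();
    // assign("OFFLINE", configs, null, null); // compile error: reference to assign is ambiguous
    System.out.println(assign("OFFLINE", configs, null, (Boolean) null));       // Boolean overload
    System.out.println(assign("OFFLINE", configs, null, (StringBuilder) null)); // preConfigured overload
  }
}
```

Neither parameter type is a subtype of the other, so neither overload is more specific and the cast is what picks one.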
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java:
##########
@@ -55,40 +55,65 @@ public InstanceAssignmentDriver(TableConfig tableConfig) {
public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions) {
+ return assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions, (Boolean) null);
+ }
+
+ public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
+ List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions,
+ @Nullable Boolean minimizeDataMovement) {
String tableNameWithType = _tableConfig.getTableName();
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
return getInstancePartitions(
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
- assignmentConfig, instanceConfigs, existingInstancePartitions, null);
+ assignmentConfig, instanceConfigs, existingInstancePartitions, null, minimizeDataMovement);
}
public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions,
@Nullable InstancePartitions preConfiguredInstancePartitions) {
+ return assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions,
+ preConfiguredInstancePartitions, null);
+ }
+
+ public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
+ List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions,
+ @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable Boolean minimizeDataMovement) {
String tableNameWithType = _tableConfig.getTableName();
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
return getInstancePartitions(
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
- assignmentConfig, instanceConfigs, existingInstancePartitions, preConfiguredInstancePartitions);
+ assignmentConfig, instanceConfigs, existingInstancePartitions, preConfiguredInstancePartitions,
+ minimizeDataMovement);
}
public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs,
@Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig) {
+ return assignInstances(tierName, instanceConfigs, existingInstancePartitions, instanceAssignmentConfig, null);
+ }
+
+ public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs,
+ @Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig,
+ @Nullable Boolean minimizeDataMovement) {
return getInstancePartitions(
InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(), tierName),
- instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, null);
+ instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, null, minimizeDataMovement);
}
private InstancePartitions getInstancePartitions(String instancePartitionsName,
InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> instanceConfigs,
@Nullable InstancePartitions existingInstancePartitions,
- @Nullable InstancePartitions preConfiguredInstancePartitions) {
+ @Nullable InstancePartitions preConfiguredInstancePartitions, @Nullable Boolean minimizeDataMovementFlag) {
String tableNameWithType = _tableConfig.getTableName();
- LOGGER.info("Starting {} instance assignment for table {}", instancePartitionsName, tableNameWithType);
- boolean minimizeDataMovement = instanceAssignmentConfig.isMinimizeDataMovement();
+ // minimizeDataMovement might be set back to false within InstanceTagPoolSelector and InstancePartitionSelector
Review Comment:
Does this comment need to be updated to reflect the new change?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java:
##########
@@ -83,6 +83,13 @@ public class RebalanceConfig {
@ApiModelProperty(example = "false")
private boolean _bestEfforts = false;
+ // Whether to enforce Minimal Data Movement Algorithm (only effective if instance assignment config is set, and if
+ // bootstrap is false). If set to false, the minimizeDataMovement flag in the table config will be used to determine
Review Comment:
nit: update the comment to say that if this is set to DEFAULT we fall back on the table config; otherwise this value is used as the override.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]