This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 79da40671c Fix internal logic of pre-check minimizeDataMovement
(#15527)
79da40671c is described below
commit 79da40671c5feb04f4ddab5c2e7948b0e759805d
Author: Jhow <[email protected]>
AuthorDate: Fri Apr 11 20:00:51 2025 -0700
Fix internal logic of pre-check minimizeDataMovement (#15527)
* fix internal logic of pre-check minimizeDataMovement
---
.../core/rebalance/DefaultRebalancePreChecker.java | 55 ++++++++++++++++------
1 file changed, 41 insertions(+), 14 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
index 62be8dff80..ea429990d0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
@@ -130,7 +130,7 @@ public class DefaultRebalancePreChecker implements
RebalancePreChecker {
Map<String, JsonNode> needsReloadMetadata =
needsReloadMetadataPair.getServerReloadJsonResponses();
int failedResponses = needsReloadMetadataPair.getNumFailedResponses();
LOGGER.info("Received {} needs reload responses and {} failed responses
from servers for table: {} with "
- + "rebalanceJobId: {}, number of servers queried: {}",
needsReloadMetadata.size(), failedResponses,
+ + "rebalanceJobId: {}, number of servers queried: {}",
needsReloadMetadata.size(), failedResponses,
tableNameWithType, rebalanceJobId, currentlyAssignedServers.size());
needsReload = needsReloadMetadata.values().stream().anyMatch(value ->
value.get("needReload").booleanValue());
if (!needsReload && failedResponses > 0) {
@@ -183,21 +183,22 @@ public class DefaultRebalancePreChecker implements
RebalancePreChecker {
instanceAssignmentConfigConsuming =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig,
InstancePartitionsType.CONSUMING);
}
- // For REALTIME tables need to check for both CONSUMING and COMPLETED
segments if relocation is enabled
+ // For REALTIME tables if COMPLETED segments are not to be relocated,
check for only CONSUMING segment instance
+ // assignment config if presents
if
(!InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) {
- RebalancePreCheckerResult rebalancePreCheckerResult;
if (isInstanceAssignmentAllowedConsuming) {
if (rebalanceConfig.getMinimizeDataMovement() ==
RebalanceConfig.MinimizeDataMovementOptions.ENABLE) {
return RebalancePreCheckerResult.pass("minimizeDataMovement is
enabled");
}
if (instanceAssignmentConfigConsuming.isMinimizeDataMovement()) {
return rebalanceConfig.getMinimizeDataMovement() ==
RebalanceConfig.MinimizeDataMovementOptions.DISABLE
- ? RebalancePreCheckerResult.warn("minimizeDataMovement is
enabled in table config but it's overridden "
- + "with disabled")
+ ? RebalancePreCheckerResult.warn(
+ "minimizeDataMovement is enabled for CONSUMING segments in
table config but it's overridden "
+ + "with disabled")
: RebalancePreCheckerResult.pass("minimizeDataMovement is
enabled");
}
- return RebalancePreCheckerResult.warn("minimizeDataMovement is not
enabled but instance assignment is "
- + "allowed");
+ return RebalancePreCheckerResult.warn(
+ "minimizeDataMovement is not enabled for CONSUMING segments but
instance assignment is allowed");
}
return RebalancePreCheckerResult.pass("Instance assignment not
allowed, no need for minimizeDataMovement");
}
@@ -210,7 +211,8 @@ public class DefaultRebalancePreChecker implements
RebalancePreChecker {
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig,
InstancePartitionsType.COMPLETED);
}
- RebalancePreCheckerResult rebalancePreCheckerResult;
+ // COMPLETED segments are to be relocated, check both COMPLETED and
CONSUMING segment instance assignment config
+ // that present
if (!isInstanceAssignmentAllowedConsuming &&
!isInstanceAssignmentAllowedCompleted) {
return RebalancePreCheckerResult.pass("Instance assignment not
allowed, no need for minimizeDataMovement");
} else if (instanceAssignmentConfigConsuming != null &&
instanceAssignmentConfigCompleted != null) {
@@ -220,16 +222,41 @@ public class DefaultRebalancePreChecker implements
RebalancePreChecker {
if (instanceAssignmentConfigCompleted.isMinimizeDataMovement()
&& instanceAssignmentConfigConsuming.isMinimizeDataMovement()) {
return rebalanceConfig.getMinimizeDataMovement() ==
RebalanceConfig.MinimizeDataMovementOptions.DISABLE
- ? RebalancePreCheckerResult.warn("minimizeDataMovement is
enabled in table config but it's overridden "
- + "with disabled")
+ ? RebalancePreCheckerResult.warn(
+ "minimizeDataMovement is enabled for both COMPLETED and
CONSUMING segments in table config but it's "
+ + "overridden with disabled")
+ : RebalancePreCheckerResult.pass("minimizeDataMovement is
enabled");
+ }
+ return RebalancePreCheckerResult.warn(
+ "minimizeDataMovement is not enabled for either or both COMPLETED
and CONSUMING segments, but instance "
+ + "assignment is allowed for both");
+ } else if (instanceAssignmentConfigConsuming != null) {
+ if (rebalanceConfig.getMinimizeDataMovement() ==
RebalanceConfig.MinimizeDataMovementOptions.ENABLE) {
+ return RebalancePreCheckerResult.pass("minimizeDataMovement is
enabled");
+ }
+ if (instanceAssignmentConfigConsuming.isMinimizeDataMovement()) {
+ return rebalanceConfig.getMinimizeDataMovement() ==
RebalanceConfig.MinimizeDataMovementOptions.DISABLE
+ ? RebalancePreCheckerResult.warn(
+ "minimizeDataMovement is enabled for CONSUMING segments in table
config but it's overridden with "
+ + "disabled")
+ : RebalancePreCheckerResult.pass("minimizeDataMovement is
enabled");
+ }
+ return RebalancePreCheckerResult.warn(
+ "minimizeDataMovement is not enabled for CONSUMING segments, but
instance assignment is allowed");
+ } else {
+ if (rebalanceConfig.getMinimizeDataMovement() ==
RebalanceConfig.MinimizeDataMovementOptions.ENABLE) {
+ return RebalancePreCheckerResult.pass("minimizeDataMovement is
enabled");
+ }
+ if (instanceAssignmentConfigCompleted.isMinimizeDataMovement()) {
+ return rebalanceConfig.getMinimizeDataMovement() ==
RebalanceConfig.MinimizeDataMovementOptions.DISABLE
+ ? RebalancePreCheckerResult.warn(
+ "minimizeDataMovement is enabled for COMPLETED segments in table
config but it's overridden "
+ + "with disabled")
: RebalancePreCheckerResult.pass("minimizeDataMovement is
enabled");
}
return RebalancePreCheckerResult.warn(
- "minimizeDataMovement may not be enabled for consuming or
completed, but instance assigment is allowed "
- + "for both");
+ "minimizeDataMovement is not enabled for COMPLETED segments, but
instance assignment is allowed");
}
- return RebalancePreCheckerResult.warn("minimizeDataMovement may not
enabled for "
- + "consuming or completed but instance assignment is allowed for at
least one");
} catch (IllegalStateException e) {
LOGGER.warn("Error while trying to fetch instance assignment config,
assuming minimizeDataMovement is false", e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]