somandal commented on code in PR #15618:
URL: https://github.com/apache/pinot/pull/15618#discussion_r2072001496
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1303,47 +1303,75 @@ private IdealState waitForExternalViewToConverge(String
tableNameWithType, boole
long endTimeMs = System.currentTimeMillis() +
externalViewStabilizationTimeoutInMs;
IdealState idealState;
- do {
- tableRebalanceLogger.debug("Start to check if ExternalView converges to
IdealStates");
- idealState =
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
- // IdealState might be null if table got deleted, throwing exception to
abort the rebalance
- Preconditions.checkState(idealState != null, "Failed to find the
IdealState");
-
- ExternalView externalView =
-
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType));
- // ExternalView might be null when table is just created, skipping check
for this iteration
- if (externalView != null) {
- // Record external view and ideal state convergence status
- TableRebalanceObserver.RebalanceContext rebalanceContext = new
TableRebalanceObserver.RebalanceContext(
- estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState,
segmentsToMonitor);
- _tableRebalanceObserver.onTrigger(
-
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
- externalView.getRecord().getMapFields(),
idealState.getRecord().getMapFields(), rebalanceContext);
- // Update unique segment list as IS-EV trigger must have processed
these
- allSegmentsFromIdealState =
idealState.getRecord().getMapFields().keySet();
- if (_tableRebalanceObserver.isStopped()) {
- throw new RuntimeException(
- String.format("Rebalance has already stopped with status: %s",
_tableRebalanceObserver.getStopStatus()));
+ ExternalView externalView;
+ int previousRemainingSegments = -1;
+ while (true) {
+ do {
+ tableRebalanceLogger.debug("Start to check if ExternalView converges
to IdealStates");
+ idealState =
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
+ // IdealState might be null if table got deleted, throwing exception
to abort the rebalance
+ Preconditions.checkState(idealState != null, "Failed to find the
IdealState");
+
+ externalView =
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType));
+ // ExternalView might be null when table is just created, skipping
check for this iteration
+ if (externalView != null) {
+ // Record external view and ideal state convergence status
+ TableRebalanceObserver.RebalanceContext rebalanceContext = new
TableRebalanceObserver.RebalanceContext(
+ estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState,
segmentsToMonitor);
+ _tableRebalanceObserver.onTrigger(
+
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
+ externalView.getRecord().getMapFields(),
idealState.getRecord().getMapFields(), rebalanceContext);
+ // Update unique segment list as IS-EV trigger must have processed
these
+ allSegmentsFromIdealState =
idealState.getRecord().getMapFields().keySet();
+ if (_tableRebalanceObserver.isStopped()) {
+ throw new RuntimeException(
+ String.format("Rebalance has already stopped with status: %s",
+ _tableRebalanceObserver.getStopStatus()));
+ }
+ if (previousRemainingSegments < 0) {
+ // initialize previousRemainingSegments
+ previousRemainingSegments =
getNumRemainingSegmentsToProcess(tableNameWithType,
+ externalView.getRecord().getMapFields(),
idealState.getRecord().getMapFields(), lowDiskMode,
+ bestEfforts, segmentsToMonitor, tableRebalanceLogger, false);
+ if (previousRemainingSegments == 0) {
+ tableRebalanceLogger.info("ExternalView converged");
+ return idealState;
+ }
+ } else if (isExternalViewConverged(tableNameWithType,
externalView.getRecord().getMapFields(),
+ idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts,
segmentsToMonitor,
+ tableRebalanceLogger)) {
+ tableRebalanceLogger.info("ExternalView converged");
+ return idealState;
+ }
}
- if (isExternalViewConverged(tableNameWithType,
externalView.getRecord().getMapFields(),
- idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts,
segmentsToMonitor, tableRebalanceLogger)) {
- tableRebalanceLogger.info("ExternalView converged");
- return idealState;
+ tableRebalanceLogger.debug("ExternalView has not converged to
IdealStates. Retry after: {}ms",
+ externalViewCheckIntervalInMs);
+ Thread.sleep(externalViewCheckIntervalInMs);
+ } while (System.currentTimeMillis() < endTimeMs);
+ if (bestEfforts) {
+ tableRebalanceLogger.warn(
+ "ExternalView has not converged within: {}ms, continuing the
rebalance (best-efforts)",
+ externalViewStabilizationTimeoutInMs);
+ return idealState;
+ }
+ if (externalView != null) {
+ int currentRemainingSegments =
getNumRemainingSegmentsToProcess(tableNameWithType,
+ externalView.getRecord().getMapFields(),
idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts,
+ segmentsToMonitor, tableRebalanceLogger, false);
+ if (currentRemainingSegments < previousRemainingSegments) {
+ tableRebalanceLogger.info(
+ "Extending EV stabilization timeout for another {}ms, remaining
{} segments to be processed.",
+ externalViewStabilizationTimeoutInMs, currentRemainingSegments);
+ previousRemainingSegments = currentRemainingSegments;
+ endTimeMs += externalViewStabilizationTimeoutInMs;
+ continue;
}
+ } else {
+ tableRebalanceLogger.warn("ExternalView is null, will not extend the
EV stabilization timeout.");
}
- tableRebalanceLogger.debug("ExternalView has not converged to
IdealStates. Retry after: {}ms",
- externalViewCheckIntervalInMs);
- Thread.sleep(externalViewCheckIntervalInMs);
- } while (System.currentTimeMillis() < endTimeMs);
- if (bestEfforts) {
- tableRebalanceLogger.warn(
- "ExternalView has not converged within: {}ms, continuing the
rebalance (best-efforts)",
- externalViewStabilizationTimeoutInMs);
- return idealState;
- } else {
- throw new TimeoutException(String.format("ExternalView has not converged
within: %d ms",
- externalViewStabilizationTimeoutInMs));
+ throw new TimeoutException(
+ String.format("ExternalView has no progress for: %d ms",
externalViewStabilizationTimeoutInMs));
Review Comment:
By "total elapsed time" I mean the time since we started this convergence check,
including every period by which the timeout was extended.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]