somandal commented on code in PR #15618:
URL: https://github.com/apache/pinot/pull/15618#discussion_r2069452046


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -239,6 +239,10 @@ TableRebalanceContext getTableRebalanceContext() {
     return _tableRebalanceContext;
   }
 
+  public TableRebalanceProgressStats.RebalanceProgressStats getOverallStats() {

Review Comment:
   nit: this looks like it is no longer needed. Can we remove this change?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1352,26 +1380,55 @@ static boolean isExternalViewConverged(String 
tableNameWithType,
       Map<String, Map<String, String>> externalViewSegmentStates,
       Map<String, Map<String, String>> idealStateSegmentStates, boolean 
lowDiskMode, boolean bestEfforts,
       @Nullable Set<String> segmentsToMonitor) {
-    return isExternalViewConverged(tableNameWithType, 
externalViewSegmentStates, idealStateSegmentStates, lowDiskMode,
-        bestEfforts, segmentsToMonitor, LOGGER);
+    return getNumRemainingSegmentsToProcess(tableNameWithType, 
externalViewSegmentStates, idealStateSegmentStates,
+        lowDiskMode, bestEfforts, segmentsToMonitor, LOGGER, true) == 0;
   }
 
   /**
-   * NOTE:
-   * Only check the segments in the IdealState and being monitored. Extra 
segments in ExternalView are ignored because
-   * they are not managed by the rebalancer.
-   * For each segment checked:
-   * - In regular mode, it is okay to have extra instances in ExternalView as 
long as the instance states in IdealState
-   *   are reached.
-   * - In low disk mode, instance states in ExternalView must match IdealState 
to ensure the segments are deleted from
-   *   server before moving to the next assignment.
-   * For ERROR state in ExternalView, if using best-efforts, log a warning and 
treat it as good state; if not, throw an
-   * exception to abort the rebalance because we are not able to get out of 
the ERROR state.
+   * Check if the external view has converged to the ideal state. See 
`getNumRemainingSegmentsToProcess` for details on
+   * how the convergence is determined.
    */
   private static boolean isExternalViewConverged(String tableNameWithType,
       Map<String, Map<String, String>> externalViewSegmentStates,
       Map<String, Map<String, String>> idealStateSegmentStates, boolean 
lowDiskMode, boolean bestEfforts,
       @Nullable Set<String> segmentsToMonitor, Logger tableRebalanceLogger) {
+    return getNumRemainingSegmentsToProcess(tableNameWithType, 
externalViewSegmentStates, idealStateSegmentStates,
+        lowDiskMode, bestEfforts, segmentsToMonitor, tableRebalanceLogger, 
true) == 0;
+  }
+
+  @VisibleForTesting
+  static int getNumRemainingSegmentsToProcess(String tableNameWithType,
+      Map<String, Map<String, String>> externalViewSegmentStates,
+      Map<String, Map<String, String>> idealStateSegmentStates, boolean 
lowDiskMode, boolean bestEfforts,
+      @Nullable Set<String> segmentsToMonitor) {
+    return getNumRemainingSegmentsToProcess(tableNameWithType, 
externalViewSegmentStates, idealStateSegmentStates,
+        lowDiskMode, bestEfforts, segmentsToMonitor, LOGGER, false);
+  }
+
+  /**
+   * Count the number of segments that are not in the expected state. If 
`earlyReturn=true` it returns 1 as soon as

Review Comment:
   Recommend adding a note to this comment about what happens when operations 
like forceCommit run during the rebalance and increase the number of segments 
that need to converge. Basically, if those segments are processed within 
externalViewStabilizationTimeoutInMs we are fine; otherwise the rebalance will 
fail.
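
   To illustrate the behavior such a note would describe, here is a minimal 
sketch of the extension rule. It assumes, as in the diff in this PR, that the 
timeout is only extended when the remaining count has strictly decreased by the 
time a window expires. `TimeoutExtensionSketch` and `wouldConverge` are 
hypothetical names, not part of TableRebalancer, and the model is simplified: 
it only samples the count at the end of each window.

```java
final class TimeoutExtensionSketch {
  private TimeoutExtensionSketch() {
  }

  /**
   * Hypothetical, simplified model of the timeout-extension rule.
   * remainingAtWindowEnd[i] is the number of unconverged segments observed when
   * the i-th stabilization window expires. Returns true if the count reaches 0
   * before a window ends without progress; returns false otherwise (including
   * when the observations run out).
   */
  static boolean wouldConverge(int initialRemaining, int[] remainingAtWindowEnd) {
    if (initialRemaining == 0) {
      return true;
    }
    int previous = initialRemaining;
    for (int current : remainingAtWindowEnd) {
      if (current == 0) {
        return true;
      }
      // No strict decrease: in the PR this is where the TimeoutException is thrown.
      if (current >= previous) {
        return false;
      }
      previous = current;
    }
    return false;
  }

  public static void main(String[] args) {
    // Steady progress: 10 -> 6 -> 3 -> 0, the timeout keeps being extended.
    System.out.println(wouldConverge(10, new int[]{6, 3, 0}));
    // Something like forceCommit adds segments: 6 -> 8 counts as "no progress".
    System.out.println(wouldConverge(10, new int[]{6, 8, 0}));
  }
}
```

   In the second call the count grows from 6 to 8 inside one window, so the 
rebalance would fail even though the table is still making progress on the 
original segments.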



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1303,47 +1303,75 @@ private IdealState waitForExternalViewToConverge(String 
tableNameWithType, boole
     long endTimeMs = System.currentTimeMillis() + 
externalViewStabilizationTimeoutInMs;
 
     IdealState idealState;
-    do {
-      tableRebalanceLogger.debug("Start to check if ExternalView converges to 
IdealStates");
-      idealState = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
-      // IdealState might be null if table got deleted, throwing exception to 
abort the rebalance
-      Preconditions.checkState(idealState != null, "Failed to find the 
IdealState");
-
-      ExternalView externalView =
-          
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType));
-      // ExternalView might be null when table is just created, skipping check 
for this iteration
-      if (externalView != null) {
-        // Record external view and ideal state convergence status
-        TableRebalanceObserver.RebalanceContext rebalanceContext = new 
TableRebalanceObserver.RebalanceContext(
-            estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState, 
segmentsToMonitor);
-        _tableRebalanceObserver.onTrigger(
-            
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
-            externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), rebalanceContext);
-        // Update unique segment list as IS-EV trigger must have processed 
these
-        allSegmentsFromIdealState = 
idealState.getRecord().getMapFields().keySet();
-        if (_tableRebalanceObserver.isStopped()) {
-          throw new RuntimeException(
-              String.format("Rebalance has already stopped with status: %s", 
_tableRebalanceObserver.getStopStatus()));
+    ExternalView externalView;
+    int previousRemainingSegments = -1;
+    while (true) {
+      do {
+        tableRebalanceLogger.debug("Start to check if ExternalView converges 
to IdealStates");
+        idealState = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
+        // IdealState might be null if table got deleted, throwing exception 
to abort the rebalance
+        Preconditions.checkState(idealState != null, "Failed to find the 
IdealState");
+
+        externalView = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType));
+        // ExternalView might be null when table is just created, skipping 
check for this iteration
+        if (externalView != null) {
+          // Record external view and ideal state convergence status
+          TableRebalanceObserver.RebalanceContext rebalanceContext = new 
TableRebalanceObserver.RebalanceContext(
+              estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState, 
segmentsToMonitor);
+          _tableRebalanceObserver.onTrigger(
+              
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
+              externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), rebalanceContext);
+          // Update unique segment list as IS-EV trigger must have processed 
these
+          allSegmentsFromIdealState = 
idealState.getRecord().getMapFields().keySet();
+          if (_tableRebalanceObserver.isStopped()) {
+            throw new RuntimeException(
+                String.format("Rebalance has already stopped with status: %s",
+                    _tableRebalanceObserver.getStopStatus()));
+          }
+          if (previousRemainingSegments < 0) {
+            // initialize previousRemainingSegments
+            previousRemainingSegments = 
getNumRemainingSegmentsToProcess(tableNameWithType,
+                externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), lowDiskMode,
+                bestEfforts, segmentsToMonitor, tableRebalanceLogger, false);
+            if (previousRemainingSegments == 0) {
+              tableRebalanceLogger.info("ExternalView converged");
+              return idealState;
+            }
+          } else if (isExternalViewConverged(tableNameWithType, 
externalView.getRecord().getMapFields(),
+              idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, 
segmentsToMonitor,
+              tableRebalanceLogger)) {
+            tableRebalanceLogger.info("ExternalView converged");
+            return idealState;
+          }
         }
-        if (isExternalViewConverged(tableNameWithType, 
externalView.getRecord().getMapFields(),
-            idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, 
segmentsToMonitor, tableRebalanceLogger)) {
-          tableRebalanceLogger.info("ExternalView converged");
-          return idealState;
+        tableRebalanceLogger.debug("ExternalView has not converged to 
IdealStates. Retry after: {}ms",
+            externalViewCheckIntervalInMs);
+        Thread.sleep(externalViewCheckIntervalInMs);
+      } while (System.currentTimeMillis() < endTimeMs);

Review Comment:
   nit: can you add an empty line after this for easier code readability?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1303,47 +1303,75 @@ private IdealState waitForExternalViewToConverge(String 
tableNameWithType, boole
     long endTimeMs = System.currentTimeMillis() + 
externalViewStabilizationTimeoutInMs;
 
     IdealState idealState;
-    do {
-      tableRebalanceLogger.debug("Start to check if ExternalView converges to 
IdealStates");
-      idealState = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
-      // IdealState might be null if table got deleted, throwing exception to 
abort the rebalance
-      Preconditions.checkState(idealState != null, "Failed to find the 
IdealState");
-
-      ExternalView externalView =
-          
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType));
-      // ExternalView might be null when table is just created, skipping check 
for this iteration
-      if (externalView != null) {
-        // Record external view and ideal state convergence status
-        TableRebalanceObserver.RebalanceContext rebalanceContext = new 
TableRebalanceObserver.RebalanceContext(
-            estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState, 
segmentsToMonitor);
-        _tableRebalanceObserver.onTrigger(
-            
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
-            externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), rebalanceContext);
-        // Update unique segment list as IS-EV trigger must have processed 
these
-        allSegmentsFromIdealState = 
idealState.getRecord().getMapFields().keySet();
-        if (_tableRebalanceObserver.isStopped()) {
-          throw new RuntimeException(
-              String.format("Rebalance has already stopped with status: %s", 
_tableRebalanceObserver.getStopStatus()));
+    ExternalView externalView;
+    int previousRemainingSegments = -1;
+    while (true) {
+      do {
+        tableRebalanceLogger.debug("Start to check if ExternalView converges 
to IdealStates");
+        idealState = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
+        // IdealState might be null if table got deleted, throwing exception 
to abort the rebalance
+        Preconditions.checkState(idealState != null, "Failed to find the 
IdealState");
+
+        externalView = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType));
+        // ExternalView might be null when table is just created, skipping 
check for this iteration
+        if (externalView != null) {
+          // Record external view and ideal state convergence status
+          TableRebalanceObserver.RebalanceContext rebalanceContext = new 
TableRebalanceObserver.RebalanceContext(
+              estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState, 
segmentsToMonitor);
+          _tableRebalanceObserver.onTrigger(
+              
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
+              externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), rebalanceContext);
+          // Update unique segment list as IS-EV trigger must have processed 
these
+          allSegmentsFromIdealState = 
idealState.getRecord().getMapFields().keySet();
+          if (_tableRebalanceObserver.isStopped()) {
+            throw new RuntimeException(
+                String.format("Rebalance has already stopped with status: %s",
+                    _tableRebalanceObserver.getStopStatus()));
+          }
+          if (previousRemainingSegments < 0) {
+            // initialize previousRemainingSegments
+            previousRemainingSegments = 
getNumRemainingSegmentsToProcess(tableNameWithType,
+                externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), lowDiskMode,
+                bestEfforts, segmentsToMonitor, tableRebalanceLogger, false);
+            if (previousRemainingSegments == 0) {
+              tableRebalanceLogger.info("ExternalView converged");
+              return idealState;
+            }
+          } else if (isExternalViewConverged(tableNameWithType, 
externalView.getRecord().getMapFields(),
+              idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, 
segmentsToMonitor,
+              tableRebalanceLogger)) {
+            tableRebalanceLogger.info("ExternalView converged");
+            return idealState;
+          }
         }
-        if (isExternalViewConverged(tableNameWithType, 
externalView.getRecord().getMapFields(),
-            idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, 
segmentsToMonitor, tableRebalanceLogger)) {
-          tableRebalanceLogger.info("ExternalView converged");
-          return idealState;
+        tableRebalanceLogger.debug("ExternalView has not converged to 
IdealStates. Retry after: {}ms",
+            externalViewCheckIntervalInMs);
+        Thread.sleep(externalViewCheckIntervalInMs);
+      } while (System.currentTimeMillis() < endTimeMs);
+      if (bestEfforts) {
+        tableRebalanceLogger.warn(
+            "ExternalView has not converged within: {}ms, continuing the 
rebalance (best-efforts)",
+            externalViewStabilizationTimeoutInMs);
+        return idealState;
+      }
+      if (externalView != null) {
+        int currentRemainingSegments = 
getNumRemainingSegmentsToProcess(tableNameWithType,
+            externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts,
+            segmentsToMonitor, tableRebalanceLogger, false);
+        if (currentRemainingSegments < previousRemainingSegments) {
+          tableRebalanceLogger.info(
+              "Extending EV stabilization timeout for another {}ms, remaining 
{} segments to be processed.",
+              externalViewStabilizationTimeoutInMs, currentRemainingSegments);
+          previousRemainingSegments = currentRemainingSegments;
+          endTimeMs += externalViewStabilizationTimeoutInMs;
+          continue;
         }
+      } else {
+        tableRebalanceLogger.warn("ExternalView is null, will not extend the 
EV stabilization timeout.");
       }
-      tableRebalanceLogger.debug("ExternalView has not converged to 
IdealStates. Retry after: {}ms",
-          externalViewCheckIntervalInMs);
-      Thread.sleep(externalViewCheckIntervalInMs);
-    } while (System.currentTimeMillis() < endTimeMs);
 
-    if (bestEfforts) {
-      tableRebalanceLogger.warn(
-          "ExternalView has not converged within: {}ms, continuing the 
rebalance (best-efforts)",
-          externalViewStabilizationTimeoutInMs);
-      return idealState;
-    } else {
-      throw new TimeoutException(String.format("ExternalView has not converged 
within: %d ms",
-          externalViewStabilizationTimeoutInMs));
+      throw new TimeoutException(
+          String.format("ExternalView has no progress for: %d ms", 
externalViewStabilizationTimeoutInMs));

Review Comment:
   nit: reword to "ExternalView has not made any progress for the last %dms, 
total time spent so far: %dms", i.e. also include the total elapsed time in the 
exception message.
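
   A minimal sketch of the suggested message, assuming the method records a 
start timestamp on entry (the diff does not show one, so `startTimeMs` here is 
an assumption). `NoProgressMessageSketch` and `noProgressMessage` are 
hypothetical names used only for illustration.

```java
final class NoProgressMessageSketch {
  private NoProgressMessageSketch() {
  }

  /** Hypothetical helper that formats the reworded timeout message. */
  static String noProgressMessage(long stabilizationTimeoutMs, long totalElapsedMs) {
    return String.format(
        "ExternalView has not made any progress for the last %dms, total time spent so far: %dms",
        stabilizationTimeoutMs, totalElapsedMs);
  }

  public static void main(String[] args) {
    // Assumption: the wait loop captures this timestamp when it starts.
    long startTimeMs = System.currentTimeMillis();
    long stabilizationTimeoutMs = 1000L;
    long totalElapsedMs = System.currentTimeMillis() - startTimeMs;
    System.out.println(noProgressMessage(stabilizationTimeoutMs, totalElapsedMs));
  }
}
```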



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1303,47 +1303,75 @@ private IdealState waitForExternalViewToConverge(String 
tableNameWithType, boole
     long endTimeMs = System.currentTimeMillis() + 
externalViewStabilizationTimeoutInMs;
 
     IdealState idealState;
-    do {
-      tableRebalanceLogger.debug("Start to check if ExternalView converges to 
IdealStates");
-      idealState = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
-      // IdealState might be null if table got deleted, throwing exception to 
abort the rebalance
-      Preconditions.checkState(idealState != null, "Failed to find the 
IdealState");
-
-      ExternalView externalView =
-          
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType));
-      // ExternalView might be null when table is just created, skipping check 
for this iteration
-      if (externalView != null) {
-        // Record external view and ideal state convergence status
-        TableRebalanceObserver.RebalanceContext rebalanceContext = new 
TableRebalanceObserver.RebalanceContext(
-            estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState, 
segmentsToMonitor);
-        _tableRebalanceObserver.onTrigger(
-            
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
-            externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), rebalanceContext);
-        // Update unique segment list as IS-EV trigger must have processed 
these
-        allSegmentsFromIdealState = 
idealState.getRecord().getMapFields().keySet();
-        if (_tableRebalanceObserver.isStopped()) {
-          throw new RuntimeException(
-              String.format("Rebalance has already stopped with status: %s", 
_tableRebalanceObserver.getStopStatus()));
+    ExternalView externalView;
+    int previousRemainingSegments = -1;
+    while (true) {
+      do {
+        tableRebalanceLogger.debug("Start to check if ExternalView converges 
to IdealStates");
+        idealState = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
+        // IdealState might be null if table got deleted, throwing exception 
to abort the rebalance
+        Preconditions.checkState(idealState != null, "Failed to find the 
IdealState");
+
+        externalView = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType));
+        // ExternalView might be null when table is just created, skipping 
check for this iteration
+        if (externalView != null) {
+          // Record external view and ideal state convergence status
+          TableRebalanceObserver.RebalanceContext rebalanceContext = new 
TableRebalanceObserver.RebalanceContext(
+              estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState, 
segmentsToMonitor);
+          _tableRebalanceObserver.onTrigger(
+              
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
+              externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), rebalanceContext);
+          // Update unique segment list as IS-EV trigger must have processed 
these
+          allSegmentsFromIdealState = 
idealState.getRecord().getMapFields().keySet();
+          if (_tableRebalanceObserver.isStopped()) {
+            throw new RuntimeException(
+                String.format("Rebalance has already stopped with status: %s",
+                    _tableRebalanceObserver.getStopStatus()));
+          }
+          if (previousRemainingSegments < 0) {
+            // initialize previousRemainingSegments
+            previousRemainingSegments = 
getNumRemainingSegmentsToProcess(tableNameWithType,
+                externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), lowDiskMode,
+                bestEfforts, segmentsToMonitor, tableRebalanceLogger, false);
+            if (previousRemainingSegments == 0) {
+              tableRebalanceLogger.info("ExternalView converged");
+              return idealState;
+            }
+          } else if (isExternalViewConverged(tableNameWithType, 
externalView.getRecord().getMapFields(),
+              idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, 
segmentsToMonitor,
+              tableRebalanceLogger)) {
+            tableRebalanceLogger.info("ExternalView converged");
+            return idealState;
+          }

Review Comment:
   Just a suggestion: how about switching the order of the checks and making 
them two separate `if` blocks instead of an if / else-if?
   
   That gives a clearer flow: if isExternalViewConverged, return; then, if 
previousRemainingSegments < 0, calculate the remaining count.
   If we did not return due to convergence, isExternalViewConverged will most 
likely have exited early, so it is okay to compute the number of segments 
remaining to converge once more.
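
   Roughly what I have in mind, as a sketch rather than a drop-in change. The 
two checks are passed in as suppliers so the example stands alone; 
`ConvergenceCheckOrderSketch`, `IterationResult` and `checkOnce` are 
hypothetical names and do not exist in TableRebalancer.

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

final class ConvergenceCheckOrderSketch {
  private ConvergenceCheckOrderSketch() {
  }

  /** Outcome of one polling iteration. */
  static final class IterationResult {
    final boolean converged;
    final int previousRemainingSegments;

    IterationResult(boolean converged, int previousRemainingSegments) {
      this.converged = converged;
      this.previousRemainingSegments = previousRemainingSegments;
    }
  }

  static IterationResult checkOnce(int previousRemainingSegments,
      BooleanSupplier isExternalViewConverged, IntSupplier countRemainingSegments) {
    // 1. Convergence check first: the cheap, early-exiting path.
    if (isExternalViewConverged.getAsBoolean()) {
      return new IterationResult(true, previousRemainingSegments);
    }
    // 2. Separate block: initialize the baseline only once, with the full count.
    if (previousRemainingSegments < 0) {
      previousRemainingSegments = countRemainingSegments.getAsInt();
    }
    return new IterationResult(false, previousRemainingSegments);
  }

  public static void main(String[] args) {
    IterationResult first = checkOnce(-1, () -> false, () -> 7);
    System.out.println(first.converged + " " + first.previousRemainingSegments);
  }
}
```

   The full count is only computed on the first non-converged iteration; every 
later iteration pays for the early-exit check alone.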



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1387,9 +1444,13 @@ private static boolean isExternalViewConverged(String 
tableNameWithType,
           continue;
         }
 
-        // ExternalView should contain the segment
+        // If the segment has not shown up in ExternalView, it is not added yet
         if (externalViewInstanceStateMap == null) {
-          return false;
+          remainingSegmentsToProcess++;
+          if (earlyReturn) {
+            return 1;

Review Comment:
   Can we return -1 instead of 1 here? A return value of 1 is very odd.
   
   Alternatively, shall we split `isExternalViewConverged` completely from the 
function that calculates the stats, rather than merging them? I see you call 
them for different checks anyway.
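
   A sketch of what the split could look like, sharing only a per-segment 
predicate. This is deliberately simplified and is not the real logic: it 
models regular mode only and ignores low disk mode, ERROR-state handling, 
best-efforts and segmentsToMonitor. `ConvergenceSplitSketch` and 
`isSegmentConverged` are hypothetical names.

```java
import java.util.Map;

final class ConvergenceSplitSketch {
  private ConvergenceSplitSketch() {
  }

  /** Simplified per-segment check: every IdealState instance state is reached in ExternalView. */
  static boolean isSegmentConverged(Map<String, String> externalViewInstanceStateMap,
      Map<String, String> idealStateInstanceStateMap) {
    if (externalViewInstanceStateMap == null) {
      // Segment has not shown up in ExternalView yet.
      return false;
    }
    for (Map.Entry<String, String> entry : idealStateInstanceStateMap.entrySet()) {
      if (!entry.getValue().equals(externalViewInstanceStateMap.get(entry.getKey()))) {
        return false;
      }
    }
    return true;
  }

  /** Boolean check: stops at the first unconverged segment, no sentinel return value needed. */
  static boolean isExternalViewConverged(Map<String, Map<String, String>> externalViewSegmentStates,
      Map<String, Map<String, String>> idealStateSegmentStates) {
    for (Map.Entry<String, Map<String, String>> entry : idealStateSegmentStates.entrySet()) {
      if (!isSegmentConverged(externalViewSegmentStates.get(entry.getKey()), entry.getValue())) {
        return false;
      }
    }
    return true;
  }

  /** Stats function: always walks every IdealState segment and returns the real count. */
  static int getNumRemainingSegmentsToProcess(Map<String, Map<String, String>> externalViewSegmentStates,
      Map<String, Map<String, String>> idealStateSegmentStates) {
    int remaining = 0;
    for (Map.Entry<String, Map<String, String>> entry : idealStateSegmentStates.entrySet()) {
      if (!isSegmentConverged(externalViewSegmentStates.get(entry.getKey()), entry.getValue())) {
        remaining++;
      }
    }
    return remaining;
  }
}
```

   With this shape neither function needs an `earlyReturn` flag, and the count 
function never returns a placeholder value.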



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1303,47 +1303,75 @@ private IdealState waitForExternalViewToConverge(String 
tableNameWithType, boole
     long endTimeMs = System.currentTimeMillis() + 
externalViewStabilizationTimeoutInMs;
 
     IdealState idealState;
-    do {
-      tableRebalanceLogger.debug("Start to check if ExternalView converges to 
IdealStates");
-      idealState = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
-      // IdealState might be null if table got deleted, throwing exception to 
abort the rebalance
-      Preconditions.checkState(idealState != null, "Failed to find the 
IdealState");
-
-      ExternalView externalView =
-          
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType));
-      // ExternalView might be null when table is just created, skipping check 
for this iteration
-      if (externalView != null) {
-        // Record external view and ideal state convergence status
-        TableRebalanceObserver.RebalanceContext rebalanceContext = new 
TableRebalanceObserver.RebalanceContext(
-            estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState, 
segmentsToMonitor);
-        _tableRebalanceObserver.onTrigger(
-            
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
-            externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), rebalanceContext);
-        // Update unique segment list as IS-EV trigger must have processed 
these
-        allSegmentsFromIdealState = 
idealState.getRecord().getMapFields().keySet();
-        if (_tableRebalanceObserver.isStopped()) {
-          throw new RuntimeException(
-              String.format("Rebalance has already stopped with status: %s", 
_tableRebalanceObserver.getStopStatus()));
+    ExternalView externalView;
+    int previousRemainingSegments = -1;
+    while (true) {
+      do {
+        tableRebalanceLogger.debug("Start to check if ExternalView converges 
to IdealStates");
+        idealState = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
+        // IdealState might be null if table got deleted, throwing exception 
to abort the rebalance
+        Preconditions.checkState(idealState != null, "Failed to find the 
IdealState");
+
+        externalView = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType));
+        // ExternalView might be null when table is just created, skipping 
check for this iteration
+        if (externalView != null) {
+          // Record external view and ideal state convergence status
+          TableRebalanceObserver.RebalanceContext rebalanceContext = new 
TableRebalanceObserver.RebalanceContext(
+              estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState, 
segmentsToMonitor);
+          _tableRebalanceObserver.onTrigger(
+              
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
+              externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), rebalanceContext);
+          // Update unique segment list as IS-EV trigger must have processed 
these
+          allSegmentsFromIdealState = 
idealState.getRecord().getMapFields().keySet();
+          if (_tableRebalanceObserver.isStopped()) {
+            throw new RuntimeException(
+                String.format("Rebalance has already stopped with status: %s",
+                    _tableRebalanceObserver.getStopStatus()));
+          }
+          if (previousRemainingSegments < 0) {
+            // initialize previousRemainingSegments
+            previousRemainingSegments = 
getNumRemainingSegmentsToProcess(tableNameWithType,
+                externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), lowDiskMode,
+                bestEfforts, segmentsToMonitor, tableRebalanceLogger, false);
+            if (previousRemainingSegments == 0) {
+              tableRebalanceLogger.info("ExternalView converged");
+              return idealState;
+            }
+          } else if (isExternalViewConverged(tableNameWithType, 
externalView.getRecord().getMapFields(),
+              idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, 
segmentsToMonitor,
+              tableRebalanceLogger)) {
+            tableRebalanceLogger.info("ExternalView converged");
+            return idealState;
+          }
         }
-        if (isExternalViewConverged(tableNameWithType, 
externalView.getRecord().getMapFields(),
-            idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, 
segmentsToMonitor, tableRebalanceLogger)) {
-          tableRebalanceLogger.info("ExternalView converged");
-          return idealState;
+        tableRebalanceLogger.debug("ExternalView has not converged to 
IdealStates. Retry after: {}ms",
+            externalViewCheckIntervalInMs);
+        Thread.sleep(externalViewCheckIntervalInMs);
+      } while (System.currentTimeMillis() < endTimeMs);
+      if (bestEfforts) {
+        tableRebalanceLogger.warn(
+            "ExternalView has not converged within: {}ms, continuing the 
rebalance (best-efforts)",
+            externalViewStabilizationTimeoutInMs);
+        return idealState;
+      }
+      if (externalView != null) {
+        int currentRemainingSegments = 
getNumRemainingSegmentsToProcess(tableNameWithType,
+            externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts,
+            segmentsToMonitor, tableRebalanceLogger, false);
+        if (currentRemainingSegments < previousRemainingSegments) {
+          tableRebalanceLogger.info(

Review Comment:
   nit: can you also print the previousRemainingSegments and the number of 
segments to monitor?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1303,47 +1303,75 @@ private IdealState waitForExternalViewToConverge(String 
tableNameWithType, boole
     long endTimeMs = System.currentTimeMillis() + 
externalViewStabilizationTimeoutInMs;
 
     IdealState idealState;
-    do {
-      tableRebalanceLogger.debug("Start to check if ExternalView converges to 
IdealStates");
-      idealState = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
-      // IdealState might be null if table got deleted, throwing exception to 
abort the rebalance
-      Preconditions.checkState(idealState != null, "Failed to find the 
IdealState");
-
-      ExternalView externalView =
-          
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType));
-      // ExternalView might be null when table is just created, skipping check 
for this iteration
-      if (externalView != null) {
-        // Record external view and ideal state convergence status
-        TableRebalanceObserver.RebalanceContext rebalanceContext = new 
TableRebalanceObserver.RebalanceContext(
-            estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState, 
segmentsToMonitor);
-        _tableRebalanceObserver.onTrigger(
-            
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
-            externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), rebalanceContext);
-        // Update unique segment list as IS-EV trigger must have processed 
these
-        allSegmentsFromIdealState = 
idealState.getRecord().getMapFields().keySet();
-        if (_tableRebalanceObserver.isStopped()) {
-          throw new RuntimeException(
-              String.format("Rebalance has already stopped with status: %s", 
_tableRebalanceObserver.getStopStatus()));
+    ExternalView externalView;
+    int previousRemainingSegments = -1;
+    while (true) {
+      do {
+        tableRebalanceLogger.debug("Start to check if ExternalView converges 
to IdealStates");
+        idealState = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
+        // IdealState might be null if table got deleted, throwing exception 
to abort the rebalance
+        Preconditions.checkState(idealState != null, "Failed to find the 
IdealState");
+
+        externalView = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType));
+        // ExternalView might be null when table is just created, skipping 
check for this iteration
+        if (externalView != null) {
+          // Record external view and ideal state convergence status
+          TableRebalanceObserver.RebalanceContext rebalanceContext = new 
TableRebalanceObserver.RebalanceContext(
+              estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState, 
segmentsToMonitor);
+          _tableRebalanceObserver.onTrigger(
+              
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
+              externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), rebalanceContext);
+          // Update unique segment list as IS-EV trigger must have processed 
these
+          allSegmentsFromIdealState = 
idealState.getRecord().getMapFields().keySet();
+          if (_tableRebalanceObserver.isStopped()) {
+            throw new RuntimeException(
+                String.format("Rebalance has already stopped with status: %s",
+                    _tableRebalanceObserver.getStopStatus()));
+          }
+          if (previousRemainingSegments < 0) {
+            // initialize previousRemainingSegments
+            previousRemainingSegments = 
getNumRemainingSegmentsToProcess(tableNameWithType,
+                externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), lowDiskMode,
+                bestEfforts, segmentsToMonitor, tableRebalanceLogger, false);
+            if (previousRemainingSegments == 0) {
+              tableRebalanceLogger.info("ExternalView converged");
+              return idealState;
+            }
+          } else if (isExternalViewConverged(tableNameWithType, 
externalView.getRecord().getMapFields(),
+              idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, 
segmentsToMonitor,
+              tableRebalanceLogger)) {
+            tableRebalanceLogger.info("ExternalView converged");
+            return idealState;
+          }
         }
-        if (isExternalViewConverged(tableNameWithType, 
externalView.getRecord().getMapFields(),
-            idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, 
segmentsToMonitor, tableRebalanceLogger)) {
-          tableRebalanceLogger.info("ExternalView converged");
-          return idealState;
+        tableRebalanceLogger.debug("ExternalView has not converged to 
IdealStates. Retry after: {}ms",
+            externalViewCheckIntervalInMs);
+        Thread.sleep(externalViewCheckIntervalInMs);
+      } while (System.currentTimeMillis() < endTimeMs);
+      if (bestEfforts) {
+        tableRebalanceLogger.warn(
+            "ExternalView has not converged within: {}ms, continuing the 
rebalance (best-efforts)",
+            externalViewStabilizationTimeoutInMs);
+        return idealState;
+      }

Review Comment:
   nit: can you add an empty line after this for easier code readability?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

