This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c3e118f1fa Fix up elapsed time calculation for rebalance progress
stats when carry-over segments are present (#15674)
c3e118f1fa is described below
commit c3e118f1fa737acde84e50b23d8c3c1d1979a5b6
Author: Sonam Mandal <[email protected]>
AuthorDate: Wed Apr 30 22:25:12 2025 -0700
Fix up elapsed time calculation for rebalance progress stats when
carry-over segments are present (#15674)
* Fix up elapsed time calculation for rebalance progress stats when
carry-over segments are present
* Fix extra commented line
---
.../rebalance/TableRebalanceProgressStats.java | 4 +-
.../rebalance/ZkBasedTableRebalanceObserver.java | 4 +-
.../TestZkBasedTableRebalanceObserver.java | 125 +++++++++++++++++++++
3 files changed, 130 insertions(+), 3 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
index 050052e83a..47d223804f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
@@ -242,7 +242,9 @@ public class TableRebalanceProgressStats {
int remainingSegmentsToChange) {
double elapsedTimeInSeconds = (double) (System.currentTimeMillis() -
startTime) / 1000.0;
int segmentsAlreadyChanged = totalSegmentsToChange -
remainingSegmentsToChange;
- return segmentsAlreadyChanged == 0 ? totalSegmentsToChange == 0 ? 0.0 :
-1.0
+ // If carry over + remaining segments to change are > total segments to
change then number of segments already
+ // changed may be -ve, in which case we should just set the default value
as we cannot measure elapsed time
+ return segmentsAlreadyChanged <= 0 ? totalSegmentsToChange == 0 ? 0.0 :
-1.0
: (double) remainingSegmentsToChange / (double) segmentsAlreadyChanged
* elapsedTimeInSeconds;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
index 125b0568bc..5eaa80e91d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
@@ -581,10 +581,10 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
startTimeMs = existingProgressStats._startTimeMs;
progressStats._estimatedTimeToCompleteAddsInSeconds =
TableRebalanceProgressStats.calculateEstimatedTimeToCompleteChange(startTimeMs,
- progressStats._totalSegmentsToBeAdded,
progressStats._totalRemainingSegmentsToBeAdded);
+ progressStats._totalSegmentsToBeAdded, totalSegmentsToBeAdded);
progressStats._estimatedTimeToCompleteDeletesInSeconds =
TableRebalanceProgressStats.calculateEstimatedTimeToCompleteChange(startTimeMs,
- progressStats._totalSegmentsToBeDeleted,
progressStats._totalRemainingSegmentsToBeDeleted);
+ progressStats._totalSegmentsToBeDeleted,
totalSegmentsToBeDeleted);
progressStats._averageSegmentSizeInBytes =
existingProgressStats._averageSegmentSizeInBytes;
progressStats._totalEstimatedDataToBeMovedInBytes =
TableRebalanceProgressStats.calculateNewEstimatedDataToBeMovedInBytes(
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
index 0d2f395d79..7ae66a7b30 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
@@ -1293,4 +1293,129 @@ public class TestZkBasedTableRebalanceObserver {
assertEquals(current, target);
}
+
+ @Test
+ void testElapsedTimeWithCarryOverProgressStats() {
+ long estimatedAverageSegmentSize = 1024;
+
+ Map<String, Map<String, String>> currentIS = new TreeMap<>();
+ currentIS.put("segment1",
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1"), ONLINE));
+ currentIS.put("segment2",
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2"), ONLINE));
+ currentIS.put("segment3",
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4"), ONLINE));
+
+ // Assume that the current EV doesn't yet have the segments added as seen
in currentIS
+ Map<String, Map<String, String>> currentEV = new TreeMap<>();
+
+ Map<String, Map<String, String>> targetIS = new TreeMap<>();
+ targetIS.put("segment1",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2",
"host3", "host4", "host5"), ONLINE));
+ targetIS.put("segment2",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1",
"host3", "host4", "host5"), ONLINE));
+ targetIS.put("segment3",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1",
"host3", "host4", "host5"), ONLINE));
+
+ TableRebalanceProgressStats tableRebalanceProgressStats = new
TableRebalanceProgressStats();
+
+ // Initialize the start trigger with some change
+ Set<String> segmentSet = new HashSet<>(targetIS.keySet());
+ TableRebalanceObserver.RebalanceContext rebalanceContext = new
TableRebalanceObserver.RebalanceContext(
+ estimatedAverageSegmentSize, segmentSet, segmentSet);
+ TableRebalanceProgressStats.RebalanceProgressStats stats =
+ ZkBasedTableRebalanceObserver.calculateUpdatedProgressStats(targetIS,
currentIS, rebalanceContext,
+ TableRebalanceObserver.Trigger.START_TRIGGER,
tableRebalanceProgressStats);
+ assertEquals(stats._totalSegmentsToBeAdded, 11);
+ assertEquals(stats._totalSegmentsToBeDeleted, 2);
+ assertEquals(stats._totalRemainingSegmentsToBeAdded, 11);
+ assertEquals(stats._totalRemainingSegmentsToBeDeleted, 2);
+ assertEquals(stats._totalCarryOverSegmentsToBeAdded, 0);
+ assertEquals(stats._totalCarryOverSegmentsToBeDeleted, 0);
+ assertEquals(stats._totalRemainingSegmentsToConverge, 0);
+ assertEquals(stats._totalUniqueNewUntrackedSegmentsDuringRebalance, 0);
+ assertEquals(stats._percentageRemainingSegmentsToBeAdded, 100.0);
+ assertEquals(stats._percentageRemainingSegmentsToBeDeleted, 100.0);
+ assertEquals(stats._estimatedTimeToCompleteAddsInSeconds, -1.0);
+ assertEquals(stats._estimatedTimeToCompleteDeletesInSeconds, -1.0);
+ assertEquals(stats._averageSegmentSizeInBytes,
estimatedAverageSegmentSize);
+ assertEquals(stats._totalEstimatedDataToBeMovedInBytes,
estimatedAverageSegmentSize * 11);
+ tableRebalanceProgressStats.setRebalanceProgressStatsOverall(stats);
+
+ // Next call EV-IS convergence (assume here that the IS is not yet updated
like happens in actual rebalance)
+ // Since currentEV does not match currentIS, IS-EV convergence will detect
some segments are carry over segments
+ // and wait for these to converge before moving on to the first IS update
+ // Also validate that the overall progress stats shows elapsed time as
-1.0 instead of some random -ve time
+ segmentSet = new HashSet<>(targetIS.keySet());
+ rebalanceContext = new
TableRebalanceObserver.RebalanceContext(estimatedAverageSegmentSize,
segmentSet, segmentSet);
+ stats =
ZkBasedTableRebalanceObserver.calculateUpdatedProgressStats(currentIS,
currentEV, rebalanceContext,
+
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
+ tableRebalanceProgressStats);
+
tableRebalanceProgressStats.updateOverallAndStepStatsFromLatestStepStats(stats);
+ stats = tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep();
+ assertEquals(stats._totalSegmentsToBeAdded, 0);
+ assertEquals(stats._totalSegmentsToBeDeleted, 0);
+ assertEquals(stats._totalRemainingSegmentsToBeAdded, 0);
+ assertEquals(stats._totalRemainingSegmentsToBeDeleted, 0);
+ assertEquals(stats._totalCarryOverSegmentsToBeAdded, 3);
+ assertEquals(stats._totalCarryOverSegmentsToBeDeleted, 0);
+ assertEquals(stats._totalRemainingSegmentsToConverge, 0);
+ assertEquals(stats._totalUniqueNewUntrackedSegmentsDuringRebalance, 0);
+ assertEquals(stats._percentageRemainingSegmentsToBeAdded, 0.0);
+ assertEquals(stats._percentageRemainingSegmentsToBeDeleted, 0.0);
+ assertEquals(stats._estimatedTimeToCompleteAddsInSeconds, 0.0);
+ assertEquals(stats._estimatedTimeToCompleteDeletesInSeconds, 0.0);
+ assertEquals(stats._averageSegmentSizeInBytes, 0);
+ assertEquals(stats._totalEstimatedDataToBeMovedInBytes, 0);
+ TableRebalanceProgressStats.RebalanceProgressStats overallStats =
+ tableRebalanceProgressStats.getRebalanceProgressStatsOverall();
+ assertEquals(overallStats._totalSegmentsToBeAdded, 11);
+ assertEquals(overallStats._totalSegmentsToBeDeleted, 2);
+ assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 11);
+ assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 2);
+ assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 3);
+ assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0);
+ assertEquals(overallStats._totalRemainingSegmentsToConverge, 0);
+ assertEquals(overallStats._totalUniqueNewUntrackedSegmentsDuringRebalance,
0);
+ assertEquals(overallStats._percentageRemainingSegmentsToBeAdded,
127.27272727272727);
+ assertEquals(overallStats._percentageRemainingSegmentsToBeDeleted, 100.0);
+ assertEquals(overallStats._estimatedTimeToCompleteAddsInSeconds, -1.0);
+ assertEquals(overallStats._estimatedTimeToCompleteDeletesInSeconds, -1.0);
+ assertEquals(overallStats._averageSegmentSizeInBytes,
estimatedAverageSegmentSize);
+ assertEquals(overallStats._totalEstimatedDataToBeMovedInBytes,
estimatedAverageSegmentSize * 11);
+
+ // currentEV has converged to match currentIS, no more carry over segments
should be seen
+ currentEV = new TreeMap<>(currentIS);
+ stats =
ZkBasedTableRebalanceObserver.calculateUpdatedProgressStats(currentIS,
currentEV, rebalanceContext,
+
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
+ tableRebalanceProgressStats);
+
tableRebalanceProgressStats.updateOverallAndStepStatsFromLatestStepStats(stats);
+ stats = tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep();
+ assertEquals(stats._totalSegmentsToBeAdded, 0);
+ assertEquals(stats._totalSegmentsToBeDeleted, 0);
+ assertEquals(stats._totalRemainingSegmentsToBeAdded, 0);
+ assertEquals(stats._totalRemainingSegmentsToBeDeleted, 0);
+ assertEquals(stats._totalCarryOverSegmentsToBeAdded, 0);
+ assertEquals(stats._totalCarryOverSegmentsToBeDeleted, 0);
+ assertEquals(stats._totalRemainingSegmentsToConverge, 0);
+ assertEquals(stats._totalUniqueNewUntrackedSegmentsDuringRebalance, 0);
+ assertEquals(stats._percentageRemainingSegmentsToBeAdded, 0.0);
+ assertEquals(stats._percentageRemainingSegmentsToBeDeleted, 0.0);
+ assertEquals(stats._estimatedTimeToCompleteAddsInSeconds, 0.0);
+ assertEquals(stats._estimatedTimeToCompleteDeletesInSeconds, 0.0);
+ assertEquals(stats._averageSegmentSizeInBytes, 0);
+ assertEquals(stats._totalEstimatedDataToBeMovedInBytes, 0);
+ overallStats =
tableRebalanceProgressStats.getRebalanceProgressStatsOverall();
+ assertEquals(overallStats._totalSegmentsToBeAdded, 11);
+ assertEquals(overallStats._totalSegmentsToBeDeleted, 2);
+ assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 11);
+ assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 2);
+ assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 0);
+ assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0);
+ assertEquals(overallStats._totalRemainingSegmentsToConverge, 0);
+ assertEquals(overallStats._totalUniqueNewUntrackedSegmentsDuringRebalance,
0);
+ assertEquals(overallStats._percentageRemainingSegmentsToBeAdded, 100.0);
+ assertEquals(overallStats._percentageRemainingSegmentsToBeDeleted, 100.0);
+ assertEquals(overallStats._estimatedTimeToCompleteAddsInSeconds, -1.0);
+ assertEquals(overallStats._estimatedTimeToCompleteDeletesInSeconds, -1.0);
+ assertEquals(overallStats._averageSegmentSizeInBytes,
estimatedAverageSegmentSize);
+ assertEquals(overallStats._totalEstimatedDataToBeMovedInBytes,
estimatedAverageSegmentSize * 11);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]