somandal commented on code in PR #15266:
URL: https://github.com/apache/pinot/pull/15266#discussion_r2027903591
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -284,4 +319,204 @@ public static
TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetwe
(totalSegments == 0) ? 0 : ((double)
rebalanceStats._segmentsToRebalance / totalSegments) * 100.0;
return rebalanceStats;
}
+
+ /**
+ * Calculates the progress stats for the given step or for the overall based
on the trigger type
+ * @return the calculated step or progress stats
+ */
+ @VisibleForTesting
+ static TableRebalanceProgressStats.RebalanceProgressStats
calculateUpdatedProgressStats(
+ Map<String, Map<String, String>> targetAssignment, Map<String,
Map<String, String>> currentAssignment,
+ RebalanceContext rebalanceContext, Trigger trigger,
TableRebalanceProgressStats rebalanceProgressStats) {
+ Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+ Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+ Map<String, Set<String>> targetInstanceToOfflineSegmentsMap = new
HashMap<>();
+ Set<String> newSegmentsNotExistingBefore = new HashSet<>();
+
+ Set<String> segmentsToMonitor = rebalanceContext.getSegmentsToMonitor();
+ int totalNewSegmentsNotMonitored = 0;
+ int totalSegmentsTarget = 0;
+ for (Map.Entry<String, Map<String, String>> entrySet :
targetAssignment.entrySet()) {
+ String segmentName = entrySet.getKey();
+ if (!rebalanceContext.getUniqueSegments().contains(segmentName)) {
+ newSegmentsNotExistingBefore.add(segmentName);
+ }
+ for (Map.Entry<String, String> entry : entrySet.getValue().entrySet()) {
+ String instanceName = entry.getKey();
+ String instanceState = entry.getValue();
+ if (segmentsToMonitor != null &&
!segmentsToMonitor.contains(segmentName)) {
Review Comment:
I have added some comments about this. Basically, the logic in the Table
Rebalancer (`isExternalViewConverged`) only tracks segments that meet the
following conditions:
- The segment is part of the `segmentsToMonitor` set, which is basically the
list of all segments that are moving as part of this rebalance step (or the
previous one)
- The segment is not in OFFLINE state in the IdealState (IS)
- For `lowDiskMode`, the ExternalView (EV) deletion has not completed yet, etc.
Here I also only track stats for segments that are on the
`segmentsToMonitor` list as calculated by the table rebalancer. That's why, if a
segment is not on the monitor list, I skip it when computing the stats. Hope
that makes sense.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -284,4 +319,204 @@ public static
TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetwe
(totalSegments == 0) ? 0 : ((double)
rebalanceStats._segmentsToRebalance / totalSegments) * 100.0;
return rebalanceStats;
}
+
+ /**
+ * Calculates the progress stats for the given step or for the overall based
on the trigger type
+ * @return the calculated step or progress stats
+ */
+ @VisibleForTesting
+ static TableRebalanceProgressStats.RebalanceProgressStats
calculateUpdatedProgressStats(
+ Map<String, Map<String, String>> targetAssignment, Map<String,
Map<String, String>> currentAssignment,
+ RebalanceContext rebalanceContext, Trigger trigger,
TableRebalanceProgressStats rebalanceProgressStats) {
+ Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+ Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+ Map<String, Set<String>> targetInstanceToOfflineSegmentsMap = new
HashMap<>();
+ Set<String> newSegmentsNotExistingBefore = new HashSet<>();
+
+ Set<String> segmentsToMonitor = rebalanceContext.getSegmentsToMonitor();
+ int totalNewSegmentsNotMonitored = 0;
+ int totalSegmentsTarget = 0;
+ for (Map.Entry<String, Map<String, String>> entrySet :
targetAssignment.entrySet()) {
+ String segmentName = entrySet.getKey();
+ if (!rebalanceContext.getUniqueSegments().contains(segmentName)) {
+ newSegmentsNotExistingBefore.add(segmentName);
+ }
+ for (Map.Entry<String, String> entry : entrySet.getValue().entrySet()) {
+ String instanceName = entry.getKey();
+ String instanceState = entry.getValue();
+ if (segmentsToMonitor != null &&
!segmentsToMonitor.contains(segmentName)) {
+ if (newSegmentsNotExistingBefore.contains(segmentName)) {
+ // Don't track newly added segments unless they're on the monitor
list
+ totalNewSegmentsNotMonitored++;
+ newSegmentsNotExistingBefore.remove(segmentName);
+ }
+ continue;
+ }
+ if
(instanceState.equals(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE))
{
+ // Skip tracking segments that are in OFFLINE state in the target
assignment
+ targetInstanceToOfflineSegmentsMap.computeIfAbsent(instanceName, k
-> new HashSet<>()).add(segmentName);
+ continue;
+ }
+ totalSegmentsTarget += 1;
+ newServersToSegmentMap.computeIfAbsent(instanceName, k -> new
HashSet<>()).add(segmentName);
+ }
+ }
+
+ for (Map.Entry<String, Map<String, String>> entrySet :
currentAssignment.entrySet()) {
+ String segmentName = entrySet.getKey();
+ for (String instanceName : entrySet.getValue().keySet()) {
+ if (segmentsToMonitor != null &&
!segmentsToMonitor.contains(segmentName)) {
+ continue;
+ }
+ if (targetInstanceToOfflineSegmentsMap.containsKey(instanceName)
+ &&
targetInstanceToOfflineSegmentsMap.get(instanceName).contains(segmentName)) {
+ // Skip tracking segments that are in OFFLINE state in the target
assignment
+ continue;
+ }
+ existingServersToSegmentMap.computeIfAbsent(instanceName, k -> new
HashSet<>()).add(segmentName);
+ }
+ }
+
+ int segmentsNotMoved = 0;
+ int totalSegmentsToBeDeleted = 0;
+ int segmentsUnchangedYetNotConverged = 0;
+ for (Map.Entry<String, Set<String>> entry :
newServersToSegmentMap.entrySet()) {
+ String server = entry.getKey();
+ Set<String> segmentSet = entry.getValue();
+
+ Set<String> newSegmentSet = new HashSet<>(segmentSet);
+ Set<String> existingSegmentSet = new HashSet<>();
+ int segmentsUnchanged = 0;
+ if (existingServersToSegmentMap.containsKey(server)) {
+ Set<String> segmentSetForServer =
existingServersToSegmentMap.get(server);
+ existingSegmentSet.addAll(segmentSetForServer);
+ Set<String> intersection = new HashSet<>(segmentSetForServer);
+ intersection.retainAll(newSegmentSet);
+ segmentsUnchanged = intersection.size();
+ segmentsNotMoved += segmentsUnchanged;
+
+ for (String segmentName : intersection) {
+ String currentInstanceState =
currentAssignment.get(segmentName).get(server);
+ String targetInstanceState =
targetAssignment.get(segmentName).get(server);
+ if (!currentInstanceState.equals(targetInstanceState)) {
+ segmentsUnchangedYetNotConverged++;
+ }
+ }
+ }
+ newSegmentSet.removeAll(existingSegmentSet);
+ totalSegmentsToBeDeleted += existingSegmentSet.size() -
segmentsUnchanged;
+ }
+
+ for (Map.Entry<String, Set<String>> entry :
existingServersToSegmentMap.entrySet()) {
+ if (!newServersToSegmentMap.containsKey(entry.getKey())) {
+ totalSegmentsToBeDeleted += entry.getValue().size();
+ }
+ }
+
+ int newSegsAddedInThisAssignment = 0;
+ int newSegsDeletedInThisAssignment = 0;
+ for (String segment : newSegmentsNotExistingBefore) {
+ Set<String> currentSegmentAssign = currentAssignment.get(segment) != null
+ ? currentAssignment.get(segment).keySet() : new HashSet<>();
+ Set<String> targetSegmentAssign = targetAssignment.get(segment) != null
+ ? targetAssignment.get(segment).keySet() : new HashSet<>();
+
+ Set<String> segmentsAdded = new HashSet<>(targetSegmentAssign);
+ segmentsAdded.removeAll(currentSegmentAssign);
+ newSegsAddedInThisAssignment += segmentsAdded.size();
+
+ Set<String> segmentsDeleted = new HashSet<>(currentSegmentAssign);
+ segmentsDeleted.removeAll(targetSegmentAssign);
+ newSegsDeletedInThisAssignment += segmentsDeleted.size();
+ }
+
+ int newNumberSegmentsTotal = totalSegmentsTarget;
+ int totalSegmentsToBeAdded = newNumberSegmentsTotal - segmentsNotMoved;
+
+ TableRebalanceProgressStats.RebalanceProgressStats progressStats =
+ new TableRebalanceProgressStats.RebalanceProgressStats();
+ switch (trigger) {
+ case START_TRIGGER:
+ case NEXT_ASSINGMENT_CALCULATION_TRIGGER:
+ // These are initialization steps for global / step progress stats
+ progressStats._totalSegmentsToBeAdded = totalSegmentsToBeAdded;
+ progressStats._totalSegmentsToBeDeleted = totalSegmentsToBeDeleted;
+ progressStats._totalRemainingSegmentsToBeAdded =
totalSegmentsToBeAdded;
+ progressStats._totalRemainingSegmentsToBeDeleted =
totalSegmentsToBeDeleted;
+ progressStats._totalCarryOverSegmentsToBeAdded = 0;
+ progressStats._totalCarryOverSegmentsToBeDeleted = 0;
+ progressStats._totalRemainingSegmentsToConverge =
segmentsUnchangedYetNotConverged;
+ progressStats._totalUniqueNewUntrackedSegmentsDuringRebalance =
totalNewSegmentsNotMonitored;
+ progressStats._percentageRemainingSegmentsToBeAdded =
totalSegmentsToBeAdded == 0 ? 0.0 : 100.0;
+ progressStats._percentageRemainingSegmentsToBeDeleted =
totalSegmentsToBeDeleted == 0 ? 0.0 : 100.0;
+ progressStats._estimatedTimeToCompleteAddsInSeconds =
totalSegmentsToBeAdded == 0 ? 0.0 : -1.0;
+ progressStats._estimatedTimeToCompleteDeletesInSeconds =
totalSegmentsToBeDeleted == 0 ? 0.0 : -1.0;
+ progressStats._averageSegmentSizeInBytes =
rebalanceContext.getEstimatedAverageSegmentSizeInBytes();
+ progressStats._totalEstimatedDataToBeMovedInBytes =
+
TableRebalanceProgressStats.calculateNewEstimatedDataToBeMovedInBytes(0,
+ rebalanceContext.getEstimatedAverageSegmentSizeInBytes(),
totalSegmentsToBeAdded);
+ progressStats._startTimeMs = trigger ==
Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER
+ ? System.currentTimeMillis() :
rebalanceProgressStats.getStartTimeMs();
+ break;
+ case IDEAL_STATE_CHANGE_TRIGGER:
+ case EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER:
Review Comment:
done
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -284,4 +319,204 @@ public static
TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetwe
(totalSegments == 0) ? 0 : ((double)
rebalanceStats._segmentsToRebalance / totalSegments) * 100.0;
return rebalanceStats;
}
+
+ /**
+ * Calculates the progress stats for the given step or for the overall based
on the trigger type
+ * @return the calculated step or progress stats
+ */
+ @VisibleForTesting
+ static TableRebalanceProgressStats.RebalanceProgressStats
calculateUpdatedProgressStats(
+ Map<String, Map<String, String>> targetAssignment, Map<String,
Map<String, String>> currentAssignment,
+ RebalanceContext rebalanceContext, Trigger trigger,
TableRebalanceProgressStats rebalanceProgressStats) {
+ Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+ Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+ Map<String, Set<String>> targetInstanceToOfflineSegmentsMap = new
HashMap<>();
+ Set<String> newSegmentsNotExistingBefore = new HashSet<>();
+
+ Set<String> segmentsToMonitor = rebalanceContext.getSegmentsToMonitor();
+ int totalNewSegmentsNotMonitored = 0;
+ int totalSegmentsTarget = 0;
+ for (Map.Entry<String, Map<String, String>> entrySet :
targetAssignment.entrySet()) {
+ String segmentName = entrySet.getKey();
+ if (!rebalanceContext.getUniqueSegments().contains(segmentName)) {
+ newSegmentsNotExistingBefore.add(segmentName);
+ }
+ for (Map.Entry<String, String> entry : entrySet.getValue().entrySet()) {
+ String instanceName = entry.getKey();
+ String instanceState = entry.getValue();
+ if (segmentsToMonitor != null &&
!segmentsToMonitor.contains(segmentName)) {
+ if (newSegmentsNotExistingBefore.contains(segmentName)) {
+ // Don't track newly added segments unless they're on the monitor
list
+ totalNewSegmentsNotMonitored++;
+ newSegmentsNotExistingBefore.remove(segmentName);
+ }
+ continue;
+ }
+ if
(instanceState.equals(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE))
{
+ // Skip tracking segments that are in OFFLINE state in the target
assignment
+ targetInstanceToOfflineSegmentsMap.computeIfAbsent(instanceName, k
-> new HashSet<>()).add(segmentName);
+ continue;
+ }
+ totalSegmentsTarget += 1;
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]