jtuglu1 commented on code in PR #18954:
URL: https://github.com/apache/druid/pull/18954#discussion_r2735278572


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1762,6 +1771,8 @@ public void runInternal()
 
       checkTaskDuration();
 
+      maybeApplyPendingScaleRollover();
+
       checkPendingCompletionTasks();
 

Review Comment:
   Should this be called after `checkPendingCompletionTasks()`?
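   For reference, the ordering being suggested would look roughly like this (just a sketch, assuming the two calls have no hidden ordering dependency):
   ```java
   checkTaskDuration();

   checkPendingCompletionTasks();

   // Suggested ordering (sketch): apply the pending rollover only after
   // pending-completion task groups have been checked, so the publishing-task
   // counts it reasons about are up to date.
   maybeApplyPendingScaleRollover();
   ```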



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3392,6 +3403,53 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
           }
         });
 
+    // Phase 1 of scale-during-rollover: detect and set up.
+    // The taskCount change and re-allocation happen in Phase 2 after all tasks have stopped.
+    // We respect maxAllowedStops to avoid worker capacity exhaustion - rollover may take multiple cycles.
+    if (!futures.isEmpty() && taskAutoScaler != null && pendingRolloverTaskCount == null) {
+      int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
+      if (rolloverTaskCount > 0 && rolloverTaskCount != getIoConfig().getTaskCount()) {
+        log.info(
+            "Autoscaler recommends scaling to [%d] tasks during rollover for supervisor[%s]. "
+            + "Setting up pending rollover - will apply after all tasks stop.",
+            rolloverTaskCount, supervisorId
+        );
+
+        // Stop remaining active groups while respecting maxAllowedStops to avoid
+        // worker capacity exhaustion. Publishing tasks continue consuming worker slots,
+        // so stopping all at once could leave no capacity for new tasks.
+        int numPendingCompletionTaskGroups = pendingCompletionTaskGroups.values().stream()
+                                                                        .mapToInt(List::size).sum();
+        int availableStops = ioConfig.getMaxAllowedStops() - numPendingCompletionTaskGroups - numStoppedTasks.get();
+
+        int stoppedForRollover = 0;
+        for (Entry<Integer, TaskGroup> entry : activelyReadingTaskGroups.entrySet()) {
+          Integer groupId = entry.getKey();
+          if (!futureGroupIds.contains(groupId)) {
+            if (stoppedForRollover >= availableStops) {
+              log.info(
+                  "Deferring stop of taskGroup[%d] to next cycle - maxAllowedStops[%d] reached. "
+                  + "Publishing tasks: [%d], stopped this cycle: [%d].",
+                  groupId, ioConfig.getMaxAllowedStops(), numPendingCompletionTaskGroups, numStoppedTasks.get()
+              );
+              continue;
+            }
+            log.info(
+                "Stopping taskGroup[%d] for autoscaler rollover to [%d] tasks.",
+                groupId, rolloverTaskCount

Review Comment:
   Maybe log both `stoppedForRollover` and `availableStops` here as well?
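   For example (sketch only; both counters are already in scope at this point):
   ```java
   log.info(
       "Stopping taskGroup[%d] for autoscaler rollover to [%d] tasks. "
       + "stoppedForRollover[%d], availableStops[%d].",
       groupId, rolloverTaskCount, stoppedForRollover, availableStops
   );
   ```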



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3392,6 +3403,53 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
           }
         });
 
+    // Phase 1 of scale-during-rollover: detect and set up.
+    // The taskCount change and re-allocation happen in Phase 2 after all tasks have stopped.
+    // We respect maxAllowedStops to avoid worker capacity exhaustion - rollover may take multiple cycles.
+    if (!futures.isEmpty() && taskAutoScaler != null && pendingRolloverTaskCount == null) {
+      int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
+      if (rolloverTaskCount > 0 && rolloverTaskCount != getIoConfig().getTaskCount()) {
+        log.info(
+            "Autoscaler recommends scaling to [%d] tasks during rollover for supervisor[%s]. "
+            + "Setting up pending rollover - will apply after all tasks stop.",
+            rolloverTaskCount, supervisorId
+        );
+
+        // Stop remaining active groups while respecting maxAllowedStops to avoid
+        // worker capacity exhaustion. Publishing tasks continue consuming worker slots,
+        // so stopping all at once could leave no capacity for new tasks.
+        int numPendingCompletionTaskGroups = pendingCompletionTaskGroups.values().stream()
+                                                                        .mapToInt(List::size).sum();
+        int availableStops = ioConfig.getMaxAllowedStops() - numPendingCompletionTaskGroups - numStoppedTasks.get();
+
+        int stoppedForRollover = 0;
+        for (Entry<Integer, TaskGroup> entry : activelyReadingTaskGroups.entrySet()) {
+          Integer groupId = entry.getKey();
+          if (!futureGroupIds.contains(groupId)) {
+            if (stoppedForRollover >= availableStops) {
+              log.info(
+                  "Deferring stop of taskGroup[%d] to next cycle - maxAllowedStops[%d] reached. "
+                  + "Publishing tasks: [%d], stopped this cycle: [%d].",
+                  groupId, ioConfig.getMaxAllowedStops(), numPendingCompletionTaskGroups, numStoppedTasks.get()
+              );
+              continue;
+            }
+            log.info(
+                "Stopping taskGroup[%d] for autoscaler rollover to [%d] tasks.",
+                groupId, rolloverTaskCount
+            );
+            futureGroupIds.add(groupId);
+            futures.add(checkpointTaskGroup(entry.getValue(), true));
+            stoppedForRollover++;
+          }
+        }
+
+        // Set the pending rollover flag - actual change applied in Phase 2
+        // when ALL actively reading task groups have stopped
+        pendingRolloverTaskCount = rolloverTaskCount;

Review Comment:
   Another question here is whether we want to invalidate this result if
   conditions have drastically changed during the roll-over. For supervisors
   running hundreds of tasks (think 300-400), a roll-over period might take
   10-15 minutes depending on how aggressive you are (and on how many other
   supervisors are rolling over concurrently). Lag will naturally spike much
   higher during a roll-over, so perhaps it's worth adding some sort of
   sanity check before committing this result?
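   For illustration only, the Phase 2 apply step could re-check the autoscaler before committing and drop a stale recommendation. This is purely a sketch reusing names from the PR; the actual body of `maybeApplyPendingScaleRollover()` may look quite different:
   ```java
   private void maybeApplyPendingScaleRollover()
   {
     // Sketch only. Wait until a rollover is pending and all reading groups have stopped.
     if (pendingRolloverTaskCount == null || !activelyReadingTaskGroups.isEmpty()) {
       return;
     }
     // Sanity check: re-ask the autoscaler now that the rollover has actually drained.
     // If lag/throughput drifted enough to change the recommendation, discard the stale
     // value and let Phase 1 re-evaluate on a later run instead of committing it.
     int latestRecommendation = taskAutoScaler.computeTaskCountForRollover();
     if (latestRecommendation > 0 && latestRecommendation != pendingRolloverTaskCount) {
       log.info(
           "Discarding stale rollover taskCount[%d] for supervisor[%s]; autoscaler now recommends [%d].",
           pendingRolloverTaskCount, supervisorId, latestRecommendation
       );
       pendingRolloverTaskCount = null;
       return;
     }
     // ... otherwise apply the taskCount change and re-allocate partitions as in the PR ...
   }
   ```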



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

