Fly-Style commented on code in PR #18954:
URL: https://github.com/apache/druid/pull/18954#discussion_r2742533631


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3392,6 +3403,53 @@ private void checkTaskDuration() throws 
ExecutionException, InterruptedException
           }
         });
 
+    // Phase 1 of scale-during-rollover: detect and set up.
+    // The taskCount change and re-allocation happen in Phase 2 after all 
tasks have stopped.
+    // We respect maxAllowedStops to avoid worker capacity exhaustion - 
rollover may take multiple cycles.
+    if (!futures.isEmpty() && taskAutoScaler != null && 
pendingRolloverTaskCount == null) {
+      int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
+      if (rolloverTaskCount > 0 && rolloverTaskCount != 
getIoConfig().getTaskCount()) {
+        log.info(
+            "Autoscaler recommends scaling to [%d] tasks during rollover for 
supervisor[%s]. "
+            + "Setting up pending rollover - will apply after all tasks stop.",
+            rolloverTaskCount, supervisorId
+        );
+
+        // Stop remaining active groups while respecting maxAllowedStops to 
avoid
+        // worker capacity exhaustion. Publishing tasks continue consuming 
worker slots,
+        // so stopping all at once could leave no capacity for new tasks.
+        int numPendingCompletionTaskGroups = 
pendingCompletionTaskGroups.values().stream()
+                                                                        
.mapToInt(List::size).sum();
+        int availableStops = ioConfig.getMaxAllowedStops() - 
numPendingCompletionTaskGroups - numStoppedTasks.get();
+
+        int stoppedForRollover = 0;
+        for (Entry<Integer, TaskGroup> entry : 
activelyReadingTaskGroups.entrySet()) {
+          Integer groupId = entry.getKey();
+          if (!futureGroupIds.contains(groupId)) {
+            if (stoppedForRollover >= availableStops) {
+              log.info(
+                  "Deferring stop of taskGroup[%d] to next cycle - 
maxAllowedStops[%d] reached. "
+                  + "Publishing tasks: [%d], stopped this cycle: [%d].",
+                  groupId, ioConfig.getMaxAllowedStops(), 
numPendingCompletionTaskGroups, numStoppedTasks.get()
+              );
+              continue;
+            }
+            log.info(
+                "Stopping taskGroup[%d] for autoscaler rollover to [%d] 
tasks.",
+                groupId, rolloverTaskCount

Review Comment:
   Done in this commit, as far as I recall:
https://github.com/apache/druid/pull/18954/commits/171344a7fc1fe3dd272a29e3004eddbd09ba2edd



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3392,6 +3403,53 @@ private void checkTaskDuration() throws 
ExecutionException, InterruptedException
           }
         });
 
+    // Phase 1 of scale-during-rollover: detect and set up.
+    // The taskCount change and re-allocation happen in Phase 2 after all 
tasks have stopped.
+    // We respect maxAllowedStops to avoid worker capacity exhaustion - 
rollover may take multiple cycles.
+    if (!futures.isEmpty() && taskAutoScaler != null && 
pendingRolloverTaskCount == null) {
+      int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
+      if (rolloverTaskCount > 0 && rolloverTaskCount != 
getIoConfig().getTaskCount()) {
+        log.info(
+            "Autoscaler recommends scaling to [%d] tasks during rollover for 
supervisor[%s]. "
+            + "Setting up pending rollover - will apply after all tasks stop.",
+            rolloverTaskCount, supervisorId
+        );
+
+        // Stop remaining active groups while respecting maxAllowedStops to 
avoid
+        // worker capacity exhaustion. Publishing tasks continue consuming 
worker slots,
+        // so stopping all at once could leave no capacity for new tasks.
+        int numPendingCompletionTaskGroups = 
pendingCompletionTaskGroups.values().stream()
+                                                                        
.mapToInt(List::size).sum();
+        int availableStops = ioConfig.getMaxAllowedStops() - 
numPendingCompletionTaskGroups - numStoppedTasks.get();
+
+        int stoppedForRollover = 0;
+        for (Entry<Integer, TaskGroup> entry : 
activelyReadingTaskGroups.entrySet()) {
+          Integer groupId = entry.getKey();
+          if (!futureGroupIds.contains(groupId)) {
+            if (stoppedForRollover >= availableStops) {
+              log.info(
+                  "Deferring stop of taskGroup[%d] to next cycle - 
maxAllowedStops[%d] reached. "
+                  + "Publishing tasks: [%d], stopped this cycle: [%d].",
+                  groupId, ioConfig.getMaxAllowedStops(), 
numPendingCompletionTaskGroups, numStoppedTasks.get()
+              );
+              continue;
+            }
+            log.info(
+                "Stopping taskGroup[%d] for autoscaler rollover to [%d] 
tasks.",
+                groupId, rolloverTaskCount

Review Comment:
   Done in this commit, if I recall correctly:
https://github.com/apache/druid/pull/18954/commits/171344a7fc1fe3dd272a29e3004eddbd09ba2edd



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to