Fly-Style commented on code in PR #18954:
URL: https://github.com/apache/druid/pull/18954#discussion_r2742531835
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3392,6 +3403,53 @@ private void checkTaskDuration() throws
ExecutionException, InterruptedException
}
});
+ // Phase 1 of scale-during-rollover: detect and set up.
+ // The taskCount change and re-allocation happen in Phase 2 after all
tasks have stopped.
+ // We respect maxAllowedStops to avoid worker capacity exhaustion -
rollover may take multiple cycles.
+ if (!futures.isEmpty() && taskAutoScaler != null &&
pendingRolloverTaskCount == null) {
+ int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
+ if (rolloverTaskCount > 0 && rolloverTaskCount !=
getIoConfig().getTaskCount()) {
+ log.info(
+ "Autoscaler recommends scaling to [%d] tasks during rollover for
supervisor[%s]. "
+ + "Setting up pending rollover - will apply after all tasks stop.",
+ rolloverTaskCount, supervisorId
+ );
+
+ // Stop remaining active groups while respecting maxAllowedStops to
avoid
+ // worker capacity exhaustion. Publishing tasks continue consuming
worker slots,
+ // so stopping all at once could leave no capacity for new tasks.
+ int numPendingCompletionTaskGroups =
pendingCompletionTaskGroups.values().stream()
+
.mapToInt(List::size).sum();
+ int availableStops = ioConfig.getMaxAllowedStops() -
numPendingCompletionTaskGroups - numStoppedTasks.get();
+
+ int stoppedForRollover = 0;
+ for (Entry<Integer, TaskGroup> entry :
activelyReadingTaskGroups.entrySet()) {
+ Integer groupId = entry.getKey();
+ if (!futureGroupIds.contains(groupId)) {
+ if (stoppedForRollover >= availableStops) {
+ log.info(
+ "Deferring stop of taskGroup[%d] to next cycle -
maxAllowedStops[%d] reached. "
+ + "Publishing tasks: [%d], stopped this cycle: [%d].",
+ groupId, ioConfig.getMaxAllowedStops(),
numPendingCompletionTaskGroups, numStoppedTasks.get()
+ );
+ continue;
+ }
+ log.info(
+ "Stopping taskGroup[%d] for autoscaler rollover to [%d]
tasks.",
+ groupId, rolloverTaskCount
+ );
+ futureGroupIds.add(groupId);
+ futures.add(checkpointTaskGroup(entry.getValue(), true));
+ stoppedForRollover++;
+ }
+ }
+
+ // Set the pending rollover flag - actual change applied in Phase 2
+ // when ALL actively reading task groups have stopped
+ pendingRolloverTaskCount = rolloverTaskCount;
Review Comment:
Actually, I thought and make scale down during rollover as configurable
option here: https://github.com/apache/druid/pull/18958.
Definitely, there are cases when we may not want to use this option and
scale down during task runtime, as usual.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]