jtuglu1 commented on code in PR #18954:
URL: https://github.com/apache/druid/pull/18954#discussion_r2735278572
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1762,6 +1771,8 @@ public void runInternal()
checkTaskDuration();
+ maybeApplyPendingScaleRollover();
+
checkPendingCompletionTasks();
Review Comment:
Should this be called after `checkPendingCompletionTasks()`?
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3392,6 +3403,53 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
}
});
+ // Phase 1 of scale-during-rollover: detect and set up.
+ // The taskCount change and re-allocation happen in Phase 2 after all tasks have stopped.
+ // We respect maxAllowedStops to avoid worker capacity exhaustion - rollover may take multiple cycles.
+ if (!futures.isEmpty() && taskAutoScaler != null && pendingRolloverTaskCount == null) {
+ int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
+ if (rolloverTaskCount > 0 && rolloverTaskCount != getIoConfig().getTaskCount()) {
+ log.info(
+ "Autoscaler recommends scaling to [%d] tasks during rollover for supervisor[%s]. "
+ + "Setting up pending rollover - will apply after all tasks stop.",
+ rolloverTaskCount, supervisorId
+ );
+
+ // Stop remaining active groups while respecting maxAllowedStops to avoid
+ // worker capacity exhaustion. Publishing tasks continue consuming worker slots,
+ // so stopping all at once could leave no capacity for new tasks.
+ int numPendingCompletionTaskGroups = pendingCompletionTaskGroups.values().stream()
+ .mapToInt(List::size).sum();
+ int availableStops = ioConfig.getMaxAllowedStops() - numPendingCompletionTaskGroups - numStoppedTasks.get();
+
+ int stoppedForRollover = 0;
+ for (Entry<Integer, TaskGroup> entry : activelyReadingTaskGroups.entrySet()) {
+ Integer groupId = entry.getKey();
+ if (!futureGroupIds.contains(groupId)) {
+ if (stoppedForRollover >= availableStops) {
+ log.info(
+ "Deferring stop of taskGroup[%d] to next cycle - maxAllowedStops[%d] reached. "
+ + "Publishing tasks: [%d], stopped this cycle: [%d].",
+ groupId, ioConfig.getMaxAllowedStops(), numPendingCompletionTaskGroups, numStoppedTasks.get()
+ );
+ continue;
+ }
+ log.info(
+ "Stopping taskGroup[%d] for autoscaler rollover to [%d] tasks.",
+ groupId, rolloverTaskCount
Review Comment:
Maybe log both `stoppedForRollover` and `availableStops` here?
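
For illustration only, the message could look something like the sketch below. It uses plain `String.format` so that it runs on its own; `RolloverLogMessage` and `stoppingMessage` are hypothetical names, not part of this PR, and the exact wording is just a suggestion.

```java
import java.util.Locale;

// Hypothetical helper, only to show the shape of the suggested log line.
public final class RolloverLogMessage
{
  private RolloverLogMessage() {}

  // stoppedForRollover is the count *before* this stop, as in the loop in the diff,
  // so the stop being logged is number stoppedForRollover + 1.
  public static String stoppingMessage(
      int groupId,
      int rolloverTaskCount,
      int stoppedForRollover,
      int availableStops
  )
  {
    return String.format(
        Locale.ROOT,
        "Stopping taskGroup[%d] for autoscaler rollover to [%d] tasks (stop [%d] of [%d] available this cycle).",
        groupId, rolloverTaskCount, stoppedForRollover + 1, availableStops
    );
  }
}
```

That would make it easier to see from the logs how close each cycle gets to the stop budget.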
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3392,6 +3403,53 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
}
});
+ // Phase 1 of scale-during-rollover: detect and set up.
+ // The taskCount change and re-allocation happen in Phase 2 after all tasks have stopped.
+ // We respect maxAllowedStops to avoid worker capacity exhaustion - rollover may take multiple cycles.
+ if (!futures.isEmpty() && taskAutoScaler != null && pendingRolloverTaskCount == null) {
+ int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
+ if (rolloverTaskCount > 0 && rolloverTaskCount != getIoConfig().getTaskCount()) {
+ log.info(
+ "Autoscaler recommends scaling to [%d] tasks during rollover for supervisor[%s]. "
+ + "Setting up pending rollover - will apply after all tasks stop.",
+ rolloverTaskCount, supervisorId
+ );
+
+ // Stop remaining active groups while respecting maxAllowedStops to avoid
+ // worker capacity exhaustion. Publishing tasks continue consuming worker slots,
+ // so stopping all at once could leave no capacity for new tasks.
+ int numPendingCompletionTaskGroups = pendingCompletionTaskGroups.values().stream()
+ .mapToInt(List::size).sum();
+ int availableStops = ioConfig.getMaxAllowedStops() - numPendingCompletionTaskGroups - numStoppedTasks.get();
+
+ int stoppedForRollover = 0;
+ for (Entry<Integer, TaskGroup> entry : activelyReadingTaskGroups.entrySet()) {
+ Integer groupId = entry.getKey();
+ if (!futureGroupIds.contains(groupId)) {
+ if (stoppedForRollover >= availableStops) {
+ log.info(
+ "Deferring stop of taskGroup[%d] to next cycle - maxAllowedStops[%d] reached. "
+ + "Publishing tasks: [%d], stopped this cycle: [%d].",
+ groupId, ioConfig.getMaxAllowedStops(), numPendingCompletionTaskGroups, numStoppedTasks.get()
+ );
+ continue;
+ }
+ log.info(
+ "Stopping taskGroup[%d] for autoscaler rollover to [%d] tasks.",
+ groupId, rolloverTaskCount
+ );
+ futureGroupIds.add(groupId);
+ futures.add(checkpointTaskGroup(entry.getValue(), true));
+ stoppedForRollover++;
+ }
+ }
+
+ // Set the pending rollover flag - actual change applied in Phase 2
+ // when ALL actively reading task groups have stopped
+ pendingRolloverTaskCount = rolloverTaskCount;
Review Comment:
Another question is whether we want to invalidate this result if things have
changed drastically during the rollover. For supervisors running hundreds of
tasks (think 300-400), a rollover might take 10-15 minutes, depending on how
aggressive you are and on how many other supervisors are running concurrently.
Lag will naturally spike much higher during rollover, so it may be worth
adding some sort of sanity check before committing this result.
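
One possible shape for such a check, as a minimal sketch: re-compute the recommendation right before Phase 2 and drop the pending value if the two have drifted too far apart. `RolloverSanityCheck`, `isStillValid`, and `maxRelativeDrift` are hypothetical names, not part of this PR or of Druid, and the drift threshold is an assumption that would need tuning.

```java
// Hypothetical helper, not existing Druid code: decides whether a task count
// captured at the start of a rollover is still worth committing.
public final class RolloverSanityCheck
{
  private RolloverSanityCheck() {}

  /**
   * @param pendingTaskCount task count recorded when the rollover started
   * @param freshTaskCount   task count re-computed just before committing
   * @param maxRelativeDrift allowed relative difference, e.g. 0.25 for 25%
   * @return true if the pending value is still close to the fresh recommendation
   */
  public static boolean isStillValid(int pendingTaskCount, int freshTaskCount, double maxRelativeDrift)
  {
    // A non-positive count means there is no usable recommendation on one side.
    if (pendingTaskCount <= 0 || freshTaskCount <= 0) {
      return false;
    }
    double drift = Math.abs(freshTaskCount - pendingTaskCount) / (double) pendingTaskCount;
    return drift <= maxRelativeDrift;
  }
}
```

Since lag is expected to be inflated during the rollover itself, the fresh value may be biased upward, so the check might need to use a different signal than raw lag. The sketch only shows where the comparison would sit.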