zentol commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1325575378
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
     @Override
     public void onNewResourcesAvailable() {
-        maybeRescale();
+        rescaleWhenCooldownPeriodIsOver();
     }

     @Override
     public void onNewResourceRequirements() {
-        maybeRescale();
+        rescaleWhenCooldownPeriodIsOver();
     }

     private void maybeRescale() {
-        if (context.shouldRescale(getExecutionGraph())) {
-            getLogger().info("Can change the parallelism of job. Restarting job.");
+        final Duration timeSinceLastRescale = timeSinceLastRescale();
+        rescaleScheduled = false;
+        final boolean shouldForceRescale =
+                (scalingIntervalMax != null)
+                        && (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0)
+                        && (lastRescale != Instant.EPOCH); // initial rescale is not forced
+        if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+            if (shouldForceRescale) {
+                getLogger()
+                        .info(
+                                "Time since last rescale ({}) > {} ({}). Force-changing the parallelism of the job. Restarting the job.",
+                                timeSinceLastRescale,
+                                JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+                                scalingIntervalMax);
+            } else {
+                getLogger().info("Can change the parallelism of the job. Restarting the job.");
+            }
+            lastRescale = Instant.now();
             context.goToRestarting(
                     getExecutionGraph(),

Review Comment:
I don't really see a difference between the 2 options. In both options you have a running job, get resources that do not exceed min-parallelism-increase, and trigger a rescale later, after scalingIntervalMax/pipelineRuntimeRescaleMin (or whatever it is called). It just seems like option 2 applies scalingIntervalMax twice: once as a condition to schedule tryRescaled, and again as the delay. But I assume that's a mistake in the description, and the difference is meant to be _when_ you start the timeout.
Option 1 starts it when a resource is acquired; the 2nd option starts it after a rescale.

So, for a concrete example: we have a job running at p=1 for 24h straight, with a min increase of 1 and a timeout of 1h. Now we get another slot at the 24h mark. With option 1 we rescale after an hour. With option 2 we rescale right away, because the timeout already elapsed within the previous 24 hours.

Option 2 is pretty clever, but I think it might be side-stepping the intentions behind the min increase option and the timeout. The min increase option was added to prevent frequent rescalings when resources trickle in slowly: a slot arrives, you rescale; another one arrives 5 minutes later, you rescale again, and so on. The timeout was meant for the case where no further resources arrive: you're holding on to this 1 slot _forever_, but using it would pay off over time. If you rescale immediately, you will end up rescaling twice in the case where multiple slots do arrive slowly.

With that in mind I'd still prefer option 1.
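The following toy timeline model in Java makes the timing difference concrete. It is a hypothetical sketch, not Flink's scheduler code. `RescaleTimerComparison`, `option1` and `option2` are made-up names, and the model rests on these assumptions:

- Times are offsets from the last rescale at t=0.
- Every slot arrival stays below min-parallelism-increase, so a slot is only used once the timeout fires.
- An arrival exactly at a pending deadline sees the timer fire first.

The model replays the 24h example and a case where slots trickle in every 5 minutes.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model (not Flink code) of two ways to start the rescale timeout.
 * Assumes the last rescale happened at t=0 and that every arrival stays below
 * min-parallelism-increase, so new slots are only used once the timeout fires.
 */
final class RescaleTimerComparison {

    /** Option 1: the timeout starts when a slot is acquired while no timer is pending. */
    static List<Duration> option1(List<Duration> arrivals, Duration timeout) {
        List<Duration> rescales = new ArrayList<>();
        Duration deadline = null; // pending timer, if any
        for (Duration arrival : arrivals) {
            if (deadline != null && arrival.compareTo(deadline) >= 0) {
                rescales.add(deadline); // timer fired before (or exactly at) this arrival
                deadline = null;
            }
            if (deadline == null) {
                deadline = arrival.plus(timeout); // start the timeout now
            }
        }
        if (deadline != null) {
            rescales.add(deadline); // no further arrivals: the pending timer fires
        }
        return rescales;
    }

    /** Option 2: the timeout is measured from the last rescale. */
    static List<Duration> option2(List<Duration> arrivals, Duration timeout) {
        List<Duration> rescales = new ArrayList<>();
        Duration lastRescale = Duration.ZERO;
        Duration scheduled = null; // pending rescale, if any
        for (Duration arrival : arrivals) {
            if (scheduled != null && arrival.compareTo(scheduled) >= 0) {
                rescales.add(scheduled);
                lastRescale = scheduled;
                scheduled = null;
            }
            if (scheduled == null) {
                Duration earliest = lastRescale.plus(timeout);
                if (arrival.compareTo(earliest) >= 0) {
                    rescales.add(arrival); // timeout already elapsed: rescale immediately
                    lastRescale = arrival;
                } else {
                    scheduled = earliest; // wait until the timeout since the last rescale
                }
            }
        }
        if (scheduled != null) {
            rescales.add(scheduled);
        }
        return rescales;
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofHours(1);
        Duration dayMark = Duration.ofHours(24);

        // One slot arrives after 24h of running at p=1.
        List<Duration> single = List.of(dayMark);
        System.out.println("single slot,  option 1: " + option1(single, timeout)); // [PT25H]
        System.out.println("single slot,  option 2: " + option2(single, timeout)); // [PT24H]

        // Slots trickle in every 5 minutes starting at the 24h mark.
        List<Duration> trickle =
                List.of(dayMark, dayMark.plusMinutes(5), dayMark.plusMinutes(10));
        System.out.println("trickling in, option 1: " + option1(trickle, timeout)); // [PT25H]
        System.out.println("trickling in, option 2: " + option2(trickle, timeout)); // [PT24H, PT25H]
    }
}
```

Under these assumptions, option 1 rescales once, at 25h, in both scenarios. Option 2 rescales at 24h for the single slot, and twice (at 24h and 25h) when slots trickle in. That second result is the double rescale the min increase option was meant to avoid.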