echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1324233808
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
     @Override
     public void onNewResourcesAvailable() {
-        maybeRescale();
+        rescaleWhenCooldownPeriodIsOver();
     }

     @Override
     public void onNewResourceRequirements() {
-        maybeRescale();
+        rescaleWhenCooldownPeriodIsOver();
     }

     private void maybeRescale() {
-        if (context.shouldRescale(getExecutionGraph())) {
-            getLogger().info("Can change the parallelism of job. Restarting job.");
+        final Duration timeSinceLastRescale = timeSinceLastRescale();
+        rescaleScheduled = false;
+        final boolean shouldForceRescale =
+                (scalingIntervalMax != null)
+                        && (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0)
+                        && (lastRescale != Instant.EPOCH); // initial rescale is not forced
+        if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+            if (shouldForceRescale) {
+                getLogger()
+                        .info(
+                                "Time since last rescale ({}) > {} ({}). Force-changing the parallelism of the job. Restarting the job.",
+                                timeSinceLastRescale,
+                                JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+                                scalingIntervalMax);
+            } else {
+                getLogger().info("Can change the parallelism of the job. Restarting the job.");
+            }
+            lastRescale = Instant.now();
             context.goToRestarting(
                     getExecutionGraph(),

Review Comment:
Here is a summary of the offline discussion @1996fanrui and I had about `scalingIntervalMax` and forcing a rescale. With the current code, when `added resources < min-parallelism-increase`, we force a rescale if `timeSinceLastRescale > scalingIntervalMax`.

1. Aim: the longer the pipeline runs, the more a (small) resource gain is worth the restart time.
2. Corner case: a resource gain `< min-parallelism-increase` arrives while `timeSinceLastRescale < scalingIntervalMax` and the pipeline has been running for a long time (the typical case targeted by point 1). With the current code, we don't force a rescale in that case, even though the added resources would be worth the restart time.
   => I proposed solution 1: change the definition of `scalingIntervalMax` to `pipelineRuntimeRescaleMin`, i.e. the pipeline runtime after which we force a rescale even if `added resources < min-parallelism-increase`.
   => @1996fanrui proposed solution 2: if `added resources < min-parallelism-increase && timeSinceLastRescale < scalingIntervalMax`, schedule a rescale attempt (tryRescale) after `scalingIntervalMax`. At that time, force a rescale only if there is still a change in the resource graph, in case the last TM crashed in the meantime.

@zentol you were the one who proposed adding `scalingIntervalMax` in the FLIP discussion thread. Do you prefer solution 1 or solution 2?
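The following hypothetical Java sketch compares the current rule and the two proposals, written as pure decision functions. None of these names exist in Flink. `RescaleDecisionSketch`, `Decision`, the boolean inputs `enoughResources` and `smallGain`, and `recheckDelay` are illustrative only, and `pipelineRuntimeRescaleMin` is just the name proposed in solution 1.

The sketch makes two assumptions:
- Solution 2's re-check fires once `scalingIntervalMax` has elapsed since the last rescale, rather than a full `scalingIntervalMax` from now.
- The cooldown handling and the "initial rescale is not forced" check from the diff are out of scope.

```java
import java.time.Duration;

/**
 * Hypothetical sketch (not Flink code): the rescale rules discussed above as pure functions.
 * "enoughResources" means the gain reaches min-parallelism-increase; "smallGain" means there is
 * a resource gain below that threshold.
 */
public final class RescaleDecisionSketch {

    enum Decision {
        RESCALE, // restart the job with the new parallelism
        SCHEDULE_RECHECK, // solution 2 only: evaluate again later
        NO_OP // keep the current parallelism
    }

    /** Current rule as summarized above: force a small-gain rescale after scalingIntervalMax. */
    static Decision currentRule(
            boolean enoughResources,
            boolean smallGain,
            Duration timeSinceLastRescale,
            Duration scalingIntervalMax) {
        if (enoughResources) {
            return Decision.RESCALE;
        }
        boolean force =
                smallGain
                        && scalingIntervalMax != null
                        && timeSinceLastRescale.compareTo(scalingIntervalMax) > 0;
        return force ? Decision.RESCALE : Decision.NO_OP;
    }

    /** Solution 1: force a small-gain rescale once the pipeline has run past the threshold. */
    static Decision solution1(
            boolean enoughResources,
            boolean smallGain,
            Duration pipelineRuntime,
            Duration pipelineRuntimeRescaleMin) {
        if (enoughResources) {
            return Decision.RESCALE;
        }
        boolean force = smallGain && pipelineRuntime.compareTo(pipelineRuntimeRescaleMin) > 0;
        return force ? Decision.RESCALE : Decision.NO_OP;
    }

    /**
     * Solution 2: a small gain arriving before scalingIntervalMax is re-checked later. At re-check
     * time the caller passes fresh resource info, so a gain that disappeared (e.g. the new
     * TaskManager crashed) yields NO_OP.
     */
    static Decision solution2(
            boolean enoughResources,
            boolean smallGain,
            Duration timeSinceLastRescale,
            Duration scalingIntervalMax) {
        if (enoughResources) {
            return Decision.RESCALE;
        }
        if (!smallGain || scalingIntervalMax == null) {
            return Decision.NO_OP;
        }
        // ">=" so that a re-check firing exactly at scalingIntervalMax forces the rescale.
        return timeSinceLastRescale.compareTo(scalingIntervalMax) >= 0
                ? Decision.RESCALE
                : Decision.SCHEDULE_RECHECK;
    }

    /** Assumed solution 2 re-check delay: the time remaining until scalingIntervalMax. */
    static Duration recheckDelay(Duration timeSinceLastRescale, Duration scalingIntervalMax) {
        return scalingIntervalMax.minus(timeSinceLastRescale);
    }

    public static void main(String[] args) {
        Duration max = Duration.ofHours(1); // scalingIntervalMax
        Duration runtimeMin = Duration.ofHours(1); // hypothetical pipelineRuntimeRescaleMin
        // Corner case: small gain 10 minutes after the last rescale, pipeline running for 3 days.
        System.out.println(currentRule(false, true, Duration.ofMinutes(10), max)); // NO_OP
        System.out.println(solution1(false, true, Duration.ofDays(3), runtimeMin)); // RESCALE
        System.out.println(solution2(false, true, Duration.ofMinutes(10), max)); // SCHEDULE_RECHECK
        System.out.println(recheckDelay(Duration.ofMinutes(10), max)); // PT50M
        // At re-check time the extra resources are gone, so nothing happens.
        System.out.println(solution2(false, false, max, max)); // NO_OP
    }
}
```

In the corner case from point 2, the three rules behave differently:
- The current rule does nothing.
- Solution 1 rescales immediately, because only the total pipeline runtime matters.
- Solution 2 defers the decision and drops it if the extra resources are gone by the time the re-check runs.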