echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1324233808


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex 
executionVertex, JobExcepti
 
     @Override
     public void onNewResourcesAvailable() {
-        maybeRescale();
+        rescaleWhenCooldownPeriodIsOver();
     }
 
     @Override
     public void onNewResourceRequirements() {
-        maybeRescale();
+        rescaleWhenCooldownPeriodIsOver();
     }
 
     private void maybeRescale() {
-        if (context.shouldRescale(getExecutionGraph())) {
-            getLogger().info("Can change the parallelism of job. Restarting 
job.");
+        final Duration timeSinceLastRescale = timeSinceLastRescale();
+        rescaleScheduled = false;
+        final boolean shouldForceRescale =
+                (scalingIntervalMax != null)
+                        && (timeSinceLastRescale.compareTo(scalingIntervalMax) 
> 0)
+                        && (lastRescale != Instant.EPOCH); // initial rescale 
is not forced
+        if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+            if (shouldForceRescale) {
+                getLogger()
+                        .info(
+                                "Time since last rescale ({}) >  {} ({}). 
Force-changing the parallelism of the job. Restarting the job.",
+                                timeSinceLastRescale,
+                                
JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+                                scalingIntervalMax);
+            } else {
+                getLogger().info("Can change the parallelism of the job. 
Restarting the job.");
+            }
+            lastRescale = Instant.now();
             context.goToRestarting(
                     getExecutionGraph(),

Review Comment:
   Here is the summary of the offline discussion @1996fanrui and I had about 
`scalingIntervalMax` and forcing rescale (current code: when `added resources < 
min-parallelism-increase` we force a rescale if `timeSinceLastRescale > 
scalingIntervalMax`):
   1. aim: the longer the pipeline runs, the more the (small) resource gain is 
worth the restarting time.
   2. corner case: a resource `< min-parallelism-increase` arrives when 
`timeSinceLastRescale < scalingIntervalMax` and the pipeline is running for a 
long time (typical case 1) => with the current code, we don't force a rescale 
in that case whereas the added resource would be worth the restarting time.
   => I proposed solution 1: changing the definition of  `scalingIntervalMax` 
to `pipelineRuntimeRescaleMin` meaning pipeline runtime after which we force a 
rescale even if `added resources < min-parallelism-increase`
   => @1996fanrui  proposed solution 2: if `added resources < 
min-parallelism-increase && timeSinceLastRescale < scalingIntervalMax` schedule 
a tryRescale after `scalingIntervalMax` : force a rescale if there is indeed a 
change in the resource graph at that time in case the last TM crashed 
   
   @zentol you were the one who proposed the addition of `scalingIntervalMax` 
in the FLIP discussion thread. Do you prefer solution 1 or solution 2 ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to