echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1325847025


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex 
executionVertex, JobExcepti
 
     @Override
     public void onNewResourcesAvailable() {
-        maybeRescale();
+        rescaleWhenCooldownPeriodIsOver();
     }
 
     @Override
     public void onNewResourceRequirements() {
-        maybeRescale();
+        rescaleWhenCooldownPeriodIsOver();
     }
 
     private void maybeRescale() {
-        if (context.shouldRescale(getExecutionGraph())) {
-            getLogger().info("Can change the parallelism of job. Restarting 
job.");
+        final Duration timeSinceLastRescale = timeSinceLastRescale();
+        rescaleScheduled = false;
+        final boolean shouldForceRescale =
+                (scalingIntervalMax != null)
+                        && (timeSinceLastRescale.compareTo(scalingIntervalMax) 
> 0)
+                        && (lastRescale != Instant.EPOCH); // initial rescale 
is not forced
+        if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+            if (shouldForceRescale) {
+                getLogger()
+                        .info(
+                                "Time since last rescale ({}) >  {} ({}). 
Force-changing the parallelism of the job. Restarting the job.",
+                                timeSinceLastRescale,
+                                
JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+                                scalingIntervalMax);
+            } else {
+                getLogger().info("Can change the parallelism of the job. 
Restarting the job.");
+            }
+            lastRescale = Instant.now();
             context.goToRestarting(
                     getExecutionGraph(),

Review Comment:
   Thanks Chesnay for your views (again) ! 
   > With option 2 we rescale right away because the timeout already elapsed 
within the previous 24 hours.
   
   This is what I proposed indeed, but you're right that
   
   > it might be side-stepping the intentions behind the min increase option 
and timeout
   
   if for example after the 24h resources start to arrive 1 slot every 5 min, 
with this scenario we will restart every 5 min (as the timeout is exceeded) 
which we want to avoid. So it is better to schedule a timeout when resources 
arrive and `added ressource < min increase`. When the timeout fires we do one 
single rescale that takes all the added slots in one shot.
   I'll do that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to