zentol commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1325575378


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
 
     @Override
     public void onNewResourcesAvailable() {
-        maybeRescale();
+        rescaleWhenCooldownPeriodIsOver();
     }
 
     @Override
     public void onNewResourceRequirements() {
-        maybeRescale();
+        rescaleWhenCooldownPeriodIsOver();
     }
 
     private void maybeRescale() {
-        if (context.shouldRescale(getExecutionGraph())) {
-            getLogger().info("Can change the parallelism of job. Restarting job.");
+        final Duration timeSinceLastRescale = timeSinceLastRescale();
+        rescaleScheduled = false;
+        final boolean shouldForceRescale =
+                (scalingIntervalMax != null)
+                        && (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0)
+                        && (lastRescale != Instant.EPOCH); // initial rescale is not forced
+        if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+            if (shouldForceRescale) {
+                getLogger()
+                        .info(
+                                "Time since last rescale ({}) >  {} ({}). Force-changing the parallelism of the job. Restarting the job.",
+                                timeSinceLastRescale,
+                                JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+                                scalingIntervalMax);
+            } else {
+                getLogger().info("Can change the parallelism of the job. Restarting the job.");
+            }
+            lastRescale = Instant.now();
             context.goToRestarting(
                     getExecutionGraph(),

Review Comment:
   I don't really see a difference between the 2 options. In both options you have a running job, receive resources that do not exceed min-parallelism-increase, and trigger a rescale later after scalingIntervalMax/pipelineRuntimeRescaleMin (or however it is called).
   
   It just seems like option 2 applies scalingIntervalMax twice; once as a condition for scheduling the rescale attempt, and again by using it as the delay.
   
   But I assume that that's a mistake in the description, and the difference is 
meant to be _when_ you start the timeout.
   Option 1 does it when a resource is acquired, the 2nd option after a rescale.
   
   So, for a concrete example: we have a job running at p=1 for 24h straight, with a min increase of 1 and a timeout of 1h.
   Now we get another slot at the 24h mark.
   
   With option 1 we rescale after an hour.
   With option 2 we rescale right away because the timeout already elapsed 
within the previous 24 hours.
   
   Option 2 is pretty clever, but I think it might be side-stepping the 
intentions behind the min increase option and timeout.
   
   The min increase option was added to prevent frequent rescalings when 
resources trickle in slowly.
   A slot arrives, you rescale. Another one arrives 5 minutes later, you 
rescale again, and so on.
   The timeout was meant for the case where no further resources arrive; you're 
holding on to this 1 slot _forever_, but using it would pay off over time.
   
   If you rescale immediately, you will end up rescaling twice in the case where multiple slots do arrive slowly.
   
   With that in mind I'd still prefer option 1.
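   The timing difference between the two options can be sketched as below. This is only an illustrative model of the discussion above, not Flink code: `RescaleTimingSketch` and both method names are made up for the example (option 1 starts the timeout when the resource arrives; option 2 measures the elapsed time since the last rescale).

   ```java
   import java.time.Duration;
   import java.time.Instant;

   // Illustrative sketch of the two cooldown interpretations; names are
   // hypothetical and do not correspond to Flink's actual API.
   public class RescaleTimingSketch {

       // Option 1: start the timeout when the new resource arrives,
       // so the rescale fires at resourceArrival + timeout.
       static Instant option1RescaleTime(Instant resourceArrival, Duration timeout) {
           return resourceArrival.plus(timeout);
       }

       // Option 2: compare the time since the last rescale against the timeout;
       // if it already elapsed, rescale immediately when the resource arrives.
       static Instant option2RescaleTime(
               Instant lastRescale, Instant resourceArrival, Duration timeout) {
           Duration sinceLastRescale = Duration.between(lastRescale, resourceArrival);
           return sinceLastRescale.compareTo(timeout) > 0
                   ? resourceArrival // timeout already elapsed: rescale right away
                   : lastRescale.plus(timeout); // otherwise wait out the remainder
       }

       public static void main(String[] args) {
           // The concrete example from above: last rescale 24h ago, timeout 1h.
           Instant lastRescale = Instant.parse("2023-09-01T00:00:00Z");
           Instant slotArrives = lastRescale.plus(Duration.ofHours(24));
           Duration timeout = Duration.ofHours(1);

           // Option 1 waits a full hour after the slot arrives.
           System.out.println(option1RescaleTime(slotArrives, timeout));
           // Option 2 rescales at arrival time, since 24h > 1h already elapsed.
           System.out.println(option2RescaleTime(lastRescale, slotArrives, timeout));
       }
   }
   ```

   With slow trickle-in, option 2's immediate rescale is exactly what produces the repeated rescalings that min-parallelism-increase was meant to avoid.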
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
