[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-18 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1328774059


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
 
 @Override
 public void onNewResourcesAvailable() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 @Override
 public void onNewResourceRequirements() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 private void maybeRescale() {
-if (context.shouldRescale(getExecutionGraph())) {
-getLogger().info("Can change the parallelism of job. Restarting job.");
+final Duration timeSinceLastRescale = timeSinceLastRescale();
+rescaleScheduled = false;
+final boolean shouldForceRescale =
+(scalingIntervalMax != null)
+&& (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0)
+&& (lastRescale != Instant.EPOCH); // initial rescale is not forced
+if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+if (shouldForceRescale) {
+getLogger()
+.info(
+"Time since last rescale ({}) >  {} ({}). Force-changing the parallelism of the job. Restarting the job.",
+timeSinceLastRescale,
+JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+scalingIntervalMax);
+} else {
+getLogger().info("Can change the parallelism of the job. Restarting the job.");
+}
+lastRescale = Instant.now();
 context.goToRestarting(
 getExecutionGraph(),

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-14 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1326121636


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
 
 @Override
 public void onNewResourcesAvailable() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 @Override
 public void onNewResourceRequirements() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 private void maybeRescale() {
-if (context.shouldRescale(getExecutionGraph())) {
-getLogger().info("Can change the parallelism of job. Restarting job.");
+final Duration timeSinceLastRescale = timeSinceLastRescale();
+rescaleScheduled = false;
+final boolean shouldForceRescale =
+(scalingIntervalMax != null)
+&& (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0)
+&& (lastRescale != Instant.EPOCH); // initial rescale is not forced
+if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+if (shouldForceRescale) {
+getLogger()
+.info(
+"Time since last rescale ({}) >  {} ({}). Force-changing the parallelism of the job. Restarting the job.",
+timeSinceLastRescale,
+JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+scalingIntervalMax);
+} else {
+getLogger().info("Can change the parallelism of the job. Restarting the job.");
+}
+lastRescale = Instant.now();
 context.goToRestarting(
 getExecutionGraph(),

Review Comment:
   > This is all about stability; "I could have rescaled and it wouldn't have been a problem.". But you can only conclude that after having had that slot for some period of time; otherwise you're just guessing.
   
   Exactly. So, when a slot arrives, scheduling a rescale check after a timeout seems a good trade-off to ensure that the slot is still there after a period of time. When the timeout fires:
   - If the slot is still there, we should force a rescale (the "better late than never" case).
   - If the slot has vanished, we should determine that the target parallelism is the same as the current one and not rescale (the "avoid unnecessary rescale" case).
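
A minimal sketch of the timeout check described above, not the PR's implementation. `DelayedRescaleCheck`, `Outcome` and `onTimeout` are hypothetical names, and the target parallelism is assumed to be recomputed from the slots that are still present when the timeout fires.

```java
import java.util.function.IntSupplier;

// Hypothetical sketch: DelayedRescaleCheck, Outcome and onTimeout are
// illustrative names, not part of the Flink code base or of this PR.
final class DelayedRescaleCheck {

    enum Outcome { FORCE_RESCALE, NO_RESCALE }

    // Called when the timeout that was scheduled at slot arrival fires.
    // The target parallelism is evaluated only now, so a slot that has
    // vanished in the meantime no longer counts.
    static Outcome onTimeout(int currentParallelism, IntSupplier targetParallelism) {
        return targetParallelism.getAsInt() > currentParallelism
                ? Outcome.FORCE_RESCALE   // slot still there: better late than never
                : Outcome.NO_RESCALE;     // slot vanished: avoid an unnecessary restart
    }

    public static void main(String[] args) {
        System.out.println(onTimeout(99, () -> 100)); // slot survived the timeout
        System.out.println(onTimeout(99, () -> 99));  // slot vanished before the timeout
    }
}
```

Passing the target as a supplier keeps the evaluation at firing time rather than at arrival time, which is the whole point of waiting.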






[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-14 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1325907644


##
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##
@@ -488,6 +488,23 @@ public enum SchedulerType {
 
 code(SchedulerExecutionMode.REACTIVE.name()))
 .build());
 
+@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+public static final ConfigOption<Duration> SCHEDULER_SCALING_INTERVAL_MIN =
+key("jobmanager.adaptive-scheduler.scaling-interval.min")
+.durationType()
+.defaultValue(Duration.ofSeconds(30))
+// rescaling and let the user increase the value for high workloads
+.withDescription(
+"Determines the minimum time (in seconds) between scaling operations in reactive mode.");

Review Comment:
   done






[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-14 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1325871122


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -47,6 +51,10 @@
 class Executing extends StateWithExecutionGraph implements ResourceListener {
 
 private final Context context;
+private Instant lastRescale = Instant.EPOCH;

Review Comment:
   Fixed






[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-14 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1325847025


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
 
 @Override
 public void onNewResourcesAvailable() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 @Override
 public void onNewResourceRequirements() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 private void maybeRescale() {
-if (context.shouldRescale(getExecutionGraph())) {
-getLogger().info("Can change the parallelism of job. Restarting job.");
+final Duration timeSinceLastRescale = timeSinceLastRescale();
+rescaleScheduled = false;
+final boolean shouldForceRescale =
+(scalingIntervalMax != null)
+&& (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0)
+&& (lastRescale != Instant.EPOCH); // initial rescale is not forced
+if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+if (shouldForceRescale) {
+getLogger()
+.info(
+"Time since last rescale ({}) >  {} ({}). Force-changing the parallelism of the job. Restarting the job.",
+timeSinceLastRescale,
+JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+scalingIntervalMax);
+} else {
+getLogger().info("Can change the parallelism of the job. Restarting the job.");
+}
+lastRescale = Instant.now();
 context.goToRestarting(
 getExecutionGraph(),

Review Comment:
   Thanks Chesnay for your views (again)!
   > With option 2 we rescale right away because the timeout already elapsed within the previous 24 hours.
   
   This is indeed what I proposed, but you're right that
   
   > it might be side-stepping the intentions behind the min increase option and timeout
   
   For example, if after the 24h resources start to arrive at 1 slot every 5 min, in this scenario we will restart every 5 min (as the timeout is exceeded, the min-increase check is overridden), which is what we want to avoid. So it is better to schedule a timeout when resources arrive and `added resources < min increase`. When the timeout fires, we do one single rescale that takes all the added slots in one shot.
   I'll do that.
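
The batching idea above can be sketched as follows. This is an illustration under assumptions, not the code that ended up in the PR: `CoalescingRescaler` and all of its members are hypothetical, and a plain queue of `Runnable`s stands in for the scheduler's real timer.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: all names are illustrative. A queue of Runnables
// stands in for the scheduler's real timer.
final class CoalescingRescaler {
    private final int minIncrease;
    private final Deque<Runnable> timer = new ArrayDeque<>();
    private int currentParallelism;
    private int availableSlots;
    private boolean timeoutScheduled;
    private int restarts;

    CoalescingRescaler(int currentParallelism, int minIncrease) {
        this.currentParallelism = currentParallelism;
        this.availableSlots = currentParallelism;
        this.minIncrease = minIncrease;
    }

    void onSlotArrived() {
        availableSlots++;
        if (availableSlots - currentParallelism >= minIncrease) {
            rescale(); // gain is large enough: rescale immediately
        } else if (!timeoutScheduled) {
            timeoutScheduled = true; // later small arrivals reuse this timeout
            timer.add(this::onTimeout);
        }
    }

    // Simulates the timer firing every pending timeout.
    void fireTimeouts() {
        while (!timer.isEmpty()) {
            timer.poll().run();
        }
    }

    private void onTimeout() {
        timeoutScheduled = false;
        if (availableSlots > currentParallelism) {
            rescale(); // one restart that takes all the slots added so far
        }
    }

    private void rescale() {
        currentParallelism = availableSlots;
        restarts++;
    }

    int restarts() {
        return restarts;
    }

    int currentParallelism() {
        return currentParallelism;
    }
}
```

With a starting parallelism of 10 and a minimum increase of 5, three single-slot arrivals schedule only one timeout, and firing it produces a single restart at parallelism 13 instead of three restarts.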






[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-14 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1321393999


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -67,13 +77,33 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
 this.context = context;
 Preconditions.checkState(
 executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph");
+this.scalingIntervalMin = scalingIntervalMin;
+this.scalingIntervalMax = scalingIntervalMax;
+Preconditions.checkState(
+!scalingIntervalMin.isNegative(),
+"{} must be positive integer or 0",
+JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key());
+if (scalingIntervalMax != null) {
+Preconditions.checkState(
+scalingIntervalMax.compareTo(scalingIntervalMin) > 0,
+"{}({}) must be greater than {}({})",
+JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+scalingIntervalMax,
+JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key(),
+scalingIntervalMin);
+}
 
 deploy();
 
 // check if new resources have come available in the meantime
 context.runIfState(this, this::maybeRescale, Duration.ZERO);

Review Comment:
   Not necessarily a new rescale: it could be a transition from the Created to the Executing state (FLIP-160). In that case we want to eagerly adapt to resources that could have arrived while we were starting, and not wait for the cooldown period to end.
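
One way to picture the difference between the eager check on entering `Executing` and the cooldown-delayed check is the sketch below. It assumes the cooldown delay is the remainder of `scalingIntervalMin` since the last rescale; `CooldownDelay` and `delayBeforeRescaleCheck` are hypothetical names and do not appear in the PR.

```java
import java.time.Duration;

// Hypothetical sketch: CooldownDelay and delayBeforeRescaleCheck are
// illustrative names, not taken from the PR.
final class CooldownDelay {

    // Returns how long to wait before running the rescale check.
    static Duration delayBeforeRescaleCheck(
            boolean enteringExecuting,
            Duration timeSinceLastRescale,
            Duration scalingIntervalMin) {
        if (enteringExecuting) {
            // Resources may have arrived while the job was starting: check right away.
            return Duration.ZERO;
        }
        // Otherwise wait for whatever is left of the cooldown period.
        Duration remaining = scalingIntervalMin.minus(timeSinceLastRescale);
        return remaining.isNegative() ? Duration.ZERO : remaining;
    }
}
```

For example, with a 30 s cooldown and 10 s since the last rescale, the check is delayed by 20 s, unless the state was just entered, in which case it runs immediately.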






[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-13 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1324233808


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
 
 @Override
 public void onNewResourcesAvailable() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 @Override
 public void onNewResourceRequirements() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 private void maybeRescale() {
-if (context.shouldRescale(getExecutionGraph())) {
-getLogger().info("Can change the parallelism of job. Restarting job.");
+final Duration timeSinceLastRescale = timeSinceLastRescale();
+rescaleScheduled = false;
+final boolean shouldForceRescale =
+(scalingIntervalMax != null)
+&& (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0)
+&& (lastRescale != Instant.EPOCH); // initial rescale is not forced
+if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+if (shouldForceRescale) {
+getLogger()
+.info(
+"Time since last rescale ({}) >  {} ({}). Force-changing the parallelism of the job. Restarting the job.",
+timeSinceLastRescale,
+JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+scalingIntervalMax);
+} else {
+getLogger().info("Can change the parallelism of the job. Restarting the job.");
+}
+lastRescale = Instant.now();
 context.goToRestarting(
 getExecutionGraph(),

Review Comment:
   Here is the summary of the offline discussion @1996fanrui and I had about `scalingIntervalMax` and forcing a rescale (current code: when `added resources < min-parallelism-increase`, we force a rescale if `timeSinceLastRescale > scalingIntervalMax`):
   1. Aim: the longer the pipeline runs, the more the (small) resource gain is worth the restarting time.
   2. Corner case: a resource `< min-parallelism-increase` arrives when `timeSinceLastRescale < scalingIntervalMax` and the pipeline has been running for a long time (the typical case of point 1) => with the current code, we don't force a rescale in that case, whereas the added resource would be worth the restarting time.
   => I proposed solution 1: changing the definition of `scalingIntervalMax` to `pipelineRuntimeRescaleMin`, meaning the pipeline runtime after which we force a rescale even if `added resources < min-parallelism-increase`.
   => @1996fanrui proposed solution 2: if `added resources < min-parallelism-increase && timeSinceLastRescale < scalingIntervalMax`, schedule a tryRescale after `scalingIntervalMax`: force a rescale only if there is indeed still a change in the resource graph at that time (the last TM may have crashed in the meantime).
   
   @zentol you were the one who proposed the addition of `scalingIntervalMax` in the FLIP discussion thread. Do you prefer solution 1 or solution 2?
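
To make the cases in this summary concrete, here is a hedged sketch of the decision table that combines the current code with solution 2. `SmallGainPolicy`, `Action` and `decide` are hypothetical names; treating a `null` `scalingIntervalMax` as "forcing disabled" mirrors the null check in the diff, while the rest is an assumption about how solution 2 could be wired in.

```java
import java.time.Duration;

// Hypothetical sketch: SmallGainPolicy, Action and decide are illustrative names.
final class SmallGainPolicy {

    enum Action { RESCALE, FORCE_RESCALE, SCHEDULE_CHECK, IGNORE }

    static Action decide(
            int addedResources,
            int minParallelismIncrease,
            Duration timeSinceLastRescale,
            Duration scalingIntervalMax) {
        if (addedResources <= 0) {
            return Action.IGNORE; // nothing to gain
        }
        if (addedResources >= minParallelismIncrease) {
            return Action.RESCALE; // regular rescale, no forcing needed
        }
        if (scalingIntervalMax == null) {
            return Action.IGNORE; // forcing is disabled
        }
        if (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0) {
            return Action.FORCE_RESCALE; // current code: small gain, long runtime
        }
        // Solution 2: re-check once scalingIntervalMax has elapsed.
        return Action.SCHEDULE_CHECK;
    }
}
```

With one added resource, a `min-parallelism-increase` of 2, 3 minutes since the last rescale and a `scalingIntervalMax` of 5 minutes, this returns `SCHEDULE_CHECK`, whereas the current code would ignore the resource.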






[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-12 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1322947925


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
 
 @Override
 public void onNewResourcesAvailable() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 @Override
 public void onNewResourceRequirements() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 private void maybeRescale() {
-if (context.shouldRescale(getExecutionGraph())) {
-getLogger().info("Can change the parallelism of job. Restarting job.");
+final Duration timeSinceLastRescale = timeSinceLastRescale();
+rescaleScheduled = false;
+final boolean shouldForceRescale =
+(scalingIntervalMax != null)
+&& (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0)
+&& (lastRescale != Instant.EPOCH); // initial rescale is not forced
+if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+if (shouldForceRescale) {
+getLogger()
+.info(
+"Time since last rescale ({}) >  {} ({}). Force-changing the parallelism of the job. Restarting the job.",
+timeSinceLastRescale,
+JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+scalingIntervalMax);
+} else {
+getLogger().info("Can change the parallelism of the job. Restarting the job.");
+}
+lastRescale = Instant.now();
 context.goToRestarting(
 getExecutionGraph(),

Review Comment:
   > I think the semantic of scalingIntervalMax should be fixed to the user side and should not be related to the calling order in the code or the order in which resources arrive.
   
   I don't understand what you mean by "fixed to the user side".






[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-12 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1322784279


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
 
 @Override
 public void onNewResourcesAvailable() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 @Override
 public void onNewResourceRequirements() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 private void maybeRescale() {
-if (context.shouldRescale(getExecutionGraph())) {
-getLogger().info("Can change the parallelism of job. Restarting job.");
+final Duration timeSinceLastRescale = timeSinceLastRescale();
+rescaleScheduled = false;
+final boolean shouldForceRescale =
+(scalingIntervalMax != null)
+&& (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0)
+&& (lastRescale != Instant.EPOCH); // initial rescale is not forced
+if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+if (shouldForceRescale) {
+getLogger()
+.info(
+"Time since last rescale ({}) >  {} ({}). Force-changing the parallelism of the job. Restarting the job.",
+timeSinceLastRescale,
+JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+scalingIntervalMax);
+} else {
+getLogger().info("Can change the parallelism of the job. Restarting the job.");
+}
+lastRescale = Instant.now();
 context.goToRestarting(
 getExecutionGraph(),

Review Comment:
   > > > Do you mean we force trigger a rescale when new resources come and the current time > lastRescaleTime + scalingIntervalMax?
   > > 
   > > 
   > > yes, that is what the community agreed on in the FLIP.
   > 
   > I think this strategy just solves one case: the last resource comes after scalingIntervalMax. The job can rescale directly.
   > 
   
   It was made after this observation: the longer the pipeline runs, the more the (small) resource gain is worth the restarting time.
   
   > However, it cannot solve the case: the last resource comes before scalingIntervalMax. Assuming the scalingIntervalMax is 5 minutes, the `jobmanager.adaptive-scheduler.min-parallelism-increase` is 2, and the expected parallelism is 100.
   > 
   > * The job has 99 TMs at 09:00:00, it runs with parallelism 99.
   > * At 09:03:00, the last TM comes.
   > * The `Executing` will ignore the rescale because the parallelism diff is 1, which is less than `min-parallelism-increase`, and `current time < lastRescaleTime + scalingIntervalMax`.
   > 
   > And then the job doesn't need the new resource and the expected parallelism doesn't change, so the job won't rescale anymore.
   
   It is not linked to scalingIntervalMax: before this PR there was no rescale in such a case either, simply because the minimum resource addition was not met. It is by design of the min-parallelism-increase that we don't change






[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-12 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1322738246


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
 
 @Override
 public void onNewResourcesAvailable() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 @Override
 public void onNewResourceRequirements() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 private void maybeRescale() {
-if (context.shouldRescale(getExecutionGraph())) {
-getLogger().info("Can change the parallelism of job. Restarting job.");
+final Duration timeSinceLastRescale = timeSinceLastRescale();
+rescaleScheduled = false;
+final boolean shouldForceRescale =
+(scalingIntervalMax != null)
+&& (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0)
+&& (lastRescale != Instant.EPOCH); // initial rescale is not forced
+if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+if (shouldForceRescale) {
+getLogger()
+.info(
+"Time since last rescale ({}) >  {} ({}). Force-changing the parallelism of the job. Restarting the job.",
+timeSinceLastRescale,
+JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+scalingIntervalMax);
+} else {
+getLogger().info("Can change the parallelism of the job. Restarting the job.");
+}
+lastRescale = Instant.now();
 context.goToRestarting(
 getExecutionGraph(),

Review Comment:
   > > > If so, I think the logic related to scalingIntervalMax should be moved into maybeRescale.
   > > 
   > > 
   > > This logic is already in the maybeRescale() method
   > 
   > Sorry, I had a typo here. I mean the logic related to scalingIntervalMax should be moved out of maybeRescale.
   > 
   > `onNewResourceRequirements` also calls `rescaleWhenCooldownPeriodIsOver` when the user changes the parallelism; maybeRescale shouldn't force trigger a rescale in this case even if `current time > lastRescaleTime + scalingIntervalMax`.
   > 
   > Because after the requirement is adjusted, resources cannot be ready immediately. The rescale is meaningless at this time even if `current time > lastRescaleTime + scalingIntervalMax`.
   
   Agreed. I think the same is true for the maybeRescale() call in the constructor that checks whether new resources have arrived during the starting process.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-12 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1321602343


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
 
 @Override
 public void onNewResourcesAvailable() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 @Override
 public void onNewResourceRequirements() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 private void maybeRescale() {
-if (context.shouldRescale(getExecutionGraph())) {
-getLogger().info("Can change the parallelism of job. Restarting job.");
+final Duration timeSinceLastRescale = timeSinceLastRescale();
+rescaleScheduled = false;
+final boolean shouldForceRescale =
+(scalingIntervalMax != null)
+&& (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0)
+&& (lastRescale != Instant.EPOCH); // initial rescale is not forced
+if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+if (shouldForceRescale) {
+getLogger()
+.info(
+"Time since last rescale ({}) >  {} ({}). Force-changing the parallelism of the job. Restarting the job.",
+timeSinceLastRescale,
+JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+scalingIntervalMax);
+} else {
+getLogger().info("Can change the parallelism of the job. Restarting the job.");
+}
+lastRescale = Instant.now();
 context.goToRestarting(
 getExecutionGraph(),

Review Comment:
   > Do you mean we force a rescale when new resources arrive and the current time > lastRescaleTime + scalingIntervalMax?
   
   Yes, that is what the community agreed on in the FLIP.
   
   > If so, I think the logic related to scalingIntervalMax should be moved into maybeRescale.
   
   This logic is already in the maybeRescale() method.
   
   > onNewResourceRequirements also calls rescaleWhenCooldownPeriodIsOver when the user changes the parallelism; maybeRescale will force a rescale as well, right?
   
   Yes. `onNewResourceRequirements` is ultimately called by a REST endpoint in reaction to a user action in the web UI, so it ends up calling maybeRescale. That does not always mean a rescale, though: if `timeSinceLastRescale < scalingIntervalMax` and the minimal resources are not met, there will be no rescale.
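
   To make the rule above concrete, here is a minimal, self-contained sketch of the decision. It is not the Flink code: `RescaleDecision`, its `shouldRescale` method and the `resourcesSufficient` flag (a stand-in for `context.shouldRescale(...)`) are hypothetical names, and the current time is passed in so the logic can be checked without a clock. It also compares `Instant` values with `equals` rather than `!=`, which is an assumption about the intent, not what the diff does.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical stand-alone model of the decision taken in maybeRescale(); not Flink code. */
final class RescaleDecision {

    private RescaleDecision() {}

    /**
     * @param now current time, passed in explicitly to keep the sketch deterministic
     * @param lastRescale time of the previous rescale, Instant.EPOCH if none happened yet
     * @param scalingIntervalMax maximum interval, or null when forcing is disabled
     * @param resourcesSufficient stand-in for context.shouldRescale(...)
     */
    static boolean shouldRescale(
            Instant now,
            Instant lastRescale,
            Duration scalingIntervalMax,
            boolean resourcesSufficient) {
        final Duration timeSinceLastRescale = Duration.between(lastRescale, now);
        final boolean shouldForceRescale =
                scalingIntervalMax != null
                        && timeSinceLastRescale.compareTo(scalingIntervalMax) > 0
                        && !lastRescale.equals(Instant.EPOCH); // the initial rescale is not forced
        return shouldForceRescale || resourcesSufficient;
    }
}
```

   With a 10-minute `scalingIntervalMax` and insufficient resources, a call one minute after the last rescale returns false, while the same call eleven minutes after it returns true.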






[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-11 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1321325324


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
 
 @Override
 public void onNewResourcesAvailable() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 @Override
 public void onNewResourceRequirements() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 private void maybeRescale() {
-if (context.shouldRescale(getExecutionGraph())) {
-getLogger().info("Can change the parallelism of job. Restarting job.");
+final Duration timeSinceLastRescale = timeSinceLastRescale();
+rescaleScheduled = false;
+final boolean shouldForceRescale =
+(scalingIntervalMax != null)
+&& (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0)
+&& (lastRescale != Instant.EPOCH); // initial rescale is not forced
+if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+if (shouldForceRescale) {
+getLogger()
+.info(
+"Time since last rescale ({}) >  {} ({}). Force-changing the parallelism of the job. Restarting the job.",
+timeSinceLastRescale,
+JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+scalingIntervalMax);
+} else {
+getLogger().info("Can change the parallelism of the job. Restarting the job.");
+}
+lastRescale = Instant.now();
 context.goToRestarting(
 getExecutionGraph(),

Review Comment:
   In the FLIP we said that the event triggering a rescale is resource addition (`Executing#onNewResourcesAvailable`); downscaling is triggered by failures. So there is no need to schedule anything: when such an event happens, `maybeRescale` is always called (either directly or scheduled), and when it is called, the rescale is done if `timeSinceLastRescale > scalingIntervalMax`, regardless of the minimum resources.
   
   There is already a test: `ExecutingTest#testNotifyNewResourcesAvailableWithNoResourcesAndForce`






[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-11 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1321393999


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -67,13 +77,33 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
 this.context = context;
 Preconditions.checkState(
 executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph");
+this.scalingIntervalMin = scalingIntervalMin;
+this.scalingIntervalMax = scalingIntervalMax;
+Preconditions.checkState(
+!scalingIntervalMin.isNegative(),
+"{} must be positive integer or 0",
+JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key());
+if (scalingIntervalMax != null) {
+Preconditions.checkState(
+scalingIntervalMax.compareTo(scalingIntervalMin) > 0,
+"{}({}) must be greater than {}({})",
+JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+scalingIntervalMax,
+JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key(),
+scalingIntervalMin);
+}
 
 deploy();
 
 // check if new resources have come available in the meantime
 context.runIfState(this, this::maybeRescale, Duration.ZERO);

Review Comment:
   Not necessarily a new rescale: it could be a transition from the Created to the Executing state (FLIP-160). In that case we want to eagerly adapt to resources that may have arrived while we were starting, rather than wait for the cooldown period to end.






[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-11 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1321390908


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -47,6 +51,10 @@
 class Executing extends StateWithExecutionGraph implements ResourceListener {
 
 private final Context context;
+private Instant lastRescale = Instant.EPOCH;

Review Comment:
   Recording lastRescale as Instant.now() in the Executing constructor could work, but it means that even a regular transition from the Created to the Executing state (see [FLIP-160](https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler)) would be considered a rescale.






[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-11 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1321387884


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -47,6 +51,10 @@
 class Executing extends StateWithExecutionGraph implements ResourceListener {
 
 private final Context context;
+private Instant lastRescale = Instant.EPOCH;

Review Comment:
   I agree, there is indeed a bug: Executing is a new object after each rescale, when the transition to the Restarting and then to the Executing state is done. I thought the state of the Executing object was kept between restarts (with checkpointing), but it is not: a new one is created in transitionToState(). This leads to lastRescale always being EPOCH. Thanks for pointing this out.
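
   One possible way around this, sketched under assumptions: hand the timestamp of the last rescale to the next state object instead of letting the field default to `Instant.EPOCH`. `ExecutingSketch`, `getLastRescale` and `rescaleAt` are hypothetical names used for illustration only; this is not necessarily how the PR resolves the bug.

```java
import java.time.Instant;

/** Hypothetical stand-in for the Executing state; only the timestamp handling is modeled. */
final class ExecutingSketch {

    private final Instant lastRescale;

    /** The previous state's timestamp is handed over instead of defaulting to Instant.EPOCH. */
    ExecutingSketch(Instant lastRescale) {
        this.lastRescale = lastRescale;
    }

    Instant getLastRescale() {
        return lastRescale;
    }

    /** Models Executing -> Restarting -> Executing: a new object that remembers the rescale time. */
    ExecutingSketch rescaleAt(Instant now) {
        return new ExecutingSketch(now);
    }
}
```

   The first state would be created with `Instant.EPOCH`, so the initial transition from Created to Executing is still not counted as a rescale.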






[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-11 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1321300535


##
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##
@@ -488,6 +488,23 @@ public enum SchedulerType {
 code(SchedulerExecutionMode.REACTIVE.name()))
 .build());
 
+@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+public static final ConfigOption SCHEDULER_SCALING_INTERVAL_MIN =
+key("jobmanager.adaptive-scheduler.scaling-interval.min")
+.durationType()
+.defaultValue(Duration.ofSeconds(30))
+// rescaling and let the user increase the value for high workloads
+.withDescription(
+"Determines the minimum time (in seconds) between scaling operations in reactive mode.");

Review Comment:
   1. If the parameter value is passed via the CLI as a String, will the duration be parsed correctly, with the correct time unit?
   2. Yes, but the new-resources-addition call that triggers the rescale (and the cooldown period) only happens in reactive mode.






[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-11 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1321269048


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -55,7 +63,9 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
 Logger logger,
 Context context,
 ClassLoader userCodeClassLoader,
-List failureCollection) {
+List failureCollection,
+Duration scalingIntervalMin,
+Duration scalingIntervalMax) {

Review Comment:
   +1 thanks for pointing out.






[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-11 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1321266685


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##
@@ -496,6 +556,8 @@ private final class ExecutingStateBuilder {
 TestingDefaultExecutionGraphBuilder.newBuilder()
 .build(EXECUTOR_RESOURCE.getExecutor());
 private OperatorCoordinatorHandler operatorCoordinatorHandler;
+private Duration scalingIntervalMin = Duration.ZERO;

Review Comment:
   No, this builder is also used for the other tests, so it is initialized with a conservative default (cooldown period disabled). In the cooldown tests, this value is set to > 0.






[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-08-29 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1308521706


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -144,6 +188,17 @@ private void maybeRescale() {
 }
 }
 
+private Duration timeSinceLastRescale() {
+return Duration.between(lastRescale, Instant.now());
+}
+
+private void rescaleWhenCooldownPeriodIsOver() {

Review Comment:
   Exactly. This is what I meant and implemented in yesterday's last commit: the call at 08:01:10 will schedule `maybeRescale` at 08:01:40, and the call at 08:01:15 will not entail any rescale. When `maybeRescale` is executed at 08:01:40, it will use the resources added at 08:01:15 when calling `Context#getExecutionGraph()`. So that seems fine.
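
   The timeline above can be replayed with a small model. This is only a sketch under assumptions: `CooldownSketch`, `plannedRuns` and `onScheduledRunExecuted` are hypothetical names, the list stands in for `context.runIfState(...)`, time is passed in explicitly, and the delayed run is assumed to be scheduled `scalingIntervalMin` after the triggering event (which is what the 08:01:10 to 08:01:40 example suggests with the 30 s default).

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of rescaleWhenCooldownPeriodIsOver(); not the Flink implementation. */
final class CooldownSketch {

    private final Duration scalingIntervalMin;
    private final Instant lastRescale;
    private boolean rescaleScheduled = false;

    /** Times at which maybeRescale would run; stands in for context.runIfState(...). */
    final List<Instant> plannedRuns = new ArrayList<>();

    CooldownSketch(Duration scalingIntervalMin, Instant lastRescale) {
        this.scalingIntervalMin = scalingIntervalMin;
        this.lastRescale = lastRescale;
    }

    /** Called on every "new resources" event; the time is passed in to keep the sketch testable. */
    void onNewResourcesAvailable(Instant now) {
        if (Duration.between(lastRescale, now).compareTo(scalingIntervalMin) > 0) {
            plannedRuns.add(now); // cooldown already over: run right away
        } else if (!rescaleScheduled) {
            rescaleScheduled = true;
            // Assumption: the delayed run is scheduled scalingIntervalMin after the event.
            plannedRuns.add(now.plus(scalingIntervalMin));
        }
        // Otherwise a run is already pending; it will see the resources added in the meantime.
    }

    /** Mirrors maybeRescale() clearing the flag when the delayed run executes. */
    void onScheduledRunExecuted() {
        rescaleScheduled = false;
    }
}
```

   Assuming a last rescale at 08:01:00 and a 30 s cooldown, events at 08:01:10 and 08:01:15 leave exactly one planned run, at 08:01:40.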






[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-08-28 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1307494302


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -144,6 +188,17 @@ private void maybeRescale() {
 }
 }
 
+private Duration timeSinceLastRescale() {
+return Duration.between(lastRescale, Instant.now());
+}
+
+private void rescaleWhenCooldownPeriodIsOver() {

Review Comment:
   OK, thanks for pointing this out. That being said, with the period restart behavior (see previous comment), in the above case the first call will schedule `maybeRescale` for 08:01:40 and the second call will schedule it for 08:01:45. Do you want us to simply drop the last schedule so that only one `maybeRescale` is scheduled at a time?






[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-08-28 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1307494302


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -144,6 +188,17 @@ private void maybeRescale() {
 }
 }
 
+private Duration timeSinceLastRescale() {
+return Duration.between(lastRescale, Instant.now());
+}
+
+private void rescaleWhenCooldownPeriodIsOver() {

Review Comment:
   OK, thanks for pointing that out. That being said, with the period reset 
behavior (see previous comment), in the above case the first call schedules 
`maybeRescale` for 08:01:40 and the second call schedules it for 08:01:45. 
Do you want us to simply drop the last schedule so that only one 
`maybeRescale` is scheduled at a time?



[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-08-28 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1307494302


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -144,6 +188,17 @@ private void maybeRescale() {
 }
 }
 
+private Duration timeSinceLastRescale() {
+return Duration.between(lastRescale, Instant.now());
+}
+
+private void rescaleWhenCooldownPeriodIsOver() {

Review Comment:
   OK, thanks for pointing that out.



[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-08-28 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1307486764


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -210,6 +265,15 @@ interface Context
  * @return a ScheduledFuture representing pending completion of the task
  */
 ScheduledFuture runIfState(State expectedState, Runnable action, Duration delay);
+
+/**
+ * Runs the given action immediately or after a delay depending on the given condition.
+ *
+ * @param condition if met, the action is executed immediately or scheduled otherwise
+ * @param action action to run
+ * @param delay delay after which to run the action if the condition is not met
+ */
+void runIfConditionOrSchedule(boolean condition, Runnable action, Duration delay);

Review Comment:
   Agree, thanks
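
For reference, the contract described in the Javadoc above could be sketched as follows. This is a minimal sketch under assumptions, not the actual `Context` implementation: `RunOrScheduleSketch` and the `DelayedExecutor` interface are hypothetical stand-ins for whatever executor the scheduler uses.

```java
import java.time.Duration;

/** Hypothetical sketch of the run-now-or-schedule contract. */
final class RunOrScheduleSketch {
    /** Hypothetical stand-in for a delayed executor. */
    interface DelayedExecutor {
        void schedule(Runnable action, Duration delay);
    }

    private final DelayedExecutor executor;

    RunOrScheduleSketch(DelayedExecutor executor) {
        this.executor = executor;
    }

    /** Runs the action right away if the condition holds, otherwise defers it by the delay. */
    void runIfConditionOrSchedule(boolean condition, Runnable action, Duration delay) {
        if (condition) {
            action.run();
        } else {
            executor.schedule(action, delay);
        }
    }
}
```

Passing the executor in keeps the decision logic free of real timers, so it can be exercised without waiting.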



[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-08-28 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1307106011


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -144,6 +188,17 @@ private void maybeRescale() {
 }
 }
 
+private Duration timeSinceLastRescale() {
+return Duration.between(lastRescale, Instant.now());
+}
+
+private void rescaleWhenCooldownPeriodIsOver() {
+context.runIfConditionOrSchedule(
+timeSinceLastRescale().compareTo(scalingIntervalMin) > 0,
+this::maybeRescale,
+scalingIntervalMin); // reset cooldown period

Review Comment:
   That is right. My first implementation scheduled at 08:01:35, but @dmvk 
suggested 
[here](https://lists.apache.org/thread/m2w2xzfjpxlw63j0k7tfxfgs0rshhwwr) that 
we keep the min interval low but restart the cooldown period each time. That 
is what the FLIP describes.
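
To make the difference concrete, here is a small sketch comparing the two ways of picking the time of the delayed `maybeRescale`. The class and method names are hypothetical, and the timeline is an assumption chosen to match the times quoted in this thread: a last rescale at 08:01:05, an event at 08:01:10, and a 30-second `scalingIntervalMin`.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch comparing two ways to time a delayed maybeRescale. */
final class CooldownTimingSketch {
    /** First variant: fire as soon as the original cooldown period ends. */
    static Instant remainingCooldown(Instant lastRescale, Instant now, Duration min) {
        Instant end = lastRescale.plus(min);
        return end.isAfter(now) ? end : now;
    }

    /** Restart variant, as in the diff above: inside the cooldown, wait a full min interval from now. */
    static Instant restartedCooldown(Instant lastRescale, Instant now, Duration min) {
        boolean cooldownOver = Duration.between(lastRescale, now).compareTo(min) > 0;
        return cooldownOver ? now : now.plus(min);
    }

    public static void main(String[] args) {
        Duration min = Duration.ofSeconds(30); // assumed scalingIntervalMin
        Instant lastRescale = Instant.parse("2023-08-28T08:01:05Z"); // assumed
        Instant event = Instant.parse("2023-08-28T08:01:10Z"); // assumed
        // First variant: 08:01:05 + 30s
        System.out.println(remainingCooldown(lastRescale, event, min));
        // Restart variant: 08:01:10 + 30s
        System.out.println(restartedCooldown(lastRescale, event, min));
    }
}
```

Under these assumptions the first variant fires at 08:01:35 and the restart variant at 08:01:40; a second event at 08:01:15 would be scheduled for 08:01:45 with the restart variant.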



[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-08-10 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1287306063


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -39,14 +42,22 @@
 import javax.annotation.Nullable;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /** State which represents a running job with an {@link ExecutionGraph} and assigned slots. */
 class Executing extends StateWithExecutionGraph implements ResourceListener {
 
 private final Context context;
+private Instant lastRescale = Instant.EPOCH;
+private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);

Review Comment:
   I also changed the tests: I no longer schedule tasks in the tests, to avoid 
race conditions. Instead, I introduced a status enum indicating whether the 
task was executed immediately or scheduled. In the tests I set/check this 
status.
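
Roughly, the idea is a test double along these lines. This is a sketch only: `RecordingContextSketch` and `RescaleStatus` are hypothetical names, and the real test classes may look different.

```java
import java.time.Duration;

/** Hypothetical test double: records how the action was handled instead of using timers. */
final class RecordingContextSketch {
    enum RescaleStatus {
        NOT_REQUESTED,
        EXECUTED_IMMEDIATELY,
        SCHEDULED
    }

    private RescaleStatus status = RescaleStatus.NOT_REQUESTED;

    /** Same shape as the Context method, but nothing is ever scheduled for real. */
    void runIfConditionOrSchedule(boolean condition, Runnable action, Duration delay) {
        if (condition) {
            status = RescaleStatus.EXECUTED_IMMEDIATELY;
            action.run();
        } else {
            // No timer is started, so the test cannot race with a background thread.
            status = RescaleStatus.SCHEDULED;
        }
    }

    RescaleStatus status() {
        return status;
    }
}
```

A test can then assert on `status()` right after triggering the resource event, without sleeping or waiting on an executor.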



[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-08-08 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1287306063


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -39,14 +42,22 @@
 import javax.annotation.Nullable;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /** State which represents a running job with an {@link ExecutionGraph} and assigned slots. */
 class Executing extends StateWithExecutionGraph implements ResourceListener {
 
 private final Context context;
+private Instant lastRescale = Instant.EPOCH;
+private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);

Review Comment:
   I also changed the tests: I no longer schedule anything (no time-based 
tests, to avoid race conditions) but rather set/check the rescale status.



[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-08-08 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1287304795


##
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##
@@ -488,6 +488,23 @@ public enum SchedulerType {
 
code(SchedulerExecutionMode.REACTIVE.name()))
 .build());
 
+@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+public static final ConfigOption<Long> SCHEDULER_SCALING_INTERVAL_MIN =
+key("jobmanager.adaptive-scheduler.scaling-interval.min")
+.longType()
+.defaultValue(30L) // favor lower scaling-interval.min for more reactive
+// rescaling and let the user increase the value for high workloads
+.withDescription(
+"Determines the minimum time (in seconds) between scaling operations in reactive mode.");
+
+@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+public static final ConfigOption<Long> SCHEDULER_SCALING_INTERVAL_MAX =
+key("jobmanager.adaptive-scheduler.scaling-interval.max")
+.longType()
+.noDefaultValue()
+.withDescription(
+"Allow the user to configure the time (in seconds) after which a scaling operation is triggered regardless if the requirements are met");

Review Comment:
   done



[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-08-08 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1287297336


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -39,14 +42,22 @@
 import javax.annotation.Nullable;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /** State which represents a running job with an {@link ExecutionGraph} and assigned slots. */
 class Executing extends StateWithExecutionGraph implements ResourceListener {
 
 private final Context context;
+private Instant lastRescale = Instant.EPOCH;
+private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);

Review Comment:
   :+1: Yes, I could use `Context.componentMainThreadExecutor`.



[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-08-08 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1287027621


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -39,14 +42,22 @@
 import javax.annotation.Nullable;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /** State which represents a running job with an {@link ExecutionGraph} and assigned slots. */
 class Executing extends StateWithExecutionGraph implements ResourceListener {
 
 private final Context context;
+private Instant lastRescale = Instant.EPOCH;
+private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);

Review Comment:
   :+1: agree I could use `runIfState()`



[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-08-08 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1287009835


##
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##
@@ -488,6 +488,23 @@ public enum SchedulerType {
 
code(SchedulerExecutionMode.REACTIVE.name()))
 .build());
 
+@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+public static final ConfigOption<Long> SCHEDULER_SCALING_INTERVAL_MIN =
+key("jobmanager.adaptive-scheduler.scaling-interval.min")
+.longType()
+.defaultValue(30L) // favor lower scaling-interval.min for more reactive
+// rescaling and let the user increase the value for high workloads
+.withDescription(
+"Determines the minimum time (in seconds) between scaling operations in reactive mode.");
+
+@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+public static final ConfigOption<Long> SCHEDULER_SCALING_INTERVAL_MAX =
+key("jobmanager.adaptive-scheduler.scaling-interval.max")
+.longType()
+.noDefaultValue()
+.withDescription(
+"Allow the user to configure the time (in seconds) after which a scaling operation is triggered regardless if the requirements are met");

Review Comment:
   :+1: 


