[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1328774059 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti @Override public void onNewResourcesAvailable() { -maybeRescale(); +rescaleWhenCooldownPeriodIsOver(); } @Override public void onNewResourceRequirements() { -maybeRescale(); +rescaleWhenCooldownPeriodIsOver(); } private void maybeRescale() { -if (context.shouldRescale(getExecutionGraph())) { -getLogger().info("Can change the parallelism of job. Restarting job."); +final Duration timeSinceLastRescale = timeSinceLastRescale(); +rescaleScheduled = false; +final boolean shouldForceRescale = +(scalingIntervalMax != null) +&& (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0) +&& (lastRescale != Instant.EPOCH); // initial rescale is not forced +if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) { +if (shouldForceRescale) { +getLogger() +.info( +"Time since last rescale ({}) > {} ({}). Force-changing the parallelism of the job. Restarting the job.", +timeSinceLastRescale, + JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(), +scalingIntervalMax); +} else { +getLogger().info("Can change the parallelism of the job. Restarting the job."); +} +lastRescale = Instant.now(); context.goToRestarting( getExecutionGraph(), Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1326121636 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti @Override public void onNewResourcesAvailable() { -maybeRescale(); +rescaleWhenCooldownPeriodIsOver(); } @Override public void onNewResourceRequirements() { -maybeRescale(); +rescaleWhenCooldownPeriodIsOver(); } private void maybeRescale() { -if (context.shouldRescale(getExecutionGraph())) { -getLogger().info("Can change the parallelism of job. Restarting job."); +final Duration timeSinceLastRescale = timeSinceLastRescale(); +rescaleScheduled = false; +final boolean shouldForceRescale = +(scalingIntervalMax != null) +&& (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0) +&& (lastRescale != Instant.EPOCH); // initial rescale is not forced +if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) { +if (shouldForceRescale) { +getLogger() +.info( +"Time since last rescale ({}) > {} ({}). Force-changing the parallelism of the job. Restarting the job.", +timeSinceLastRescale, + JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(), +scalingIntervalMax); +} else { +getLogger().info("Can change the parallelism of the job. Restarting the job."); +} +lastRescale = Instant.now(); context.goToRestarting( getExecutionGraph(), Review Comment:
> This is all about stability; "I could have rescaled and it wouldn't have been a problem.". But you can only conclude that after having had that slot for some period of time; otherwise you're just guessing.

Exactly. So, when a slot arrives, scheduling a rescale check after a timeout seems a good trade-off: it ensures that the slot is still there after a period of time.
When the timeout fires:
- If the slot is still there, we should force a rescale (the "better late than never" case).
- If the slot has vanished, we should determine that the target parallelism is the same as the current one and not rescale (the "avoid an unnecessary rescale" case).
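The two cases above can be sketched as follows. This is only an illustration of the idea, not the code of the PR: the class `DelayedRescaleCheck`, the `Decision` enum and the method `onTimeout` are hypothetical names, and the way the target parallelism is derived from the free slots is an assumption.

```java
/** Hypothetical sketch; class and method names are illustrative, not Flink APIs. */
final class DelayedRescaleCheck {

    enum Decision { FORCE_RESCALE, SKIP }

    /**
     * Decision taken when the timeout that was scheduled at slot arrival fires.
     *
     * @param currentParallelism parallelism the job is running with
     * @param freeSlotsNow slots that are still available when the timeout fires (assumption)
     * @param maxParallelism upper bound requested for the job
     */
    static Decision onTimeout(int currentParallelism, int freeSlotsNow, int maxParallelism) {
        // Re-evaluate the target at timeout time, not at slot arrival time.
        int targetParallelism = Math.min(currentParallelism + freeSlotsNow, maxParallelism);
        // Slot still there: target is higher, so rescale (better late than never).
        // Slot vanished: target equals current, so an unnecessary restart is avoided.
        return targetParallelism > currentParallelism ? Decision.FORCE_RESCALE : Decision.SKIP;
    }
}
```

With a job running at parallelism 99 out of 100, `onTimeout(99, 1, 100)` yields `FORCE_RESCALE`, whereas `onTimeout(99, 0, 100)` yields `SKIP` because the slot is gone by the time the check runs.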
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1325907644 ## flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java: ## @@ -488,6 +488,23 @@ public enum SchedulerType { code(SchedulerExecutionMode.REACTIVE.name())) .build()); +@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING) +public static final ConfigOption SCHEDULER_SCALING_INTERVAL_MIN = +key("jobmanager.adaptive-scheduler.scaling-interval.min") +.durationType() +.defaultValue(Duration.ofSeconds(30)) +// rescaling and let the user increase the value for high workloads +.withDescription( +"Determines the minimum time (in seconds) between scaling operations in reactive mode."); Review Comment: done
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1325871122 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -47,6 +51,10 @@ class Executing extends StateWithExecutionGraph implements ResourceListener { private final Context context; +private Instant lastRescale = Instant.EPOCH; Review Comment: Fixed
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1325847025 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti @Override public void onNewResourcesAvailable() { -maybeRescale(); +rescaleWhenCooldownPeriodIsOver(); } @Override public void onNewResourceRequirements() { -maybeRescale(); +rescaleWhenCooldownPeriodIsOver(); } private void maybeRescale() { -if (context.shouldRescale(getExecutionGraph())) { -getLogger().info("Can change the parallelism of job. Restarting job."); +final Duration timeSinceLastRescale = timeSinceLastRescale(); +rescaleScheduled = false; +final boolean shouldForceRescale = +(scalingIntervalMax != null) +&& (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0) +&& (lastRescale != Instant.EPOCH); // initial rescale is not forced +if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) { +if (shouldForceRescale) { +getLogger() +.info( +"Time since last rescale ({}) > {} ({}). Force-changing the parallelism of the job. Restarting the job.", +timeSinceLastRescale, + JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(), +scalingIntervalMax); +} else { +getLogger().info("Can change the parallelism of the job. Restarting the job."); +} +lastRescale = Instant.now(); context.goToRestarting( getExecutionGraph(), Review Comment: Thanks Chesnay for your views (again) ! > With option 2 we rescale right away because the timeout already elapsed within the previous 24 hours. This is what I proposed indeed, but you're right that > it might be side-stepping the intentions behind the min increase option and timeout if for example after the 24h resources start to arrive 1 slot every 5 min, with this scenario we will restart every 5 min (as the timeout is exceeded min-increase check is overridden) which is what we want to avoid. 
So it is better to schedule a timeout when resources arrive and `added resources < min increase`. When the timeout fires, we do one single rescale that takes all the added slots in one shot. I'll do that.
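To make the "one single rescale" point concrete, here is a small simulation. It is a sketch under assumptions: the class `BatchedRescaleSketch` and its method are hypothetical, arrival times are offsets sorted in ascending order, and the timeout is assumed to be scheduled once, when the first slot arrives.

```java
import java.time.Duration;
import java.util.List;

/** Hypothetical sketch; not part of the PR. */
final class BatchedRescaleSketch {

    /**
     * Counts the slots picked up by the single rescale done when the timeout fires.
     *
     * @param arrivalOffsets slot arrival times as offsets from a common start, sorted ascending
     * @param timeout delay between the first arrival and the rescale check (assumption)
     */
    static long slotsTakenBySingleRescale(List<Duration> arrivalOffsets, Duration timeout) {
        if (arrivalOffsets.isEmpty()) {
            return 0;
        }
        // The timeout is scheduled once, at the first arrival below the min increase.
        Duration deadline = arrivalOffsets.get(0).plus(timeout);
        // Every slot that arrived up to the deadline is taken in one shot.
        return arrivalOffsets.stream().filter(a -> a.compareTo(deadline) <= 0).count();
    }
}
```

With slots arriving at 0, 5, 10 and 15 minutes and a 12 minute timeout, the single rescale takes the first three slots with one restart, instead of restarting every 5 minutes.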
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1321393999 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -67,13 +77,33 @@ class Executing extends StateWithExecutionGraph implements ResourceListener { this.context = context; Preconditions.checkState( executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph"); +this.scalingIntervalMin = scalingIntervalMin; +this.scalingIntervalMax = scalingIntervalMax; +Preconditions.checkState( +!scalingIntervalMin.isNegative(), +"{} must be positive integer or 0", +JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key()); +if (scalingIntervalMax != null) { +Preconditions.checkState( +scalingIntervalMax.compareTo(scalingIntervalMin) > 0, +"{}({}) must be greater than {}({})", +JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(), +scalingIntervalMax, +JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key(), +scalingIntervalMin); +} deploy(); // check if new resources have come available in the meantime context.runIfState(this, this::maybeRescale, Duration.ZERO); Review Comment:
Not necessarily a new rescale: it could be a transition from the Created state to the Executing state (FLIP-160). In that case, we want to eagerly adapt to resources that could have arrived while we were starting, and not wait for the cooldown period to end.
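The effect of initializing `lastRescale` to `Instant.EPOCH` can be illustrated with a small delay computation. This is a sketch, not the body of `rescaleWhenCooldownPeriodIsOver()` (which is not shown in this thread): the class `CooldownDelaySketch` and the method `delayBeforeRescaleCheck` are hypothetical, and the formula "remaining cooldown, floored at zero" is an assumption.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch of a cooldown delay computation; names are not from the PR. */
final class CooldownDelaySketch {

    /**
     * Returns how long to wait before running the rescale check.
     *
     * @param lastRescale time of the last rescale, Instant.EPOCH if none happened yet
     * @param now current time
     * @param scalingIntervalMin cooldown period between two rescales
     */
    static Duration delayBeforeRescaleCheck(
            Instant lastRescale, Instant now, Duration scalingIntervalMin) {
        Duration timeSinceLastRescale = Duration.between(lastRescale, now);
        Duration remainingCooldown = scalingIntervalMin.minus(timeSinceLastRescale);
        // With lastRescale == EPOCH the elapsed time is huge, so the cooldown counts
        // as already over and the check can run immediately.
        return remainingCooldown.isNegative() ? Duration.ZERO : remainingCooldown;
    }
}
```

Under these assumptions, a freshly entered Executing state gets a zero delay, while a state that rescaled 10 seconds ago with a 30 second cooldown waits another 20 seconds.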
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1324233808 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti @Override public void onNewResourcesAvailable() { -maybeRescale(); +rescaleWhenCooldownPeriodIsOver(); } @Override public void onNewResourceRequirements() { -maybeRescale(); +rescaleWhenCooldownPeriodIsOver(); } private void maybeRescale() { -if (context.shouldRescale(getExecutionGraph())) { -getLogger().info("Can change the parallelism of job. Restarting job."); +final Duration timeSinceLastRescale = timeSinceLastRescale(); +rescaleScheduled = false; +final boolean shouldForceRescale = +(scalingIntervalMax != null) +&& (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0) +&& (lastRescale != Instant.EPOCH); // initial rescale is not forced +if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) { +if (shouldForceRescale) { +getLogger() +.info( +"Time since last rescale ({}) > {} ({}). Force-changing the parallelism of the job. Restarting the job.", +timeSinceLastRescale, + JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(), +scalingIntervalMax); +} else { +getLogger().info("Can change the parallelism of the job. Restarting the job."); +} +lastRescale = Instant.now(); context.goToRestarting( getExecutionGraph(), Review Comment: Here is the summary of the offline discussion @1996fanrui and I had about `scalingIntervalMax` and forcing rescale (current code: when `added resources < min-parallelism-increase` we force a rescale if `timeSinceLastRescale > scalingIntervalMax`): 1. aim: the longer the pipeline runs, the more the (small) resource gain is worth the restarting time. 2. 
corner case: resources amounting to less than `min-parallelism-increase` arrive when `timeSinceLastRescale < scalingIntervalMax` and the pipeline has been running for a long time (the typical case, see point 1).
=> With the current code, we don't force a rescale in that case, whereas the added resources would be worth the restarting time.
=> I proposed solution 1: changing the definition of `scalingIntervalMax` to `pipelineRuntimeRescaleMin`, meaning the pipeline runtime after which we force a rescale even if `added resources < min-parallelism-increase`.
=> @1996fanrui proposed solution 2: if `added resources < min-parallelism-increase && timeSinceLastRescale < scalingIntervalMax`, schedule a tryRescale after `scalingIntervalMax`. The tryRescale forces a rescale only if there is indeed a change in the resource graph at that time, in case the last TM crashed in the meantime.

@zentol you were the one who proposed the addition of `scalingIntervalMax` in the FLIP discussion thread. Do you prefer solution 1 or solution 2?
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1322947925 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti @Override public void onNewResourcesAvailable() { -maybeRescale(); +rescaleWhenCooldownPeriodIsOver(); } @Override public void onNewResourceRequirements() { -maybeRescale(); +rescaleWhenCooldownPeriodIsOver(); } private void maybeRescale() { -if (context.shouldRescale(getExecutionGraph())) { -getLogger().info("Can change the parallelism of job. Restarting job."); +final Duration timeSinceLastRescale = timeSinceLastRescale(); +rescaleScheduled = false; +final boolean shouldForceRescale = +(scalingIntervalMax != null) +&& (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0) +&& (lastRescale != Instant.EPOCH); // initial rescale is not forced +if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) { +if (shouldForceRescale) { +getLogger() +.info( +"Time since last rescale ({}) > {} ({}). Force-changing the parallelism of the job. Restarting the job.", +timeSinceLastRescale, + JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(), +scalingIntervalMax); +} else { +getLogger().info("Can change the parallelism of the job. Restarting the job."); +} +lastRescale = Instant.now(); context.goToRestarting( getExecutionGraph(), Review Comment: > I think the semantic of scalingIntervalMax should be fixed to the user side and should not be related to the calling order in the code or the order in which resources arrive. I don't understand what you mean by "fixed to the user side". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1322784279 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti @Override public void onNewResourcesAvailable() { -maybeRescale(); +rescaleWhenCooldownPeriodIsOver(); } @Override public void onNewResourceRequirements() { -maybeRescale(); +rescaleWhenCooldownPeriodIsOver(); } private void maybeRescale() { -if (context.shouldRescale(getExecutionGraph())) { -getLogger().info("Can change the parallelism of job. Restarting job."); +final Duration timeSinceLastRescale = timeSinceLastRescale(); +rescaleScheduled = false; +final boolean shouldForceRescale = +(scalingIntervalMax != null) +&& (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0) +&& (lastRescale != Instant.EPOCH); // initial rescale is not forced +if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) { +if (shouldForceRescale) { +getLogger() +.info( +"Time since last rescale ({}) > {} ({}). Force-changing the parallelism of the job. Restarting the job.", +timeSinceLastRescale, + JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(), +scalingIntervalMax); +} else { +getLogger().info("Can change the parallelism of the job. Restarting the job."); +} +lastRescale = Instant.now(); context.goToRestarting( getExecutionGraph(), Review Comment: > > > Do you mean we force trigger a rescale when new resources comes and the current time > lastRescaleTime + scalingIntervalMax? > > > > > > yes, that is what is what the community agreed on in the FLIP. > > I think this strategy just solve one case: the last resource comes after scalingIntervalMax. The job can rescale directly. > It was made after this observation: the longer the pipeline runs, the more the (small) resource gain is worth the restarting time. 
> However, it cannot solve the case where the last resource comes before scalingIntervalMax. Assuming the scalingIntervalMax is 5 minutes, `jobmanager.adaptive-scheduler.min-parallelism-increase` is 2, and the expected parallelism is 100:
>
> * The job has 99 TMs at 09:00:00; it runs with parallelism 99.
> * At 09:03:00, the last TM comes.
> * `Executing` will ignore the rescale because the parallelism diff is 1, which is less than `min-parallelism-increase`, and `current time < lastRescaleTime + scalingIntervalMax`.
>
> And then the job doesn't need any new resource and the expected parallelism doesn't change, so the job won't rescale anymore.

This is not linked to scalingIntervalMax: before this PR, there was no rescale in such a case either, simply because the minimum resource addition was not met. It is by design of min-parallelism-increase, which we don't change.
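The scenario above can be replayed with a small sketch that mirrors the `shouldForceRescale` condition of the diff. The class `ForceRescaleScenario` and the method `rescales` are hypothetical; treating 09:00:00 as the time of the last rescale, and modelling `context.shouldRescale(...)` as "parallelism increase >= min-parallelism-increase", are assumptions made for the illustration.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical replay of the 99-TM scenario; not the actual Executing code. */
final class ForceRescaleScenario {

    static boolean rescales(
            Instant lastRescale,
            Instant now,
            Duration scalingIntervalMax, // may be null when the option is not set
            int currentParallelism,
            int newParallelism,
            int minParallelismIncrease) {
        Duration timeSinceLastRescale = Duration.between(lastRescale, now);
        // Mirrors the condition in the diff; equals() is used instead of != here.
        boolean shouldForceRescale =
                scalingIntervalMax != null
                        && timeSinceLastRescale.compareTo(scalingIntervalMax) > 0
                        && !lastRescale.equals(Instant.EPOCH); // initial rescale is not forced
        // Assumption: stands in for context.shouldRescale(getExecutionGraph()).
        boolean enoughIncrease = newParallelism - currentParallelism >= minParallelismIncrease;
        return shouldForceRescale || enoughIncrease;
    }
}
```

Under these assumptions, the TM arriving at 09:03:00 (diff of 1, 3 minutes after the last rescale) triggers no rescale, while the same TM arriving at 09:06:00 would be picked up by a forced rescale.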
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1322738246 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti @Override public void onNewResourcesAvailable() { -maybeRescale(); +rescaleWhenCooldownPeriodIsOver(); } @Override public void onNewResourceRequirements() { -maybeRescale(); +rescaleWhenCooldownPeriodIsOver(); } private void maybeRescale() { -if (context.shouldRescale(getExecutionGraph())) { -getLogger().info("Can change the parallelism of job. Restarting job."); +final Duration timeSinceLastRescale = timeSinceLastRescale(); +rescaleScheduled = false; +final boolean shouldForceRescale = +(scalingIntervalMax != null) +&& (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0) +&& (lastRescale != Instant.EPOCH); // initial rescale is not forced +if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) { +if (shouldForceRescale) { +getLogger() +.info( +"Time since last rescale ({}) > {} ({}). Force-changing the parallelism of the job. Restarting the job.", +timeSinceLastRescale, + JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(), +scalingIntervalMax); +} else { +getLogger().info("Can change the parallelism of the job. Restarting the job."); +} +lastRescale = Instant.now(); context.goToRestarting( getExecutionGraph(), Review Comment: > > > If so, I think the logic related to scalingIntervalMax should be moved into maybeRescale. > > > > > > This logic is already in maybeRescale() method > > Sorry, I have a typo here. I mean the logic related to scalingIntervalMax should be moved out of the maybeRescale. > > `onNewResourceRequirements` also call the `rescaleWhenCooldownPeriodIsOver` when user changed the parallelism, maybeRescale shouldn't force trigger a rescale in this case even if the `current time > lastRescaleTime + scalingIntervalMax`. 
> > Because after the requirement is adjusted, resources cannot be ready immediately. The rescale is meaningless at this time even if the `current time > lastRescaleTime + scalingIntervalMax`.

Agreed. I think the same is true for the maybeRescale() call in the constructor, which checks whether new resources have arrived during the starting process.
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1321602343 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti @Override public void onNewResourcesAvailable() { -maybeRescale(); +rescaleWhenCooldownPeriodIsOver(); } @Override public void onNewResourceRequirements() { -maybeRescale(); +rescaleWhenCooldownPeriodIsOver(); } private void maybeRescale() { -if (context.shouldRescale(getExecutionGraph())) { -getLogger().info("Can change the parallelism of job. Restarting job."); +final Duration timeSinceLastRescale = timeSinceLastRescale(); +rescaleScheduled = false; +final boolean shouldForceRescale = +(scalingIntervalMax != null) +&& (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0) +&& (lastRescale != Instant.EPOCH); // initial rescale is not forced +if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) { +if (shouldForceRescale) { +getLogger() +.info( +"Time since last rescale ({}) > {} ({}). Force-changing the parallelism of the job. Restarting the job.", +timeSinceLastRescale, + JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(), +scalingIntervalMax); +} else { +getLogger().info("Can change the parallelism of the job. Restarting the job."); +} +lastRescale = Instant.now(); context.goToRestarting( getExecutionGraph(), Review Comment: > Do you mean we force trigger a rescale when new resources comes and the current time > lastRescaleTime + scalingIntervalMax? yes, that is what the community agreed on in the FLIP. > If so, I think the logic related to scalingIntervalMax should be moved into maybeRescale. This logic is already in maybeRescale() method > onNewResourceRequirements also call the rescaleWhenCooldownPeriodIsOver, when user changed the parallelism; maybeRescale will force trigger a rescale as well. right? 
Yes. `onNewResourceRequirements` is ultimately called by a REST endpoint in reaction to a user action in the web UI, so it ends up calling `maybeRescale`, but that does not always mean a rescale: if `timeSinceLastRescale < scalingIntervalMax` and the minimal resources are not met, there will be no rescale.
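The decision described in this comment can be modeled outside of Flink. The following is only a sketch: `RescaleDecision` and the `resourcesSufficient` flag are hypothetical names, the flag stands in for `context.shouldRescale(getExecutionGraph())`, and the clock is passed in explicitly. It also compares against `Instant.EPOCH` with `equals` rather than by reference, which is a choice of this sketch and not what the hunk does.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical stand-alone model of the decision taken in maybeRescale(). */
final class RescaleDecision {

    /** True when the max interval is set, has elapsed, and a previous rescale was recorded. */
    static boolean shouldForceRescale(
            Instant lastRescale, Instant now, Duration scalingIntervalMax) {
        return scalingIntervalMax != null
                && Duration.between(lastRescale, now).compareTo(scalingIntervalMax) > 0
                && !lastRescale.equals(Instant.EPOCH); // the initial rescale is never forced
    }

    /** A rescale happens when it is forced or when the regular resource check passes. */
    static boolean shouldRescale(
            Instant lastRescale,
            Instant now,
            Duration scalingIntervalMax,
            boolean resourcesSufficient) { // stand-in for context.shouldRescale(...)
        return shouldForceRescale(lastRescale, now, scalingIntervalMax) || resourcesSufficient;
    }
}
```

With a 5 minute max interval, a call 60 seconds after the last rescale with insufficient resources returns `false`, while the same call 301 seconds after the last rescale returns `true`.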
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1321325324 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti @Override public void onNewResourcesAvailable() { -maybeRescale(); +rescaleWhenCooldownPeriodIsOver(); } @Override public void onNewResourceRequirements() { -maybeRescale(); +rescaleWhenCooldownPeriodIsOver(); } private void maybeRescale() { -if (context.shouldRescale(getExecutionGraph())) { -getLogger().info("Can change the parallelism of job. Restarting job."); +final Duration timeSinceLastRescale = timeSinceLastRescale(); +rescaleScheduled = false; +final boolean shouldForceRescale = +(scalingIntervalMax != null) +&& (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0) +&& (lastRescale != Instant.EPOCH); // initial rescale is not forced +if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) { +if (shouldForceRescale) { +getLogger() +.info( +"Time since last rescale ({}) > {} ({}). Force-changing the parallelism of the job. Restarting the job.", +timeSinceLastRescale, + JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(), +scalingIntervalMax); +} else { +getLogger().info("Can change the parallelism of the job. Restarting the job."); +} +lastRescale = Instant.now(); context.goToRestarting( getExecutionGraph(), Review Comment: in the FLIP we said that the triggering event of the rescale is resource addition (`Executing#onNewResourcesAvailable`) (downscale is triggered by failures) so there is no need to schedule: when such an event happens `maybeRescale` is always called (either directly or scheduled) and when it is called, the rescale is done if `timeSinceLastRescale > scalingIntervalMax` no matter the min resources. 
There is already a test: `ExecutingTest#testNotifyNewResourcesAvailableWithNoResourcesAndForce`
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1321393999 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -67,13 +77,33 @@ class Executing extends StateWithExecutionGraph implements ResourceListener { this.context = context; Preconditions.checkState( executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph"); +this.scalingIntervalMin = scalingIntervalMin; +this.scalingIntervalMax = scalingIntervalMax; +Preconditions.checkState( +!scalingIntervalMin.isNegative(), +"{} must be positive integer or 0", +JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key()); +if (scalingIntervalMax != null) { +Preconditions.checkState( +scalingIntervalMax.compareTo(scalingIntervalMin) > 0, +"{}({}) must be greater than {}({})", +JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(), +scalingIntervalMax, +JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key(), +scalingIntervalMin); +} deploy(); // check if new resources have come available in the meantime context.runIfState(this, this::maybeRescale, Duration.ZERO); Review Comment: Not necessarily a new rescale: it could be a transition from the Created to the Executing state (FLIP-160). In that case we want to eagerly adapt to resources that could have arrived while we were starting, and not wait for the cooldown period to end.
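The two interval checks in the hunk above can be sketched without Flink's `Preconditions`. `ScalingIntervals` is a hypothetical holder class and the messages are simplified. The exception type mirrors what a failed state check would raise, and a `null` max is assumed to mean that no forced rescale is configured, as in the hunk.

```java
import java.time.Duration;

/** Hypothetical holder that applies the same two checks as the constructor in the hunk. */
final class ScalingIntervals {
    final Duration min;
    final Duration max; // null disables the forced rescale

    ScalingIntervals(Duration min, Duration max) {
        // The cooldown may be disabled (zero) but can never be negative.
        if (min.isNegative()) {
            throw new IllegalStateException(
                    "min scaling interval must be zero or positive, got " + min);
        }
        // When a max interval is configured it has to be strictly above the min.
        if (max != null && max.compareTo(min) <= 0) {
            throw new IllegalStateException(
                    "max scaling interval (" + max + ") must be greater than min (" + min + ")");
        }
        this.min = min;
        this.max = max;
    }
}
```

For instance, `new ScalingIntervals(Duration.ofSeconds(30), Duration.ofSeconds(30))` is rejected because the max is not strictly greater than the min.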
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1321390908 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -47,6 +51,10 @@ class Executing extends StateWithExecutionGraph implements ResourceListener { private final Context context; +private Instant lastRescale = Instant.EPOCH; Review Comment: Recording `lastRescale` as `Instant.now()` in the `Executing` constructor could work, but it would mean that even a regular transition from the Created to the Executing state (see [FLIP-160](https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler)) is considered a rescale.
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1321387884 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -47,6 +51,10 @@ class Executing extends StateWithExecutionGraph implements ResourceListener { private final Context context; +private Instant lastRescale = Instant.EPOCH; Review Comment: I agree, there is indeed a bug: `Executing` is a new object after each rescale, once the transition to the Restarting state and then back to the Executing state is done. I thought the state of the `Executing` object was kept between restarts (with checkpointing), but it is not: a new one is created in `transitionToState()`. This leads to `lastRescale` always being EPOCH. Thanks for pointing that out.
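The bug acknowledged here can be reproduced in a few lines. This is an illustration only: `ExecutingSketch` and `SchedulerSketch` are hypothetical stand-ins, not Flink classes. It shows why a timestamp kept in a per-state instance field is lost when every transition creates a new state object.

```java
import java.time.Instant;

/** Hypothetical stand-in for the Executing state: the timestamp lives in the instance. */
final class ExecutingSketch {
    private Instant lastRescale = Instant.EPOCH;

    void recordRescale(Instant now) {
        lastRescale = now;
    }

    Instant lastRescale() {
        return lastRescale;
    }
}

/** Hypothetical stand-in for a scheduler that swaps state objects on each transition. */
final class SchedulerSketch {
    private ExecutingSketch current = new ExecutingSketch();

    ExecutingSketch current() {
        return current;
    }

    void rescale(Instant now) {
        current.recordRescale(now);      // written on the old state object
        current = new ExecutingSketch(); // Restarting -> Executing creates a fresh object
    }
}
```

After `rescale(...)`, `current().lastRescale()` is `Instant.EPOCH` again, so any check of the form `lastRescale != Instant.EPOCH` can never become true.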
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1321300535 ## flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java: ## @@ -488,6 +488,23 @@ public enum SchedulerType { code(SchedulerExecutionMode.REACTIVE.name())) .build()); +@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING) +public static final ConfigOption SCHEDULER_SCALING_INTERVAL_MIN = +key("jobmanager.adaptive-scheduler.scaling-interval.min") +.durationType() +.defaultValue(Duration.ofSeconds(30)) +// rescaling and let the user increase the value for high workloads +.withDescription( +"Determines the minimum time (in seconds) between scaling operations in reactive mode."); Review Comment: 1. If the parameter value is passed via the CLI as a String, will the duration be parsed correctly with the correct time unit? 2. Yes, but the resource-addition call that triggers the rescale (and the cooldown period) only happens in reactive mode.
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1321269048 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -55,7 +63,9 @@ class Executing extends StateWithExecutionGraph implements ResourceListener { Logger logger, Context context, ClassLoader userCodeClassLoader, -List failureCollection) { +List failureCollection, +Duration scalingIntervalMin, +Duration scalingIntervalMax) { Review Comment: +1 thanks for pointing out.
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1321266685 ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java: ## @@ -496,6 +556,8 @@ private final class ExecutingStateBuilder { TestingDefaultExecutionGraphBuilder.newBuilder() .build(EXECUTOR_RESOURCE.getExecutor()); private OperatorCoordinatorHandler operatorCoordinatorHandler; +private Duration scalingIntervalMin = Duration.ZERO; Review Comment: No, this builder is also used for the other tests, so it is initialized with a conservative default (cooldown period disabled). In the cooldown tests, this value is set to > 0.
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1308521706 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -144,6 +188,17 @@ private void maybeRescale() { } } +private Duration timeSinceLastRescale() { +return Duration.between(lastRescale, Instant.now()); +} + +private void rescaleWhenCooldownPeriodIsOver() { Review Comment: Exactly. This is what I meant and implemented in yesterday's last commit: the call at 08:01:10 will schedule `maybeRescale` at 08:01:40 and the call at 08:01:15 will not entail any rescale. When `maybeRescale` is executed at 08:01:40, it will use the resources added at 08:01:15 when calling `Context#getExecutionGraph()`. So that seems fine.
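The timeline in this comment can be replayed with a small model. This is a sketch under assumptions: `CooldownSketch` is a hypothetical class, the scheduler is replaced by a list of target times, the calendar date is arbitrary, and the last rescale is assumed to have happened at 08:01:05 with a 30 second min interval. Only the "schedule at most one pending run" idea is taken from the discussion.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of "at most one pending maybeRescale during the cooldown". */
final class CooldownSketch {
    private final Duration scalingIntervalMin;
    private final Instant lastRescale;
    private boolean rescaleScheduled = false;

    /** Target times of the scheduled runs; stands in for a real scheduler. */
    final List<Instant> scheduledAt = new ArrayList<>();
    /** Number of times maybeRescale would have run right away. */
    int immediateRuns = 0;

    CooldownSketch(Duration scalingIntervalMin, Instant lastRescale) {
        this.scalingIntervalMin = scalingIntervalMin;
        this.lastRescale = lastRescale;
    }

    void onNewResourcesAvailable(Instant now) {
        if (Duration.between(lastRescale, now).compareTo(scalingIntervalMin) > 0) {
            immediateRuns++; // cooldown is over: run right away
        } else if (!rescaleScheduled) {
            rescaleScheduled = true; // first event in the cooldown: schedule one run
            scheduledAt.add(now.plus(scalingIntervalMin));
        }
        // Otherwise a run is already pending; it will see the new resources when it fires.
    }
}
```

Feeding it events at 08:01:10 and 08:01:15 leaves a single entry in `scheduledAt`, at 08:01:40.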
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1307494302 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -144,6 +188,17 @@ private void maybeRescale() { } } +private Duration timeSinceLastRescale() { +return Duration.between(lastRescale, Instant.now()); +} + +private void rescaleWhenCooldownPeriodIsOver() { Review Comment: OK, thanks for pointing that out. That being said, with the period restart behavior (see previous comment), in the above case, on the first call `maybeRescale` will be scheduled for 08:01:40 and on the second call `maybeRescale` will be scheduled for 08:01:45. Do you want us to simply drop the last schedule so that only one `maybeRescale` is scheduled at a time?
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1307486764 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -210,6 +265,15 @@ interface Context * @return a ScheduledFuture representing pending completion of the task */ ScheduledFuture runIfState(State expectedState, Runnable action, Duration delay); + +/** + * Runs the given action immediately or after a delay depending on the given condition. + * + * @param condition if met, the action is executed immediately or scheduled otherwise + * @param action action to run + * @param delay delay after which to run the action if the condition is not met + */ +void runIfConditionOrSchedule(boolean condition, Runnable action, Duration delay); Review Comment: Agree, thanks
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1307106011 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -144,6 +188,17 @@ private void maybeRescale() { } } +private Duration timeSinceLastRescale() { +return Duration.between(lastRescale, Instant.now()); +} + +private void rescaleWhenCooldownPeriodIsOver() { +context.runIfConditionOrSchedule( +timeSinceLastRescale().compareTo(scalingIntervalMin) > 0, +this::maybeRescale, +scalingIntervalMin); // reset cooldown period Review Comment: That is right. My first implementation scheduled at 08:01:35, but @dmvk suggested [here](https://lists.apache.org/thread/m2w2xzfjpxlw63j0k7tfxfgs0rshhwwr) that we keep the min interval low but restart the cooldown period each time. That is what is expressed in the FLIP.
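The difference between the two scheduling policies mentioned here is a matter of simple arithmetic. The sketch below uses hypothetical names (`CooldownDelay`, `remainingPolicy`, `resetPolicy`) and assumes a 30 second min interval with a last rescale at 08:01:05, which is inferred from the 08:01:35 figure rather than stated in the thread.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helpers comparing the two ways of picking the time of the delayed run. */
final class CooldownDelay {

    /** First approach: wait only for what is left of the cooldown since the last rescale. */
    static Instant remainingPolicy(Instant lastRescale, Instant now, Duration min) {
        Instant cooldownEnd = lastRescale.plus(min);
        return cooldownEnd.isAfter(now) ? cooldownEnd : now;
    }

    /** Approach in the hunk: restart the cooldown, that is wait a full interval from now. */
    static Instant resetPolicy(Instant now, Duration min) {
        return now.plus(min);
    }
}
```

For an event at 08:01:10, the first policy yields 08:01:35 and the reset policy yields 08:01:40, which matches the figures quoted in the discussion.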
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1287306063 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -39,14 +42,22 @@ import javax.annotation.Nullable; import java.time.Duration; +import java.time.Instant; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** State which represents a running job with an {@link ExecutionGraph} and assigned slots. */ class Executing extends StateWithExecutionGraph implements ResourceListener { private final Context context; +private Instant lastRescale = Instant.EPOCH; +private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); Review Comment: I also changed the tests: I no longer schedule tasks in the tests, to avoid race conditions. Instead, I introduced a status enum indicating whether the task was executed immediately or scheduled. In the tests I set/check this status.
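A test double along the lines described in the comment above might look like the sketch below. The enum `ActionStatus`, its constants, and the class `RecordingContext` are hypothetical names chosen for illustration; the actual test code in the PR may differ.

```java
import java.time.Duration;

class RescaleStatusSketch {
    // Hypothetical status enum: records what the context would have done with the action.
    enum ActionStatus { NOT_TRIGGERED, EXECUTED_IMMEDIATELY, SCHEDULED }

    // Hypothetical mock context: it never starts a timer, so the test has no
    // time-dependent behaviour; it only records the decision.
    static class RecordingContext {
        ActionStatus status = ActionStatus.NOT_TRIGGERED;

        void runIfConditionOrSchedule(boolean condition, Runnable action, Duration delay) {
            status = condition ? ActionStatus.EXECUTED_IMMEDIATELY : ActionStatus.SCHEDULED;
        }
    }
}
```

A test can then trigger `onNewResourcesAvailable()` on the state under test and assert on `status` without sleeping or waiting on an executor.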
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1287306063 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -39,14 +42,22 @@ import javax.annotation.Nullable; import java.time.Duration; +import java.time.Instant; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** State which represents a running job with an {@link ExecutionGraph} and assigned slots. */ class Executing extends StateWithExecutionGraph implements ResourceListener { private final Context context; +private Instant lastRescale = Instant.EPOCH; +private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); Review Comment: I also changed the tests: I no longer schedule anything (no time-based tests, to avoid race conditions) but rather set/check the rescale status.
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1287304795 ## flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java: ## @@ -488,6 +488,23 @@ public enum SchedulerType { code(SchedulerExecutionMode.REACTIVE.name())) .build()); +@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING) +public static final ConfigOption SCHEDULER_SCALING_INTERVAL_MIN = +key("jobmanager.adaptive-scheduler.scaling-interval.min") +.longType() +.defaultValue(30L) // favor lower scaling-interval.min for more reactive +// rescaling and let the user increase the value for high workloads +.withDescription( +"Determines the minimum time (in seconds) between scaling operations in reactive mode."); + +@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING) +public static final ConfigOption SCHEDULER_SCALING_INTERVAL_MAX = +key("jobmanager.adaptive-scheduler.scaling-interval.max") +.longType() +.noDefaultValue() +.withDescription( +"Allow the user to configure the time (in seconds) after which a scaling operation is triggered regardless if the requirements are met"); Review Comment: done
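As a rough sketch of how the two options in this diff could be interpreted, the snippet below converts the configured seconds into `Duration` values and applies the "force after max" rule. It does not use Flink's `Configuration` API: the class `ScalingIntervalSketch` and its methods are hypothetical, and a `null` argument stands in for an option the user did not set.

```java
import java.time.Duration;

// Hypothetical helper; the values mirror the option definitions in the diff above.
class ScalingIntervalSketch {
    static final long DEFAULT_MIN_SECONDS = 30L; // default of scaling-interval.min in this diff

    // scaling-interval.min falls back to its default when unset.
    static Duration minInterval(Long configuredSeconds) {
        return Duration.ofSeconds(configuredSeconds != null ? configuredSeconds : DEFAULT_MIN_SECONDS);
    }

    // scaling-interval.max has no default: null means "never force a rescale".
    static Duration maxInterval(Long configuredSeconds) {
        return configuredSeconds != null ? Duration.ofSeconds(configuredSeconds) : null;
    }

    // A rescale is forced only when a max is configured and has been exceeded.
    static boolean shouldForceRescale(Duration timeSinceLastRescale, Duration maxInterval) {
        return maxInterval != null && timeSinceLastRescale.compareTo(maxInterval) > 0;
    }
}
```

The scheduler code in this PR additionally skips forcing on the very first rescale; that check is left out here for brevity.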
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1287297336 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -39,14 +42,22 @@ import javax.annotation.Nullable; import java.time.Duration; +import java.time.Instant; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** State which represents a running job with an {@link ExecutionGraph} and assigned slots. */ class Executing extends StateWithExecutionGraph implements ResourceListener { private final Context context; +private Instant lastRescale = Instant.EPOCH; +private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); Review Comment: :+1: yes I could use Context.componentMainThreadExecutor
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1287027621 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ## @@ -39,14 +42,22 @@ import javax.annotation.Nullable; import java.time.Duration; +import java.time.Instant; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** State which represents a running job with an {@link ExecutionGraph} and assigned slots. */ class Executing extends StateWithExecutionGraph implements ResourceListener { private final Context context; +private Instant lastRescale = Instant.EPOCH; +private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); Review Comment: :+1: agree I could use `runIfState()`
[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler
echauchot commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1287009835 ## flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java: ## @@ -488,6 +488,23 @@ public enum SchedulerType { code(SchedulerExecutionMode.REACTIVE.name())) .build()); +@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING) +public static final ConfigOption SCHEDULER_SCALING_INTERVAL_MIN = +key("jobmanager.adaptive-scheduler.scaling-interval.min") +.longType() +.defaultValue(30L) // favor lower scaling-interval.min for more reactive +// rescaling and let the user increase the value for high workloads +.withDescription( +"Determines the minimum time (in seconds) between scaling operations in reactive mode."); + +@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING) +public static final ConfigOption SCHEDULER_SCALING_INTERVAL_MAX = +key("jobmanager.adaptive-scheduler.scaling-interval.max") +.longType() +.noDefaultValue() +.withDescription( +"Allow the user to configure the time (in seconds) after which a scaling operation is triggered regardless if the requirements are met"); Review Comment: :+1: