1996fanrui commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1299169121
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ########## @@ -144,6 +188,17 @@ private void maybeRescale() { } } + private Duration timeSinceLastRescale() { + return Duration.between(lastRescale, Instant.now()); + } + + private void rescaleWhenCooldownPeriodIsOver() { + context.runIfConditionOrSchedule( + timeSinceLastRescale().compareTo(scalingIntervalMin) > 0, + this::maybeRescale, + scalingIntervalMin); // reset cooldown period Review Comment: Schedule the `maybeRescale` after `scalingIntervalMin` might not make sense. For example, `scalingIntervalMin` is 30 seconds, `lastRescale` is `08:01:05`, and now is `08:01:10`. From your code, the `maybeRescale` is called at `08:01:40`, however it should be called at `08:01:35`, right? ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ########## @@ -144,6 +188,17 @@ private void maybeRescale() { } } + private Duration timeSinceLastRescale() { + return Duration.between(lastRescale, Instant.now()); + } + + private void rescaleWhenCooldownPeriodIsOver() { Review Comment: `rescaleWhenCooldownPeriodIsOver` should consider one case that it's called in multiple times. For example, scalingIntervalMin is 30 seconds, `lastRescale` is 08:01:05. - The `rescaleWhenCooldownPeriodIsOver` is called at 08:01:10, it will schedule `maybeRescale` at `08:01:35`. - The `rescaleWhenCooldownPeriodIsOver` maybe called at 08:01:15 again, it will schedule `maybeRescale` at `08:01:35` again. The second or repeated schedule should be ignored. ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ########## @@ -210,6 +265,15 @@ interface Context * @return a ScheduledFuture representing pending completion of the task */ ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay); + + /** + * Runs the given action immediately or after a delay depending on the given condition. + * + * @param condition if met, the action is executed immediately or scheduled otherwise + * @param action action to run + * @param delay delay after which to run the action if the condition is not met + */ + void runIfConditionOrSchedule(boolean condition, Runnable action, Duration delay); Review Comment: This interface isn't necessary, if `timeSinceLastRescale().compareTo(scalingIntervalMin)` is true, we can call `maybeRescale` directly. If it's false, it's better to call `runIfState` to schedule. BTW, if the state isn't this(Executing), the `maybeRescale` shouldn't be called. That's why the `runIfState` name includes `IfState`. The `runIfConditionOrSchedule` doesn't consider the `IfState`, it might have bug. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org