1996fanrui commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1299169121


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -144,6 +188,17 @@ private void maybeRescale() {
         }
     }
 
+    private Duration timeSinceLastRescale() {
+        return Duration.between(lastRescale, Instant.now());
+    }
+
+    private void rescaleWhenCooldownPeriodIsOver() {
+        context.runIfConditionOrSchedule(
+                timeSinceLastRescale().compareTo(scalingIntervalMin) > 0,
+                this::maybeRescale,
+                scalingIntervalMin); // reset cooldown period

Review Comment:
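   Scheduling `maybeRescale` after a full `scalingIntervalMin` might not make sense.
   
   For example, suppose `scalingIntervalMin` is 30 seconds, `lastRescale` is `08:01:05`, and the current time is `08:01:10`.
   
   With your code, `maybeRescale` is called at `08:01:40`, but it should be called at `08:01:35`, right? The delay should be the remaining cooldown (`scalingIntervalMin` minus `timeSinceLastRescale()`), not the whole `scalingIntervalMin`.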
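   
   A minimal sketch of the delay computation I have in mind. The class `CooldownDelay` and the method `remainingCooldown` are hypothetical names used only for illustration, and `now` is passed in explicitly (instead of calling `Instant.now()`) so the arithmetic is easy to check:

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper, not part of the PR: computes how long to wait until the cooldown ends. */
final class CooldownDelay {

    /**
     * Returns the time left until lastRescale + scalingIntervalMin,
     * or Duration.ZERO if the cooldown period has already elapsed.
     */
    static Duration remainingCooldown(
            Instant lastRescale, Instant now, Duration scalingIntervalMin) {
        Duration elapsed = Duration.between(lastRescale, now);
        Duration remaining = scalingIntervalMin.minus(elapsed);
        // Never return a negative delay to the scheduler.
        return remaining.isNegative() ? Duration.ZERO : remaining;
    }
}
```

   With the numbers above (5 seconds elapsed, 30 seconds minimum interval), the remaining delay is 25 seconds, which puts the call at `08:01:35`.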



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -144,6 +188,17 @@ private void maybeRescale() {
         }
     }
 
+    private Duration timeSinceLastRescale() {
+        return Duration.between(lastRescale, Instant.now());
+    }
+
+    private void rescaleWhenCooldownPeriodIsOver() {

Review Comment:
   `rescaleWhenCooldownPeriodIsOver` should handle the case where it is called multiple times.
   
   For example, `scalingIntervalMin` is 30 seconds and `lastRescale` is `08:01:05`.
   
   - `rescaleWhenCooldownPeriodIsOver` is called at `08:01:10` and schedules `maybeRescale` for `08:01:35`.
   - `rescaleWhenCooldownPeriodIsOver` may be called again at `08:01:15`, which schedules `maybeRescale` for `08:01:35` a second time.
   
   The second and any later schedule requests should be ignored.
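   
   One possible way to ignore the repeated requests is a simple "already scheduled" flag. This is only a sketch under assumptions: `SingleRescaleScheduler` and `scheduleOnce` are hypothetical names, the `BiConsumer` stands in for whatever actually schedules the action (for example `context.runIfState`), and everything is assumed to run on a single thread, so there is no synchronization:

```java
import java.time.Duration;
import java.util.function.BiConsumer;

/** Hypothetical sketch, not the PR's code: keeps at most one pending rescale check. */
final class SingleRescaleScheduler {

    // Stand-in for the real scheduling call; receives the action and its delay.
    private final BiConsumer<Runnable, Duration> scheduler;

    // True while a scheduled action has not run yet.
    private boolean rescaleScheduled;

    SingleRescaleScheduler(BiConsumer<Runnable, Duration> scheduler) {
        this.scheduler = scheduler;
    }

    /** Schedules the action unless one is already pending; returns true if it was scheduled. */
    boolean scheduleOnce(Runnable action, Duration delay) {
        if (rescaleScheduled) {
            return false; // repeated request while one is pending: ignore it
        }
        rescaleScheduled = true;
        scheduler.accept(
                () -> {
                    // Clear the flag first so the action itself may schedule again.
                    rescaleScheduled = false;
                    action.run();
                },
                delay);
        return true;
    }
}
```

   In the example above, the call at `08:01:10` would be scheduled and the call at `08:01:15` would return `false` without scheduling anything.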



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -210,6 +265,15 @@ interface Context
          * @return a ScheduledFuture representing pending completion of the task
          */
         ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay);
+
+        /**
+         * Runs the given action immediately or after a delay depending on the given condition.
+         *
+         * @param condition if met, the action is executed immediately or scheduled otherwise
+         * @param action action to run
+         * @param delay delay after which to run the action if the condition is not met
+         */
+        void runIfConditionOrSchedule(boolean condition, Runnable action, Duration delay);

Review Comment:
   This interface method isn't necessary. If `timeSinceLastRescale().compareTo(scalingIntervalMin) > 0`, we can call `maybeRescale` directly.
   
   Otherwise, it's better to call `runIfState` to schedule it.
   
   BTW, if the current state is no longer this one (`Executing`), `maybeRescale` shouldn't be called. That's why the name `runIfState` includes `IfState`. `runIfConditionOrSchedule` doesn't check the state, so it might have a bug.
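   
   Roughly what I mean, as a self-contained sketch rather than the real classes. `ExecutingSketch` and its nested `Context` are simplified stand-ins: the real `runIfState` takes a `State` and returns a `ScheduledFuture<?>`, `maybeRescale` here only counts calls, and `now` is passed as a parameter purely so the sketch can be exercised without a clock:

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified stand-in for Executing; only the shape of the logic is the point. */
final class ExecutingSketch {

    /** Simplified stand-in for Executing.Context (the real method returns a ScheduledFuture). */
    interface Context {
        void runIfState(ExecutingSketch expectedState, Runnable action, Duration delay);
    }

    private final Context context;
    private final Duration scalingIntervalMin;
    private final Instant lastRescale;

    // Placeholder bookkeeping so the sketch has observable behavior.
    int rescaleAttempts;

    ExecutingSketch(Context context, Duration scalingIntervalMin, Instant lastRescale) {
        this.context = context;
        this.scalingIntervalMin = scalingIntervalMin;
        this.lastRescale = lastRescale;
    }

    void maybeRescale() {
        rescaleAttempts++;
    }

    void rescaleWhenCooldownPeriodIsOver(Instant now) {
        Duration elapsed = Duration.between(lastRescale, now);
        if (elapsed.compareTo(scalingIntervalMin) > 0) {
            // Cooldown is over: no scheduling needed.
            maybeRescale();
        } else {
            // Still cooling down: let runIfState run the action later,
            // and only if this state is still the current one.
            context.runIfState(this, this::maybeRescale, scalingIntervalMin.minus(elapsed));
        }
    }
}
```

   This way the state check stays inside `runIfState`, and no extra method is needed on `Context`.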


