mxm commented on code in PR #920:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/920#discussion_r1861985148


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -289,21 +274,26 @@ private ParallelismChange applyScaleDownInterval(
         var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
         if (scaleDownInterval.toMillis() <= 0) {
             // The scale down interval is disable, so don't block scaling.
-            return ParallelismChange.required(newParallelism);
-        }
-
-        var firstTriggerTime = 
delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
-        if (firstTriggerTime.isEmpty()) {
-            LOG.info("The scale down of {} is delayed by {}.", vertex, 
scaleDownInterval);
-            delayedScaleDown.updateTriggerTime(vertex, clock.instant());
-            return ParallelismChange.optional(newParallelism);
+            return ParallelismChange.build(newParallelism);
         }
 
-        if 
(clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
-            LOG.debug("Try to skip immediate scale down within scale-down 
interval for {}", vertex);
-            return ParallelismChange.optional(newParallelism);
+        var now = clock.instant();
+        var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, 
now, newParallelism);
+
+        // Never scale down within scale down interval
+        if 
(now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval)))
 {

Review Comment:
   I thought that `getFirstTriggerTime()` returns the first time we scaled 
*up*, but we are actually recording the time we first try to scale down. 
   
   I'm not sure this is correct. We want to delay scale down from the first 
time we scale up, not the first time we scaled down.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -289,21 +274,26 @@ private ParallelismChange applyScaleDownInterval(
         var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
         if (scaleDownInterval.toMillis() <= 0) {
             // The scale down interval is disable, so don't block scaling.
-            return ParallelismChange.required(newParallelism);
-        }
-
-        var firstTriggerTime = 
delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
-        if (firstTriggerTime.isEmpty()) {
-            LOG.info("The scale down of {} is delayed by {}.", vertex, 
scaleDownInterval);
-            delayedScaleDown.updateTriggerTime(vertex, clock.instant());
-            return ParallelismChange.optional(newParallelism);
+            return ParallelismChange.build(newParallelism);
         }
 
-        if 
(clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
-            LOG.debug("Try to skip immediate scale down within scale-down 
interval for {}", vertex);
-            return ParallelismChange.optional(newParallelism);
+        var now = clock.instant();
+        var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, 
now, newParallelism);
+
+        // Never scale down within scale down interval
+        if 
(now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval)))
 {
+            if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
+                LOG.info("The scale down of {} is delayed by {}.", vertex, 
scaleDownInterval);
+            } else {
+                LOG.debug(
+                        "Try to skip immediate scale down within scale-down 
interval for {}",
+                        vertex);
+            }
+            return ParallelismChange.noChange();
         } else {
-            return ParallelismChange.required(newParallelism);
+            // Using the maximum parallelism within the scale down interval 
window instead of the
+            // latest parallelism when scaling down
+            return 
ParallelismChange.build(delayedScaleDownInfo.getMaxRecommendedParallelism());

Review Comment:
   Thanks Rui!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to