1996fanrui commented on code in PR #920:
URL: https://github.com/apache/flink-kubernetes-operator/pull/920#discussion_r1859764146


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -88,31 +88,22 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context> autoScalerEventHandl
         this.autoScalerEventHandler = autoScalerEventHandler;
     }
 
-    /** The parallelism change type of {@link ParallelismChange}. */
-    public enum ParallelismChangeType {
-        NO_CHANGE,
-        REQUIRED_CHANGE,
-        OPTIONAL_CHANGE;
-    }
-
-    /**
-     * The rescaling will be triggered if any vertex's ParallelismChange is required. This means
-     * that if all vertices' ParallelismChange is optional, rescaling will be ignored.
-     */
+    /** The rescaling will be triggered if any vertex's {@link ParallelismChange} is changed. */
     @Getter
     public static class ParallelismChange {
 
-        private static final ParallelismChange NO_CHANGE =

Review Comment:
   We could keep it in this PR first, and refactor it in the future if needed.
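   For context, a hedged sketch of what the simplified `ParallelismChange` could look like after dropping `ParallelismChangeType`, reconstructed only from the `build`/`noChange` factory calls visible in this diff (method names such as `isNoChange`/`getNewParallelism` are assumptions, not the PR's actual API):
   
   ```java
   // Hedged sketch only: reconstructs the simplified ParallelismChange from the
   // factory calls visible in the diff. Names beyond build/noChange/NO_CHANGE
   // are assumptions.
   class ParallelismChange {
       // Shared sentinel, mirroring the NO_CHANGE constant kept in the diff.
       private static final ParallelismChange NO_CHANGE = new ParallelismChange(-1);
   
       private final int newParallelism;
   
       private ParallelismChange(int newParallelism) {
           this.newParallelism = newParallelism;
       }
   
       static ParallelismChange build(int newParallelism) {
           return new ParallelismChange(newParallelism);
       }
   
       static ParallelismChange noChange() {
           return NO_CHANGE;
       }
   
       boolean isNoChange() {
           return this == NO_CHANGE;
       }
   
       int getNewParallelism() {
           return newParallelism;
       }
   }
   
   public class ParallelismChangeSketch {
       public static void main(String[] args) {
           ParallelismChange change = ParallelismChange.build(4);
           System.out.println(change.isNoChange() + " " + change.getNewParallelism()); // false 4
           System.out.println(ParallelismChange.noChange().isNoChange());              // true
       }
   }
   ```
   
   The sentinel-plus-factories shape keeps call sites binary (changed vs. not changed), which matches removing the three-valued enum.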



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -181,15 +178,15 @@ private void updateRecommendedParallelism(
     }
 
     @VisibleForTesting
-    static boolean allRequiredVerticesWithinUtilizationTarget(
+    static boolean allChangedVerticesWithinUtilizationTarget(
             Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
-            Set<JobVertexID> requiredVertices) {
-        // All vertices' ParallelismChange is optional, rescaling will be ignored.
-        if (requiredVertices.isEmpty()) {
+            Set<JobVertexID> changedVertices) {
+        // No vertex has changed.
+        if (changedVertices.isEmpty()) {
             return true;
         }
 
-        for (JobVertexID vertex : requiredVertices) {
+        for (var vertex : changedVertices) {

Review Comment:
   Reverted it to `JobVertexID vertex : changedVertices`.
   
   Or do you have any suggestions about it? When should we use `var`, and when a concrete type?
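   Not part of the PR, but one common rule of thumb for the `var` question: use `var` when the initializer makes the type obvious, and spell the type out when it does not. A tiny illustration (class and variable names are just for the example):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   public class VarGuideline {
       public static void main(String[] args) {
           // var is fine here: the type is obvious from the right-hand side.
           var changedVertices = new ArrayList<String>();
           changedVertices.add("vertex-a");
   
           // An explicit type is clearer here: the initializer is a method call
           // that does not reveal the type to the reader at a glance.
           List<String> snapshot = List.copyOf(changedVertices);
   
           // In an enhanced for loop a short concrete type documents the element
           // type, which is one argument for keeping JobVertexID in the loop.
           for (String vertex : snapshot) {
               System.out.println(vertex); // prints: vertex-a
           }
       }
   }
   ```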



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -289,21 +274,26 @@ private ParallelismChange applyScaleDownInterval(
         var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
         if (scaleDownInterval.toMillis() <= 0) {
             // The scale down interval is disabled, so don't block scaling.
-            return ParallelismChange.required(newParallelism);
-        }
-
-        var firstTriggerTime = delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
-        if (firstTriggerTime.isEmpty()) {
-            LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
-            delayedScaleDown.updateTriggerTime(vertex, clock.instant());
-            return ParallelismChange.optional(newParallelism);
+            return ParallelismChange.build(newParallelism);
         }
 
-        if (clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
-            LOG.debug("Try to skip immediate scale down within scale-down interval for {}", vertex);
-            return ParallelismChange.optional(newParallelism);
+        var now = clock.instant();
+        var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);
+
+        // Never scale down within scale down interval
+        if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {

Review Comment:
   Is it a typo?
   
   Do you mean renaming `firstTriggerTime` to `firstScaleDownTime` inside `VertexDelayedScaleDownInfo`?
   
   I prefer `firstTriggerTime` because the class name already includes `DelayedScaleDown`, so the field and method names don't need to include "scale down" again. WDYT?
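   To make the naming discussion concrete, here is a hedged sketch of how `DelayedScaleDown.triggerScaleDown` and `VertexDelayedScaleDownInfo` could fit together, inferred only from the calls visible in this diff (`getFirstTriggerTime`, `getMaxRecommendedParallelism`); the actual implementation in the PR may differ:
   
   ```java
   import java.time.Instant;
   import java.util.HashMap;
   import java.util.Map;
   
   // Hedged sketch inferred from the diff: per-vertex state recording when a
   // scale down was first proposed and the maximum parallelism recommended
   // since then. Field names follow the getters used in the diff.
   class VertexDelayedScaleDownInfo {
       private final Instant firstTriggerTime;
       private int maxRecommendedParallelism;
   
       VertexDelayedScaleDownInfo(Instant firstTriggerTime, int parallelism) {
           this.firstTriggerTime = firstTriggerTime;
           this.maxRecommendedParallelism = parallelism;
       }
   
       Instant getFirstTriggerTime() {
           return firstTriggerTime;
       }
   
       int getMaxRecommendedParallelism() {
           return maxRecommendedParallelism;
       }
   
       void record(int parallelism) {
           maxRecommendedParallelism = Math.max(maxRecommendedParallelism, parallelism);
       }
   }
   
   public class DelayedScaleDownSketch {
       private final Map<String, VertexDelayedScaleDownInfo> infos = new HashMap<>();
   
       // The first call for a vertex starts the interval; later calls keep the
       // original trigger time and only update the maximum parallelism.
       VertexDelayedScaleDownInfo triggerScaleDown(String vertex, Instant now, int newParallelism) {
           VertexDelayedScaleDownInfo info =
                   infos.computeIfAbsent(vertex, v -> new VertexDelayedScaleDownInfo(now, newParallelism));
           info.record(newParallelism);
           return info;
       }
   
       public static void main(String[] args) {
           var tracker = new DelayedScaleDownSketch();
           Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
           tracker.triggerScaleDown("v1", t0, 8);
           var info = tracker.triggerScaleDown("v1", t0.plusSeconds(60), 6);
           // The first trigger time sticks and the max parallelism is retained.
           System.out.println(info.getFirstTriggerTime() + " " + info.getMaxRecommendedParallelism());
       }
   }
   ```
   
   Under this reading, `firstTriggerTime` is unambiguous because the enclosing class names the scale-down context, as argued above.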



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -289,21 +274,26 @@ private ParallelismChange applyScaleDownInterval(
         var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
         if (scaleDownInterval.toMillis() <= 0) {
             // The scale down interval is disabled, so don't block scaling.
-            return ParallelismChange.required(newParallelism);
-        }
-
-        var firstTriggerTime = delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
-        if (firstTriggerTime.isEmpty()) {
-            LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
-            delayedScaleDown.updateTriggerTime(vertex, clock.instant());
-            return ParallelismChange.optional(newParallelism);
+            return ParallelismChange.build(newParallelism);
         }
 
-        if (clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
-            LOG.debug("Try to skip immediate scale down within scale-down interval for {}", vertex);
-            return ParallelismChange.optional(newParallelism);
+        var now = clock.instant();
+        var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);
+
+        // Never scale down within scale down interval
+        if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {
+            if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
+                LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
+            } else {
+                LOG.debug(
+                        "Try to skip immediate scale down within scale-down interval for {}",
+                        vertex);
+            }
+            return ParallelismChange.noChange();
         } else {
-            return ParallelismChange.required(newParallelism);
+            // Using the maximum parallelism within the scale down interval window instead of the
+            // latest parallelism when scaling down
+            return ParallelismChange.build(delayedScaleDownInfo.getMaxRecommendedParallelism());

Review Comment:
   Yes, we can ensure it. The triggered scale down will be canceled when the recommended parallelism >= the current parallelism.
   
   It was done in the previous PR of FLINK-36018; these are the relevant code paths on the master branch:
   
   - `if (newParallelism == currentParallelism)`
   
   https://github.com/apache/flink-kubernetes-operator/blob/cfcee024e2639b83970459d0d2743ef6cf544626/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L224
   
   - Try to scale up
   
   https://github.com/apache/flink-kubernetes-operator/blob/cfcee024e2639b83970459d0d2743ef6cf544626/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L262
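   The cancellation rule described above can be sketched as a small guard in front of the delayed scale-down path. This is an illustration of the described behavior only, not the code at the linked lines; all names here are hypothetical:
   
   ```java
   import java.util.HashSet;
   import java.util.Set;
   
   // Illustration of the cancellation rule: a pending delayed scale down for a
   // vertex is dropped as soon as the recommended parallelism is no longer
   // below the current parallelism. All names are hypothetical.
   public class ScaleDownCancellation {
       private final Set<String> pendingScaleDowns = new HashSet<>();
   
       void onRecommendation(String vertex, int currentParallelism, int newParallelism) {
           if (newParallelism >= currentParallelism) {
               // Scale up or no change: cancel any triggered scale down.
               pendingScaleDowns.remove(vertex);
           } else {
               pendingScaleDowns.add(vertex);
           }
       }
   
       boolean isPending(String vertex) {
           return pendingScaleDowns.contains(vertex);
       }
   
       public static void main(String[] args) {
           var tracker = new ScaleDownCancellation();
           tracker.onRecommendation("v1", 8, 4);  // scale down proposed -> pending
           System.out.println(tracker.isPending("v1"));   // true
           tracker.onRecommendation("v1", 8, 8);  // recommendation back at current -> canceled
           System.out.println(tracker.isPending("v1"));   // false
       }
   }
   ```
   
   This guarantees the max-recommended-parallelism returned from the interval window is only ever used for an actual scale down.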



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
