huyuanfeng2018 commented on code in PR #920:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/920#discussion_r1858110986


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java:
##########
@@ -19,51 +19,81 @@
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Data;
 import lombok.Getter;
 
+import javax.annotation.Nonnull;
+
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
 /** All delayed scale down requests. */
 public class DelayedScaleDown {
 
-    @Getter private final Map<JobVertexID, Instant> firstTriggerTime;
+    /** The delayed scale down info for vertex. */
+    @Data
+    public static class VertexDelayedScaleDownInfo {
+        private final Instant firstTriggerTime;
+        private int maxRecommendedParallelism;
+
+        @JsonCreator
+        public VertexDelayedScaleDownInfo(
+                @JsonProperty("firstTriggerTime") Instant firstTriggerTime,
+                @JsonProperty("maxRecommendedParallelism") int 
maxRecommendedParallelism) {
+            this.firstTriggerTime = firstTriggerTime;
+            this.maxRecommendedParallelism = maxRecommendedParallelism;
+        }
+    }
+
+    @Getter private final Map<JobVertexID, VertexDelayedScaleDownInfo> 
delayedVertices;
 
     // Have any scale down request been updated? It doesn't need to be stored, 
it is only used to
     // determine whether DelayedScaleDown needs to be stored.
-    @Getter private boolean isUpdated = false;
+    @JsonIgnore @Getter private boolean updated = false;
 
     public DelayedScaleDown() {
-        this.firstTriggerTime = new HashMap<>();
-    }
-
-    public DelayedScaleDown(Map<JobVertexID, Instant> firstTriggerTime) {
-        this.firstTriggerTime = firstTriggerTime;
+        this.delayedVertices = new HashMap<>();
     }
 
-    Optional<Instant> getFirstTriggerTimeForVertex(JobVertexID vertex) {
-        return Optional.ofNullable(firstTriggerTime.get(vertex));
-    }
+    /** Trigger a scale down, and return the corresponding {@link 
VertexDelayedScaleDownInfo}. */
+    @Nonnull
+    public VertexDelayedScaleDownInfo triggerScaleDown(
+            JobVertexID vertex, Instant triggerTime, int parallelism) {
+        var vertexInfo = delayedVertices.get(vertex);

Review Comment:
   ```suggestion
           var vertexDelayedScaleDownInfo = delayedVertices.get(vertex);
   ```
   I suggest renaming the local variable `vertexInfo` to `vertexDelayedScaleDownInfo`, as shown in the suggestion above.
   
   



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -88,31 +88,22 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context> 
autoScalerEventHandl
         this.autoScalerEventHandler = autoScalerEventHandler;
     }
 
-    /** The parallelism change type of {@link ParallelismChange}. */
-    public enum ParallelismChangeType {
-        NO_CHANGE,
-        REQUIRED_CHANGE,
-        OPTIONAL_CHANGE;
-    }
-
-    /**
-     * The rescaling will be triggered if any vertex's ParallelismChange is 
required. This means
-     * that if all vertices' ParallelismChange is optional, rescaling will be 
ignored.
-     */
+    /** The rescaling will be triggered if any vertex's {@link 
ParallelismChange} is changed. */
     @Getter
     public static class ParallelismChange {
 
-        private static final ParallelismChange NO_CHANGE =

Review Comment:
   Do we still need this class? It seems that `Optional<Integer>` is enough.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to