1996fanrui commented on code in PR #710:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/710#discussion_r1392008277


##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java:
##########
@@ -299,60 +296,15 @@ void testAutoscalerDisabled() throws Exception {
         scalingHistory.put(Instant.ofEpochMilli(100), new ScalingSummary());
         scalingHistory.put(Instant.ofEpochMilli(200), new ScalingSummary());
 
-        // Test all scaling aren't expired
-        getInstantScalingSummaryTreeMap(
-                scalingHistory, Clock.fixed(Instant.ofEpochMilli(250), 
ZoneId.systemDefault()), 2);
+        stateStore.storeScalingHistory(context, Map.of(new JobVertexID(), 
scalingHistory));
+        assertFalse(stateStore.getScalingHistory(context).isEmpty());
 
-        // Test one scaling aren't expired
-        getInstantScalingSummaryTreeMap(
-                scalingHistory, Clock.fixed(Instant.ofEpochMilli(350), 
ZoneId.systemDefault()), 1);
-
-        // Test all scaling are expired
-        getInstantScalingSummaryTreeMap(
-                scalingHistory, Clock.fixed(Instant.ofEpochMilli(450), 
ZoneId.systemDefault()), 0);
-    }
-
-    private void getInstantScalingSummaryTreeMap(
-            SortedMap<Instant, ScalingSummary> scalingHistoryData,
-            Clock clock,
-            int expectedScalingHistorySize)
-            throws Exception {
-        stateStore = new TestingAutoscalerStateStore<>();
         var autoscaler =
                 new JobAutoScalerImpl<>(
                         null, null, null, eventCollector, scalingRealizer, 
stateStore);
-
-        enrichStateStore(scalingHistoryData);
-        stateStore.flush(context);
-        assertThat(stateStore.getFlushCount()).isEqualTo(1);
-
-        autoscaler.setClock(clock);
         autoscaler.scale(context);
 
-        assertThat(stateStore.getParallelismOverrides(context)).isEmpty();
-        assertThat(stateStore.getCollectedMetrics(context)).isEmpty();
-
-        if (expectedScalingHistorySize > 0) {
-            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory =
-                    stateStore.getScalingHistory(context);
-            assertThat(scalingHistory).isNotEmpty();
-            assertThat(scalingHistory.values())
-                    .allMatch(aa -> aa.size() == expectedScalingHistorySize);
-        } else {
-            assertThat(stateStore.getScalingHistory(context)).isEmpty();
-        }
-        assertThat(stateStore.getFlushCount()).isEqualTo(2);
-    }
-
-    private void enrichStateStore(SortedMap<Instant, ScalingSummary> 
scalingHistory) {
-        var v1 = new JobVertexID();
-        var v2 = new JobVertexID();
-        stateStore.storeParallelismOverrides(
-                context, Map.of(v1.toString(), "1", v2.toString(), "2"));
-
-        var metricHistory = new TreeMap<Instant, CollectedMetrics>();
-        stateStore.storeCollectedMetrics(context, metricHistory);
-        stateStore.storeScalingHistory(context, Map.of(v1, scalingHistory, v2, 
scalingHistory));
+        assertTrue(stateStore.getScalingHistory(context).isEmpty());

Review Comment:
   Could we test `ParallelismOverrides` and `CollectedMetrics` as well?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java:
##########
@@ -44,13 +43,44 @@ public class ConfigMapStore {
 
     private final KubernetesClient kubernetesClient;
 
-    // The cache for each resourceId may be in three states:
-    // 1. The resourceId doesn't exist : ConfigMap isn't loaded from 
kubernetes, or it's deleted
-    // 2  Exists, Optional.empty() : The ConfigMap doesn't exist in Kubernetes
-    // 3. Exists, Not Empty : We have loaded the ConfigMap from kubernetes, it 
may not be the same
-    // if not flushed already
-    private final ConcurrentHashMap<ResourceID, Optional<ConfigMap>> cache =
-            new ConcurrentHashMap<>();
+    static class ConfigMapState {
+        private boolean flushed = true;
+        private boolean exists = true;
+
+        @VisibleForTesting ConfigMap configMap;
+
+        public Map<String, String> getData() {

Review Comment:
   ```suggestion
           @VisibleForTesting
           public Map<String, String> getData() {
   ```
   
   `getData` is only used for test



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java:
##########
@@ -63,29 +93,29 @@ protected void putSerializedState(
 
     protected Optional<String> getSerializedState(
             KubernetesJobAutoScalerContext jobContext, String key) {
-        return getConfigMap(jobContext).map(configMap -> 
configMap.getData().get(key));
+        return 
Optional.ofNullable(getConfigMap(jobContext).configMap.getData().get(key));
     }
 
     protected void removeSerializedState(KubernetesJobAutoScalerContext 
jobContext, String key) {
-        getConfigMap(jobContext)
-                .ifPresentOrElse(
-                        configMap -> configMap.getData().remove(key),
-                        () -> {
-                            throw new IllegalStateException(
-                                    "The configMap isn't created, so the 
remove is unavailable.");
-                        });
+        getConfigMap(jobContext).removeKey(key);
     }
 
     public void flush(KubernetesJobAutoScalerContext jobContext) {
-        Optional<ConfigMap> configMapOpt = cache.get(jobContext.getJobKey());
-        if (configMapOpt == null || configMapOpt.isEmpty()) {
+        ConfigMapState configMapState = cache.get(jobContext.getJobKey());
+        if (configMapState == null || (configMapState.flushed && 
configMapState.exists)) {

Review Comment:
   It's better to extract a method `isFlushNeeded` for `ConfigMapState` instead 
of call `configMapState.flushed && configMapState.exists` directly.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java:
##########
@@ -44,13 +43,44 @@ public class ConfigMapStore {
 
     private final KubernetesClient kubernetesClient;
 
-    // The cache for each resourceId may be in three states:
-    // 1. The resourceId doesn't exist : ConfigMap isn't loaded from 
kubernetes, or it's deleted
-    // 2  Exists, Optional.empty() : The ConfigMap doesn't exist in Kubernetes
-    // 3. Exists, Not Empty : We have loaded the ConfigMap from kubernetes, it 
may not be the same
-    // if not flushed already
-    private final ConcurrentHashMap<ResourceID, Optional<ConfigMap>> cache =
-            new ConcurrentHashMap<>();
+    static class ConfigMapState {
+        private boolean flushed = true;
+        private boolean exists = true;
+
+        @VisibleForTesting ConfigMap configMap;
+
+        public Map<String, String> getData() {
+            return Collections.unmodifiableMap(configMap.getData());
+        }
+
+        public void clear() {

Review Comment:
   It's not used, right? Could it be removed?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java:
##########
@@ -44,13 +43,44 @@ public class ConfigMapStore {
 
     private final KubernetesClient kubernetesClient;
 
-    // The cache for each resourceId may be in three states:
-    // 1. The resourceId doesn't exist : ConfigMap isn't loaded from 
kubernetes, or it's deleted
-    // 2  Exists, Optional.empty() : The ConfigMap doesn't exist in Kubernetes
-    // 3. Exists, Not Empty : We have loaded the ConfigMap from kubernetes, it 
may not be the same
-    // if not flushed already
-    private final ConcurrentHashMap<ResourceID, Optional<ConfigMap>> cache =
-            new ConcurrentHashMap<>();
+    static class ConfigMapState {
+        private boolean flushed = true;
+        private boolean exists = true;
+
+        @VisibleForTesting ConfigMap configMap;

Review Comment:
   Could we add a setter for `ConfigMap configMap`, and update `exists` inside 
of this setter. It's safer and clearer for ConfigMapState, and easy to read and 
understand.
   
   And could `ConfigMapState` as a separate class instead of `inner class of 
ConfigMapStroe` and mark all fields as private? I see `ConfigMapStroe` update 
the configMap directly, and it's dangerous in the future if some developers 
update it without update `flushed` or `exists`. And it's easy to introduce bug 
in the future.
   
   
   In short: I prefer `ConfigMapState` to expose the interface rather than the 
details.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java:
##########
@@ -44,13 +43,44 @@ public class ConfigMapStore {
 
     private final KubernetesClient kubernetesClient;
 
-    // The cache for each resourceId may be in three states:
-    // 1. The resourceId doesn't exist : ConfigMap isn't loaded from 
kubernetes, or it's deleted
-    // 2  Exists, Optional.empty() : The ConfigMap doesn't exist in Kubernetes
-    // 3. Exists, Not Empty : We have loaded the ConfigMap from kubernetes, it 
may not be the same
-    // if not flushed already
-    private final ConcurrentHashMap<ResourceID, Optional<ConfigMap>> cache =
-            new ConcurrentHashMap<>();
+    static class ConfigMapState {
+        private boolean flushed = true;
+        private boolean exists = true;
+
+        @VisibleForTesting ConfigMap configMap;
+
+        public Map<String, String> getData() {
+            return Collections.unmodifiableMap(configMap.getData());
+        }
+
+        public void clear() {
+            if (configMap.getData().isEmpty()) {
+                return;
+            }
+            configMap.getData().clear();
+            flushed = false;
+        }
+
+        public void removeKey(String key) {
+            var oldKey = configMap.getData().remove(key);
+            if (oldKey != null) {
+                flushed = false;
+            }
+        }
+
+        public void put(String key, String value) {
+            configMap.getData().put(key, value);
+            flushed = false;
+        }
+    }
+
+    // The cache for each resourceId may be in four states:
+    // 1. No cache entry: ConfigMap isn't loaded from kubernetes, or it's 
deleted.
+    // 2  Cache entry, not created : The ConfigMap doesn't exist in Kubernetes.
+    // 2  Cache entry, not flushed : The ConfigMap exists in Kubernetes, but 
it is not updated yet.
+    // 3. Cache entry, flushed and created : We have loaded the ConfigMap from 
kubernetes, and it's
+    // up-to-date.

Review Comment:
   ```suggestion
       // 2. Cache entry, not created : The ConfigMap doesn't exist in 
Kubernetes.
       // 3. Cache entry, not flushed : The ConfigMap exists in Kubernetes, but 
it is not updated yet.
       // 4. Cache entry, flushed and created : We have loaded the ConfigMap 
from kubernetes, and it's
       // up-to-date.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to