1996fanrui commented on code in PR #710: URL: https://github.com/apache/flink-kubernetes-operator/pull/710#discussion_r1392008277
########## flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java: ########## @@ -299,60 +296,15 @@ void testAutoscalerDisabled() throws Exception { scalingHistory.put(Instant.ofEpochMilli(100), new ScalingSummary()); scalingHistory.put(Instant.ofEpochMilli(200), new ScalingSummary()); - // Test all scaling aren't expired - getInstantScalingSummaryTreeMap( - scalingHistory, Clock.fixed(Instant.ofEpochMilli(250), ZoneId.systemDefault()), 2); + stateStore.storeScalingHistory(context, Map.of(new JobVertexID(), scalingHistory)); + assertFalse(stateStore.getScalingHistory(context).isEmpty()); - // Test one scaling aren't expired - getInstantScalingSummaryTreeMap( - scalingHistory, Clock.fixed(Instant.ofEpochMilli(350), ZoneId.systemDefault()), 1); - - // Test all scaling are expired - getInstantScalingSummaryTreeMap( - scalingHistory, Clock.fixed(Instant.ofEpochMilli(450), ZoneId.systemDefault()), 0); - } - - private void getInstantScalingSummaryTreeMap( - SortedMap<Instant, ScalingSummary> scalingHistoryData, - Clock clock, - int expectedScalingHistorySize) - throws Exception { - stateStore = new TestingAutoscalerStateStore<>(); var autoscaler = new JobAutoScalerImpl<>( null, null, null, eventCollector, scalingRealizer, stateStore); - - enrichStateStore(scalingHistoryData); - stateStore.flush(context); - assertThat(stateStore.getFlushCount()).isEqualTo(1); - - autoscaler.setClock(clock); autoscaler.scale(context); - assertThat(stateStore.getParallelismOverrides(context)).isEmpty(); - assertThat(stateStore.getCollectedMetrics(context)).isEmpty(); - - if (expectedScalingHistorySize > 0) { - Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory = - stateStore.getScalingHistory(context); - assertThat(scalingHistory).isNotEmpty(); - assertThat(scalingHistory.values()) - .allMatch(aa -> aa.size() == expectedScalingHistorySize); - } else { - assertThat(stateStore.getScalingHistory(context)).isEmpty(); - } - assertThat(stateStore.getFlushCount()).isEqualTo(2); - } - - private void enrichStateStore(SortedMap<Instant, ScalingSummary> scalingHistory) { - var v1 = new JobVertexID(); - var v2 = new JobVertexID(); - stateStore.storeParallelismOverrides( - context, Map.of(v1.toString(), "1", v2.toString(), "2")); - - var metricHistory = new TreeMap<Instant, CollectedMetrics>(); - stateStore.storeCollectedMetrics(context, metricHistory); - stateStore.storeScalingHistory(context, Map.of(v1, scalingHistory, v2, scalingHistory)); + assertTrue(stateStore.getScalingHistory(context).isEmpty()); Review Comment: Could we test `ParallelismOverrides` and `CollectedMetrics` as well? ########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java: ########## @@ -44,13 +43,44 @@ public class ConfigMapStore { private final KubernetesClient kubernetesClient; - // The cache for each resourceId may be in three states: - // 1. The resourceId doesn't exist : ConfigMap isn't loaded from kubernetes, or it's deleted - // 2 Exists, Optional.empty() : The ConfigMap doesn't exist in Kubernetes - // 3. Exists, Not Empty : We have loaded the ConfigMap from kubernetes, it may not be the same - // if not flushed already - private final ConcurrentHashMap<ResourceID, Optional<ConfigMap>> cache = - new ConcurrentHashMap<>(); + static class ConfigMapState { + private boolean flushed = true; + private boolean exists = true; + + @VisibleForTesting ConfigMap configMap; + + public Map<String, String> getData() { Review Comment: ```suggestion @VisibleForTesting public Map<String, String> getData() { ``` `getData` is only used for test ########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java: ########## @@ -63,29 +93,29 @@ protected void putSerializedState( protected Optional<String> getSerializedState( KubernetesJobAutoScalerContext jobContext, String key) { - return getConfigMap(jobContext).map(configMap -> configMap.getData().get(key)); + return Optional.ofNullable(getConfigMap(jobContext).configMap.getData().get(key)); } protected void removeSerializedState(KubernetesJobAutoScalerContext jobContext, String key) { - getConfigMap(jobContext) - .ifPresentOrElse( - configMap -> configMap.getData().remove(key), - () -> { - throw new IllegalStateException( - "The configMap isn't created, so the remove is unavailable."); - }); + getConfigMap(jobContext).removeKey(key); } public void flush(KubernetesJobAutoScalerContext jobContext) { - Optional<ConfigMap> configMapOpt = cache.get(jobContext.getJobKey()); - if (configMapOpt == null || configMapOpt.isEmpty()) { + ConfigMapState configMapState = cache.get(jobContext.getJobKey()); + if (configMapState == null || (configMapState.flushed && configMapState.exists)) { Review Comment: It's better to extract a method `isFlushNeeded` for `ConfigMapState` instead of call `configMapState.flushed && configMapState.exists` directly. ########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java: ########## @@ -44,13 +43,44 @@ public class ConfigMapStore { private final KubernetesClient kubernetesClient; - // The cache for each resourceId may be in three states: - // 1. The resourceId doesn't exist : ConfigMap isn't loaded from kubernetes, or it's deleted - // 2 Exists, Optional.empty() : The ConfigMap doesn't exist in Kubernetes - // 3. Exists, Not Empty : We have loaded the ConfigMap from kubernetes, it may not be the same - // if not flushed already - private final ConcurrentHashMap<ResourceID, Optional<ConfigMap>> cache = - new ConcurrentHashMap<>(); + static class ConfigMapState { + private boolean flushed = true; + private boolean exists = true; + + @VisibleForTesting ConfigMap configMap; + + public Map<String, String> getData() { + return Collections.unmodifiableMap(configMap.getData()); + } + + public void clear() { Review Comment: It's not used, right? Could it be removed? ########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java: ########## @@ -44,13 +43,44 @@ public class ConfigMapStore { private final KubernetesClient kubernetesClient; - // The cache for each resourceId may be in three states: - // 1. The resourceId doesn't exist : ConfigMap isn't loaded from kubernetes, or it's deleted - // 2 Exists, Optional.empty() : The ConfigMap doesn't exist in Kubernetes - // 3. Exists, Not Empty : We have loaded the ConfigMap from kubernetes, it may not be the same - // if not flushed already - private final ConcurrentHashMap<ResourceID, Optional<ConfigMap>> cache = - new ConcurrentHashMap<>(); + static class ConfigMapState { + private boolean flushed = true; + private boolean exists = true; + + @VisibleForTesting ConfigMap configMap; Review Comment: Could we add a setter for `ConfigMap configMap`, and update `exists` inside of this setter. It's safer and clearer for ConfigMapState, and easy to read and understand. And could `ConfigMapState` as a separate class instead of `inner class of ConfigMapStroe` and mark all fields as private? I see `ConfigMapStroe` update the configMap directly, and it's dangerous in the future if some developers update it without update `flushed` or `exists`. And it's easy to introduce bug in the future. In short: I prefer `ConfigMapState` to expose the interface rather than the details. ########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java: ########## @@ -44,13 +43,44 @@ public class ConfigMapStore { private final KubernetesClient kubernetesClient; - // The cache for each resourceId may be in three states: - // 1. The resourceId doesn't exist : ConfigMap isn't loaded from kubernetes, or it's deleted - // 2 Exists, Optional.empty() : The ConfigMap doesn't exist in Kubernetes - // 3. Exists, Not Empty : We have loaded the ConfigMap from kubernetes, it may not be the same - // if not flushed already - private final ConcurrentHashMap<ResourceID, Optional<ConfigMap>> cache = - new ConcurrentHashMap<>(); + static class ConfigMapState { + private boolean flushed = true; + private boolean exists = true; + + @VisibleForTesting ConfigMap configMap; + + public Map<String, String> getData() { + return Collections.unmodifiableMap(configMap.getData()); + } + + public void clear() { + if (configMap.getData().isEmpty()) { + return; + } + configMap.getData().clear(); + flushed = false; + } + + public void removeKey(String key) { + var oldKey = configMap.getData().remove(key); + if (oldKey != null) { + flushed = false; + } + } + + public void put(String key, String value) { + configMap.getData().put(key, value); + flushed = false; + } + } + + // The cache for each resourceId may be in four states: + // 1. No cache entry: ConfigMap isn't loaded from kubernetes, or it's deleted. + // 2 Cache entry, not created : The ConfigMap doesn't exist in Kubernetes. + // 2 Cache entry, not flushed : The ConfigMap exists in Kubernetes, but it is not updated yet. + // 3. Cache entry, flushed and created : We have loaded the ConfigMap from kubernetes, and it's + // up-to-date. Review Comment: ```suggestion // 2. Cache entry, not created : The ConfigMap doesn't exist in Kubernetes. // 3. Cache entry, not flushed : The ConfigMap exists in Kubernetes, but it is not updated yet. // 4. Cache entry, flushed and created : We have loaded the ConfigMap from kubernetes, and it's // up-to-date. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org