This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 02840b96 [FLINK-33527] Simplify state store cleanup logic (#710)
02840b96 is described below

commit 02840b96ef3116ea95a440af4f945398900d89df
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Wed Nov 15 15:16:41 2023 +0100

    [FLINK-33527] Simplify state store cleanup logic (#710)
---
 .../apache/flink/autoscaler/JobAutoScalerImpl.java |  38 +------
 .../autoscaler/state/AutoScalerStateStore.java     |   9 +-
 .../state/InMemoryAutoScalerStateStore.java        |   7 ++
 .../flink/autoscaler/BacklogBasedScalingTest.java  |   9 +-
 .../flink/autoscaler/JobAutoScalerImplTest.java    |  71 +++---------
 .../autoscaler/RecommendedParallelismTest.java     |   9 +-
 .../state/TestingAutoscalerStateStore.java         |  37 ------
 .../operator/autoscaler/AutoscalerFactory.java     |   2 +
 .../autoscaler/{ => state}/ConfigMapStore.java     |  99 +++++-----------
 .../operator/autoscaler/state/ConfigMapView.java   | 117 +++++++++++++++++++
 .../KubernetesAutoScalerStateStore.java            |   8 +-
 .../KubernetesAutoScalerEventHandlerTest.java      |   2 +
 .../autoscaler/{ => state}/ConfigMapStoreTest.java | 126 ++++++++++++++++++---
 .../KubernetesAutoScalerStateStoreTest.java        |  43 ++++++-
 14 files changed, 351 insertions(+), 226 deletions(-)

diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index 0e2afb55..1cfdba62 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -42,7 +42,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
 import static 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
 import static 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
-import static 
org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.trimScalingHistory;
 
 /** The default implementation of {@link JobAutoScaler}. */
 public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>>
@@ -90,7 +89,8 @@ public class JobAutoScalerImpl<KEY, Context extends 
JobAutoScalerContext<KEY>>
         try {
             if (!ctx.getConfiguration().getBoolean(AUTOSCALER_ENABLED)) {
                 LOG.debug("Autoscaler is disabled");
-                clearStatesAfterAutoscalerDisabled(ctx);
+                stateStore.clearAll(ctx);
+                stateStore.flush(ctx);
                 return;
             }
 
@@ -120,40 +120,6 @@ public class JobAutoScalerImpl<KEY, Context extends 
JobAutoScalerContext<KEY>>
         stateStore.removeInfoFromCache(jobKey);
     }
 
-    private void clearStatesAfterAutoscalerDisabled(Context ctx) throws 
Exception {
-        var needFlush = false;
-        var parallelismOverrides = stateStore.getParallelismOverrides(ctx);
-        if (!parallelismOverrides.isEmpty()) {
-            needFlush = true;
-            stateStore.removeParallelismOverrides(ctx);
-        }
-
-        var collectedMetrics = stateStore.getCollectedMetrics(ctx);
-        if (!collectedMetrics.isEmpty()) {
-            needFlush = true;
-            stateStore.removeCollectedMetrics(ctx);
-        }
-
-        var scalingHistory = stateStore.getScalingHistory(ctx);
-        if (!scalingHistory.isEmpty()) {
-            var trimmedScalingHistory =
-                    trimScalingHistory(clock.instant(), 
ctx.getConfiguration(), scalingHistory);
-            if (trimmedScalingHistory.isEmpty()) {
-                // All scaling histories are trimmed.
-                needFlush = true;
-                stateStore.removeScalingHistory(ctx);
-            } else if (!scalingHistory.equals(trimmedScalingHistory)) {
-                // Some scaling histories are trimmed.
-                needFlush = true;
-                stateStore.storeScalingHistory(ctx, trimmedScalingHistory);
-            }
-        }
-
-        if (needFlush) {
-            stateStore.flush(ctx);
-        }
-    }
-
     @VisibleForTesting
     protected Map<String, String> getParallelismOverrides(Context ctx) throws 
Exception {
         return stateStore.getParallelismOverrides(ctx);
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
index bb3329a4..6f9e54e2 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
@@ -64,10 +64,13 @@ public interface AutoScalerStateStore<KEY, Context extends 
JobAutoScalerContext<
 
     void removeParallelismOverrides(Context jobContext) throws Exception;
 
+    /** Removes all data from this context. Flush still needs to be called. */
+    void clearAll(Context jobContext) throws Exception;
+
     /**
-     * Flushing is needed because we just save data in cache for all store 
methods. For less write
-     * operations, we flush the cached data to the physical storage only after 
all operations have
-     * been performed.
+     * Flushing is needed because we do not persist data for all store methods 
until this method is
+     * called. Note: The state store implementation should try to avoid write 
operations unless data
+     * was changed through this interface.
      */
     void flush(Context jobContext) throws Exception;
 
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
index 29176365..caf4e4fd 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
@@ -105,6 +105,13 @@ public class InMemoryAutoScalerStateStore<KEY, Context 
extends JobAutoScalerCont
         parallelismOverridesStore.remove(jobContext.getJobKey());
     }
 
+    @Override
+    public void clearAll(Context jobContext) {
+        scalingHistoryStore.remove(jobContext.getJobKey());
+        parallelismOverridesStore.remove(jobContext.getJobKey());
+        collectedMetricsStore.remove(jobContext.getJobKey());
+    }
+
     @Override
     public void flush(Context jobContext) {
         // The InMemory state store doesn't persist data.
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
index 810bffe3..f3198188 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
@@ -24,7 +24,8 @@ import 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
 import org.apache.flink.autoscaler.metrics.CollectedMetrics;
 import org.apache.flink.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
-import org.apache.flink.autoscaler.state.TestingAutoscalerStateStore;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
 import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -54,7 +55,7 @@ public class BacklogBasedScalingTest {
 
     private JobAutoScalerContext<JobID> context;
     private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> 
eventCollector;
-    private TestingAutoscalerStateStore<JobID, JobAutoScalerContext<JobID>> 
stateStore;
+    private AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> 
stateStore;
 
     private TestingMetricsCollector<JobID, JobAutoScalerContext<JobID>> 
metricsCollector;
     private ScalingExecutor<JobID, JobAutoScalerContext<JobID>> 
scalingExecutor;
@@ -68,7 +69,7 @@ public class BacklogBasedScalingTest {
         context = createDefaultJobAutoScalerContext();
 
         eventCollector = new TestingEventCollector<>();
-        stateStore = new TestingAutoscalerStateStore<>();
+        stateStore = new InMemoryAutoScalerStateStore<>();
 
         scalingExecutor = new ScalingExecutor<>(eventCollector, stateStore);
 
@@ -395,7 +396,7 @@ public class BacklogBasedScalingTest {
         assertTrue(eventCollector.events.isEmpty());
     }
 
-    private void assertEvaluatedMetricsSize(int expectedSize) {
+    private void assertEvaluatedMetricsSize(int expectedSize) throws Exception 
{
         SortedMap<Instant, CollectedMetrics> evaluatedMetrics =
                 stateStore.getCollectedMetrics(context);
         assertThat(evaluatedMetrics).hasSize(expectedSize);
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
index 7f507dd4..a553c1b7 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
@@ -26,7 +26,8 @@ import org.apache.flink.autoscaler.metrics.CollectedMetrics;
 import org.apache.flink.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
-import org.apache.flink.autoscaler.state.TestingAutoscalerStateStore;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
 import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.client.program.rest.RestClusterClient;
@@ -43,16 +44,13 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
-import java.time.ZoneId;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
 import java.util.TreeMap;
 
 import static 
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
@@ -61,6 +59,8 @@ import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABL
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_SCALING_HISTORY_AGE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests for JobAutoScalerImpl. */
 public class JobAutoScalerImplTest {
@@ -68,7 +68,7 @@ public class JobAutoScalerImplTest {
     private JobAutoScalerContext<JobID> context;
     private TestingScalingRealizer<JobID, JobAutoScalerContext<JobID>> 
scalingRealizer;
     private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> 
eventCollector;
-    private TestingAutoscalerStateStore<JobID, JobAutoScalerContext<JobID>> 
stateStore;
+    private AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> 
stateStore;
 
     @BeforeEach
     public void setup() {
@@ -77,7 +77,7 @@ public class JobAutoScalerImplTest {
 
         scalingRealizer = new TestingScalingRealizer<>();
         eventCollector = new TestingEventCollector<>();
-        stateStore = new TestingAutoscalerStateStore<>();
+        stateStore = new InMemoryAutoScalerStateStore<>();
     }
 
     @Test
@@ -209,10 +209,8 @@ public class JobAutoScalerImplTest {
         assertThat(autoscaler.getParallelismOverrides(context)).isEmpty();
         assertParallelismOverrides(null);
 
-        int requestCount = stateStore.getFlushCount();
         // Make sure we don't update in kubernetes once removed
         autoscaler.scale(context);
-        assertEquals(requestCount, stateStore.getFlushCount());
 
         context.getConfiguration().setString(AUTOSCALER_ENABLED.key(), "true");
         autoscaler.applyParallelismOverrides(context);
@@ -299,60 +297,25 @@ public class JobAutoScalerImplTest {
         scalingHistory.put(Instant.ofEpochMilli(100), new ScalingSummary());
         scalingHistory.put(Instant.ofEpochMilli(200), new ScalingSummary());
 
-        // Test all scaling aren't expired
-        getInstantScalingSummaryTreeMap(
-                scalingHistory, Clock.fixed(Instant.ofEpochMilli(250), 
ZoneId.systemDefault()), 2);
+        stateStore.storeScalingHistory(context, Map.of(new JobVertexID(), 
scalingHistory));
+        assertFalse(stateStore.getScalingHistory(context).isEmpty());
 
-        // Test one scaling aren't expired
-        getInstantScalingSummaryTreeMap(
-                scalingHistory, Clock.fixed(Instant.ofEpochMilli(350), 
ZoneId.systemDefault()), 1);
+        stateStore.storeParallelismOverrides(context, Map.of("vertex", "4"));
+        assertFalse(stateStore.getParallelismOverrides(context).isEmpty());
 
-        // Test all scaling are expired
-        getInstantScalingSummaryTreeMap(
-                scalingHistory, Clock.fixed(Instant.ofEpochMilli(450), 
ZoneId.systemDefault()), 0);
-    }
+        TreeMap<Instant, CollectedMetrics> metrics = new TreeMap<>();
+        metrics.put(Instant.now(), new CollectedMetrics());
+        stateStore.storeCollectedMetrics(context, metrics);
+        assertFalse(stateStore.getCollectedMetrics(context).isEmpty());
 
-    private void getInstantScalingSummaryTreeMap(
-            SortedMap<Instant, ScalingSummary> scalingHistoryData,
-            Clock clock,
-            int expectedScalingHistorySize)
-            throws Exception {
-        stateStore = new TestingAutoscalerStateStore<>();
         var autoscaler =
                 new JobAutoScalerImpl<>(
                         null, null, null, eventCollector, scalingRealizer, 
stateStore);
-
-        enrichStateStore(scalingHistoryData);
-        stateStore.flush(context);
-        assertThat(stateStore.getFlushCount()).isEqualTo(1);
-
-        autoscaler.setClock(clock);
         autoscaler.scale(context);
 
-        assertThat(stateStore.getParallelismOverrides(context)).isEmpty();
-        assertThat(stateStore.getCollectedMetrics(context)).isEmpty();
-
-        if (expectedScalingHistorySize > 0) {
-            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory =
-                    stateStore.getScalingHistory(context);
-            assertThat(scalingHistory).isNotEmpty();
-            assertThat(scalingHistory.values())
-                    .allMatch(aa -> aa.size() == expectedScalingHistorySize);
-        } else {
-            assertThat(stateStore.getScalingHistory(context)).isEmpty();
-        }
-        assertThat(stateStore.getFlushCount()).isEqualTo(2);
-    }
-
-    private void enrichStateStore(SortedMap<Instant, ScalingSummary> 
scalingHistory) {
-        var v1 = new JobVertexID();
-        var v2 = new JobVertexID();
-        stateStore.storeParallelismOverrides(
-                context, Map.of(v1.toString(), "1", v2.toString(), "2"));
-
-        var metricHistory = new TreeMap<Instant, CollectedMetrics>();
-        stateStore.storeCollectedMetrics(context, metricHistory);
-        stateStore.storeScalingHistory(context, Map.of(v1, scalingHistory, v2, 
scalingHistory));
+        assertTrue(stateStore.getScalingHistory(context).isEmpty());
+        assertTrue(stateStore.getCollectedMetrics(context).isEmpty());
+        assertTrue(stateStore.getParallelismOverrides(context).isEmpty());
     }
 
     private void assertParallelismOverrides(Map<String, String> 
expectedOverrides) {
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
index 1884268e..87025b93 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
@@ -25,7 +25,8 @@ import org.apache.flink.autoscaler.metrics.CollectedMetrics;
 import org.apache.flink.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
-import org.apache.flink.autoscaler.state.TestingAutoscalerStateStore;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
 import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -55,7 +56,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class RecommendedParallelismTest {
 
     private JobAutoScalerContext<JobID> context;
-    private TestingAutoscalerStateStore<JobID, JobAutoScalerContext<JobID>> 
stateStore;
+    private AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> 
stateStore;
 
     private TestingMetricsCollector<JobID, JobAutoScalerContext<JobID>> 
metricsCollector;
     private ScalingExecutor<JobID, JobAutoScalerContext<JobID>> 
scalingExecutor;
@@ -70,7 +71,7 @@ public class RecommendedParallelismTest {
 
         TestingEventCollector<JobID, JobAutoScalerContext<JobID>> 
eventCollector =
                 new TestingEventCollector<>();
-        stateStore = new TestingAutoscalerStateStore<>();
+        stateStore = new InMemoryAutoScalerStateStore<>();
 
         scalingExecutor = new ScalingExecutor<>(eventCollector, stateStore);
 
@@ -227,7 +228,7 @@ public class RecommendedParallelismTest {
         assertEquals(4, scaledParallelism.get(sink));
     }
 
-    private void assertEvaluatedMetricsSize(int expectedSize) {
+    private void assertEvaluatedMetricsSize(int expectedSize) throws Exception 
{
         SortedMap<Instant, CollectedMetrics> evaluatedMetrics =
                 stateStore.getCollectedMetrics(context);
         assertThat(evaluatedMetrics).hasSize(expectedSize);
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/TestingAutoscalerStateStore.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/TestingAutoscalerStateStore.java
deleted file mode 100644
index 3b3fafa1..00000000
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/TestingAutoscalerStateStore.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.autoscaler.state;
-
-import org.apache.flink.autoscaler.JobAutoScalerContext;
-
-/** Testing {@link AutoScalerStateStore} implementation. */
-public class TestingAutoscalerStateStore<KEY, Context extends 
JobAutoScalerContext<KEY>>
-        extends InMemoryAutoScalerStateStore<KEY, Context> {
-
-    private int flushCount;
-
-    @Override
-    public void flush(Context jobContext) {
-        super.flush(jobContext);
-        flushCount++;
-    }
-
-    public int getFlushCount() {
-        return flushCount;
-    }
-}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
index 97e70267..dea6df0b 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
@@ -22,6 +22,8 @@ import org.apache.flink.autoscaler.JobAutoScalerImpl;
 import org.apache.flink.autoscaler.RestApiMetricsCollector;
 import org.apache.flink.autoscaler.ScalingExecutor;
 import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.kubernetes.operator.autoscaler.state.ConfigMapStore;
+import 
org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStore.java
similarity index 52%
rename from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java
rename to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStore.java
index 5ded9719..b840a292 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStore.java
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.kubernetes.operator.autoscaler.state;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
 import org.apache.flink.kubernetes.utils.Constants;
 
 import io.fabric8.kubernetes.api.model.ConfigMap;
@@ -28,8 +29,6 @@ import 
io.javaoperatorsdk.operator.processing.event.ResourceID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
-
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -44,13 +43,12 @@ public class ConfigMapStore {
 
     private final KubernetesClient kubernetesClient;
 
-    // The cache for each resourceId may be in three states:
-    // 1. The resourceId doesn't exist : ConfigMap isn't loaded from 
kubernetes, or it's deleted
-    // 2  Exists, Optional.empty() : The ConfigMap doesn't exist in Kubernetes
-    // 3. Exists, Not Empty : We have loaded the ConfigMap from kubernetes, it 
may not be the same
-    // if not flushed already
-    private final ConcurrentHashMap<ResourceID, Optional<ConfigMap>> cache =
-            new ConcurrentHashMap<>();
+    /**
+     * Cache for Kubernetes ConfigMaps which reflects the latest state of a 
ConfigMap for a
+     * ResourceId. Any changes to the ConfigMap are only reflected in 
Kubernetes once the flush()
+     * method is called.
+     */
+    private final ConcurrentHashMap<ResourceID, ConfigMapView> cache = new 
ConcurrentHashMap<>();
 
     public ConfigMapStore(KubernetesClient kubernetesClient) {
         this.kubernetesClient = kubernetesClient;
@@ -58,34 +56,30 @@ public class ConfigMapStore {
 
     protected void putSerializedState(
             KubernetesJobAutoScalerContext jobContext, String key, String 
value) {
-        getOrCreateState(jobContext).put(key, value);
+        getConfigMap(jobContext).put(key, value);
     }
 
     protected Optional<String> getSerializedState(
             KubernetesJobAutoScalerContext jobContext, String key) {
-        return getConfigMap(jobContext).map(configMap -> 
configMap.getData().get(key));
+        return Optional.ofNullable(getConfigMap(jobContext).get(key));
     }
 
     protected void removeSerializedState(KubernetesJobAutoScalerContext 
jobContext, String key) {
-        getConfigMap(jobContext)
-                .ifPresentOrElse(
-                        configMap -> configMap.getData().remove(key),
-                        () -> {
-                            throw new IllegalStateException(
-                                    "The configMap isn't created, so the 
remove is unavailable.");
-                        });
+        getConfigMap(jobContext).removeKey(key);
+    }
+
+    public void clearAll(KubernetesJobAutoScalerContext jobContext) {
+        getConfigMap(jobContext).clear();
     }
 
     public void flush(KubernetesJobAutoScalerContext jobContext) {
-        Optional<ConfigMap> configMapOpt = cache.get(jobContext.getJobKey());
-        if (configMapOpt == null || configMapOpt.isEmpty()) {
-            LOG.debug("The configMap isn't updated, so skip the flush.");
+        ConfigMapView configMapView = cache.get(jobContext.getJobKey());
+        if (configMapView == null) {
+            LOG.debug("The configMap doesn't exist, so skip the flush.");
             return;
         }
         try {
-            cache.put(
-                    jobContext.getJobKey(),
-                    
Optional.of(kubernetesClient.resource(configMapOpt.get()).update()));
+            configMapView.flush();
         } catch (Exception e) {
             LOG.error(
                     "Error while updating autoscaler info configmap, 
invalidating to clear the cache",
@@ -99,49 +93,17 @@ public class ConfigMapStore {
         cache.remove(resourceID);
     }
 
-    private Optional<ConfigMap> getConfigMap(KubernetesJobAutoScalerContext 
jobContext) {
+    private ConfigMapView getConfigMap(KubernetesJobAutoScalerContext 
jobContext) {
         return cache.computeIfAbsent(
                 jobContext.getJobKey(), (id) -> 
getConfigMapFromKubernetes(jobContext));
     }
 
-    private Map<String, String> 
getOrCreateState(KubernetesJobAutoScalerContext jobContext) {
-        return cache.compute(
-                        jobContext.getJobKey(),
-                        (id, configMapOpt) -> {
-                            // If in the cache and valid simply return
-                            if (configMapOpt != null && 
configMapOpt.isPresent()) {
-                                return configMapOpt;
-                            }
-                            // Otherwise get or create
-                            return 
Optional.of(getOrCreateConfigMapFromKubernetes(jobContext));
-                        })
-                .get()
-                .getData();
-    }
-
     @VisibleForTesting
-    protected Optional<ConfigMap> getConfigMapFromKubernetes(
-            KubernetesJobAutoScalerContext jobContext) {
+    ConfigMapView getConfigMapFromKubernetes(KubernetesJobAutoScalerContext 
jobContext) {
         HasMetadata cr = jobContext.getResource();
         var meta = createCmObjectMeta(ResourceID.fromResource(cr));
-        return getScalingInfoConfigMap(meta);
-    }
-
-    @Nonnull
-    private ConfigMap getOrCreateConfigMapFromKubernetes(
-            KubernetesJobAutoScalerContext jobContext) {
-        HasMetadata cr = jobContext.getResource();
-        var meta = createCmObjectMeta(ResourceID.fromResource(cr));
-        return getScalingInfoConfigMap(meta).orElseGet(() -> 
createConfigMap(cr, meta));
-    }
-
-    private ConfigMap createConfigMap(HasMetadata cr, ObjectMeta meta) {
-        LOG.info("Creating scaling info config map");
-        var cm = new ConfigMap();
-        cm.setMetadata(meta);
-        cm.addOwnerReference(cr);
-        cm.setData(new HashMap<>());
-        return kubernetesClient.resource(cm).create();
+        var configMapSkeleton = buildConfigMap(cr, meta);
+        return new ConfigMapView(configMapSkeleton, 
kubernetesClient::resource);
     }
 
     private ObjectMeta createCmObjectMeta(ResourceID uid) {
@@ -157,17 +119,16 @@ public class ConfigMapStore {
         return objectMeta;
     }
 
-    private Optional<ConfigMap> getScalingInfoConfigMap(ObjectMeta objectMeta) 
{
-        return Optional.ofNullable(
-                kubernetesClient
-                        .configMaps()
-                        .inNamespace(objectMeta.getNamespace())
-                        .withName(objectMeta.getName())
-                        .get());
+    private ConfigMap buildConfigMap(HasMetadata cr, ObjectMeta meta) {
+        var cm = new ConfigMap();
+        cm.setMetadata(meta);
+        cm.addOwnerReference(cr);
+        cm.setData(new HashMap<>());
+        return cm;
     }
 
     @VisibleForTesting
-    protected ConcurrentHashMap<ResourceID, Optional<ConfigMap>> getCache() {
+    protected ConcurrentHashMap<ResourceID, ConfigMapView> getCache() {
         return cache;
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapView.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapView.java
new file mode 100644
index 00000000..8a6a37c6
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapView.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.client.dsl.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Function;
+
+class ConfigMapView {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ConfigMapView.class);
+
+    enum State {
+        /** ConfigMap is only stored locally, not created in Kubernetes yet. */
+        NEEDS_CREATE,
+        /** ConfigMap exists in Kubernetes but there are newer local changes. 
*/
+        NEEDS_UPDATE,
+        /** ConfigMap view reflects the actual contents of Kubernetes 
ConfigMap. */
+        UP_TO_DATE
+    }
+
+    private State state;
+
+    private ConfigMap configMap;
+
+    private final Function<ConfigMap, Resource<ConfigMap>> resourceRetriever;
+
+    public ConfigMapView(
+            ConfigMap configMapSkeleton,
+            Function<ConfigMap, Resource<ConfigMap>> resourceRetriever) {
+        var existingConfigMap = 
resourceRetriever.apply(configMapSkeleton).get();
+        if (existingConfigMap != null) {
+            refreshConfigMap(existingConfigMap);
+        } else {
+            this.configMap = configMapSkeleton;
+            this.state = State.NEEDS_CREATE;
+        }
+        this.resourceRetriever = resourceRetriever;
+    }
+
+    public String get(String key) {
+        return configMap.getData().get(key);
+    }
+
+    public void put(String key, String value) {
+        configMap.getData().put(key, value);
+        requireUpdate();
+    }
+
+    public void removeKey(String key) {
+        var oldKey = configMap.getData().remove(key);
+        if (oldKey != null) {
+            requireUpdate();
+        }
+    }
+
+    public void clear() {
+        if (configMap.getData().isEmpty()) {
+            return;
+        }
+        configMap.getData().clear();
+        requireUpdate();
+    }
+
+    public void flush() {
+        if (state == State.UP_TO_DATE) {
+            return;
+        }
+        Resource<ConfigMap> resource = resourceRetriever.apply(configMap);
+        if (state == State.NEEDS_UPDATE) {
+            refreshConfigMap(resource.update());
+        } else if (state == State.NEEDS_CREATE) {
+            LOG.info("Creating config map {}", 
configMap.getMetadata().getName());
+            refreshConfigMap(resource.create());
+        }
+    }
+
+    private void refreshConfigMap(ConfigMap configMap) {
+        Preconditions.checkNotNull(configMap);
+        this.configMap = configMap;
+        this.state = State.UP_TO_DATE;
+    }
+
+    private void requireUpdate() {
+        if (state != State.NEEDS_CREATE) {
+            state = State.NEEDS_UPDATE;
+        }
+    }
+
+    @VisibleForTesting
+    public Map<String, String> getDataReadOnly() {
+        return Collections.unmodifiableMap(configMap.getData());
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
similarity index 97%
rename from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java
rename to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
index 27e3083f..0e2ce1c6 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.kubernetes.operator.autoscaler.state;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.autoscaler.ScalingSummary;
@@ -23,6 +23,7 @@ import org.apache.flink.autoscaler.metrics.CollectedMetrics;
 import org.apache.flink.autoscaler.state.AutoScalerStateStore;
 import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
 import org.apache.flink.configuration.ConfigurationUtils;
+import 
org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import com.fasterxml.jackson.core.JacksonException;
@@ -163,6 +164,11 @@ public class KubernetesAutoScalerStateStore
         configMapStore.removeSerializedState(jobContext, 
PARALLELISM_OVERRIDES_KEY);
     }
 
+    @Override
+    public void clearAll(KubernetesJobAutoScalerContext jobContext) {
+        configMapStore.clearAll(jobContext);
+    }
+
     @Override
     public void flush(KubernetesJobAutoScalerContext jobContext) {
         trimHistoryToMaxCmSize(jobContext);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java
index bed6a6ab..ccee852c 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java
@@ -21,6 +21,8 @@ import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.state.ConfigMapStore;
+import 
org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore;
 import org.apache.flink.kubernetes.operator.utils.EventCollector;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStoreTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStoreTest.java
similarity index 51%
rename from 
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStoreTest.java
rename to 
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStoreTest.java
index 0e42799e..9c6b8fb2 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStoreTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStoreTest.java
@@ -15,19 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.kubernetes.operator.autoscaler.state;
 
+import 
org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.mock.Whitebox;
 
 import io.fabric8.kubernetes.api.model.ConfigMap;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClientException;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.Optional;
-
 import static 
org.apache.flink.kubernetes.operator.autoscaler.TestingKubernetesAutoscalerUtils.createContext;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -41,12 +42,21 @@ public class ConfigMapStoreTest {
 
     KubernetesMockServer mockWebServer;
 
+    ConfigMapStore configMapStore;
+
+    KubernetesJobAutoScalerContext ctx;
+
+    @BeforeEach
+    public void setup() {
+        configMapStore = new ConfigMapStore(kubernetesClient);
+        ctx = createContext("cr1", kubernetesClient);
+    }
+
     @Test
     void testCaching() {
         KubernetesJobAutoScalerContext ctx1 = createContext("cr1", 
kubernetesClient);
         KubernetesJobAutoScalerContext ctx2 = createContext("cr2", 
kubernetesClient);
 
-        var configMapStore = new ConfigMapStore(kubernetesClient);
         assertEquals(0, mockWebServer.getRequestCount());
 
         String key1 = "key1";
@@ -56,6 +66,7 @@ public class ConfigMapStoreTest {
         String key3 = "key3";
         String value3 = "value3";
         assertThat(configMapStore.getSerializedState(ctx1, key1)).isEmpty();
+        // Retrieve configMap
         assertEquals(1, mockWebServer.getRequestCount());
 
         // Further gets should not go to K8s
@@ -63,18 +74,24 @@ public class ConfigMapStoreTest {
         assertThat(configMapStore.getSerializedState(ctx1, key3)).isEmpty();
         assertEquals(1, mockWebServer.getRequestCount());
 
-        assertThat(configMapStore.getConfigMapFromKubernetes(ctx1)).isEmpty();
+        // Manually trigger retrieval from Kubernetes
+        
assertThat(configMapStore.getConfigMapFromKubernetes(ctx1).getDataReadOnly()).isEmpty();
         assertEquals(2, mockWebServer.getRequestCount());
 
+        // Puts only modify the local cache; they do not reach Kubernetes until flushed.
         configMapStore.putSerializedState(ctx1, key1, value1);
-        assertEquals(4, mockWebServer.getRequestCount());
+        assertEquals(2, mockWebServer.getRequestCount());
 
         // The put just updates the data in the cache, and shouldn't send a request to Kubernetes.
         configMapStore.putSerializedState(ctx1, key2, value2);
-        assertEquals(4, mockWebServer.getRequestCount());
+        assertEquals(2, mockWebServer.getRequestCount());
 
-        
assertThat(configMapStore.getConfigMapFromKubernetes(ctx1)).isPresent();
-        assertThat(configMapStore.getConfigMapFromKubernetes(ctx2)).isEmpty();
+        // Flush!
+        configMapStore.flush(ctx1);
+        assertEquals(3, mockWebServer.getRequestCount());
+
+        
assertThat(configMapStore.getConfigMapFromKubernetes(ctx1).getDataReadOnly()).isNotEmpty();
+        
assertThat(configMapStore.getConfigMapFromKubernetes(ctx2).getDataReadOnly()).isEmpty();
 
         assertThat(configMapStore.getSerializedState(ctx1, key1)).isPresent();
         assertThat(configMapStore.getSerializedState(ctx2, key1)).isEmpty();
@@ -97,21 +114,23 @@ public class ConfigMapStoreTest {
     void testErrorHandling() {
         KubernetesJobAutoScalerContext ctx = createContext("cr1", 
kubernetesClient);
 
-        var configMapStore = new ConfigMapStore(kubernetesClient);
         // Test for the invalid flush.
         configMapStore.flush(ctx);
 
         assertEquals(0, mockWebServer.getRequestCount());
 
         configMapStore.putSerializedState(ctx, "a", "1");
-        Optional<ConfigMap> configMapOpt = 
configMapStore.getCache().get(ctx.getJobKey());
-        assertThat(configMapOpt).isPresent();
-        assertEquals(2, mockWebServer.getRequestCount());
+        ConfigMap configMap =
+                (ConfigMap)
+                        Whitebox.getInternalState(
+                                
configMapStore.getCache().get(ctx.getJobKey()), "configMap");
+        assertThat(configMap.getData()).isNotEmpty();
+        assertEquals(1, mockWebServer.getRequestCount());
 
         // Modify the autoscaler info in the background
-        var cm = ReconciliationUtils.clone(configMapOpt.get());
+        var cm = ReconciliationUtils.clone(configMap);
         cm.getData().put("a", "2");
-        kubernetesClient.resource(cm).update();
+        kubernetesClient.resource(cm).create();
 
         // Replace should throw an error due to the modification
         assertThrows(KubernetesClientException.class, () -> 
configMapStore.flush(ctx));
@@ -120,4 +139,81 @@ public class ConfigMapStoreTest {
         // Make sure we can get the new version
         assertThat(configMapStore.getSerializedState(ctx, "a")).contains("2");
     }
+
+    @Test
+    void testMinimalAmountOfFlushing() {
+        KubernetesJobAutoScalerContext ctx = createContext("cr1", 
kubernetesClient);
+        var key = "key";
+        var value = "value";
+
+        configMapStore.getSerializedState(ctx, key);
+        assertEquals(1, mockWebServer.getRequestCount());
+
+        configMapStore.putSerializedState(ctx, key, value);
+        assertEquals(1, mockWebServer.getRequestCount());
+
+        configMapStore.flush(ctx);
+        assertEquals(2, mockWebServer.getRequestCount());
+
+        // Get from cache
+        assertThat(configMapStore.getSerializedState(ctx, 
key)).hasValue(value);
+        assertEquals(2, mockWebServer.getRequestCount());
+
+        configMapStore.removeSerializedState(ctx, key);
+        assertEquals(2, mockWebServer.getRequestCount());
+
+        configMapStore.flush(ctx);
+        assertEquals(3, mockWebServer.getRequestCount());
+
+        // Subsequent flushes do not trigger an API call
+        configMapStore.flush(ctx);
+        assertEquals(3, mockWebServer.getRequestCount());
+    }
+
+    @Test
+    public void testDiscardAllState() {
+        configMapStore.putSerializedState(
+                ctx, KubernetesAutoScalerStateStore.COLLECTED_METRICS_KEY, 
"state1");
+        configMapStore.putSerializedState(
+                ctx, KubernetesAutoScalerStateStore.SCALING_HISTORY_KEY, 
"state2");
+        configMapStore.putSerializedState(
+                ctx, KubernetesAutoScalerStateStore.PARALLELISM_OVERRIDES_KEY, 
"state3");
+
+        assertThat(
+                        configMapStore.getSerializedState(
+                                ctx, 
KubernetesAutoScalerStateStore.COLLECTED_METRICS_KEY))
+                .isPresent();
+        assertThat(
+                        configMapStore.getSerializedState(
+                                ctx, 
KubernetesAutoScalerStateStore.SCALING_HISTORY_KEY))
+                .isPresent();
+        assertThat(
+                        configMapStore.getSerializedState(
+                                ctx, 
KubernetesAutoScalerStateStore.PARALLELISM_OVERRIDES_KEY))
+                .isPresent();
+
+        configMapStore.flush(ctx);
+
+        configMapStore.clearAll(ctx);
+
+        assertThat(
+                        configMapStore.getSerializedState(
+                                ctx, 
KubernetesAutoScalerStateStore.COLLECTED_METRICS_KEY))
+                .isEmpty();
+        assertThat(
+                        configMapStore.getSerializedState(
+                                ctx, 
KubernetesAutoScalerStateStore.SCALING_HISTORY_KEY))
+                .isEmpty();
+        assertThat(
+                        configMapStore.getSerializedState(
+                                ctx, 
KubernetesAutoScalerStateStore.PARALLELISM_OVERRIDES_KEY))
+                .isEmpty();
+
+        // We haven't flushed the clear operation yet, so the ConfigMap in Kubernetes should not be empty
+        
assertThat(configMapStore.getConfigMapFromKubernetes(ctx).getDataReadOnly()).isNotEmpty();
+
+        configMapStore.flush(ctx);
+
+        
assertThat(configMapStore.getConfigMapFromKubernetes(ctx).getDataReadOnly()).isEmpty();
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStoreTest.java
similarity index 87%
rename from 
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java
rename to 
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStoreTest.java
index f4faaa55..561e5448 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStoreTest.java
@@ -15,13 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.kubernetes.operator.autoscaler.state;
 
 import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.metrics.CollectedMetrics;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import 
org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -43,9 +44,9 @@ import java.util.TreeMap;
 import static 
org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore;
 import static 
org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory;
 import static 
org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.updateVertexList;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.KubernetesAutoScalerStateStore.serializeEvaluatedMetrics;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.KubernetesAutoScalerStateStore.serializeScalingHistory;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.TestingKubernetesAutoscalerUtils.createContext;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore.serializeEvaluatedMetrics;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore.serializeScalingHistory;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -330,4 +331,40 @@ public class KubernetesAutoScalerStateStoreTest {
                                 ctx, 
KubernetesAutoScalerStateStore.SCALING_HISTORY_KEY))
                 .isEmpty();
     }
+
+    @Test
+    public void testDiscardAllState() {
+        stateStore.storeCollectedMetrics(
+                ctx, new TreeMap<>(Map.of(Instant.now(), new 
CollectedMetrics())));
+        stateStore.storeScalingHistory(
+                ctx,
+                Map.of(
+                        new JobVertexID(),
+                        new TreeMap<>(Map.of(Instant.now(), new 
ScalingSummary()))));
+        stateStore.storeParallelismOverrides(ctx, Map.of(new 
JobVertexID().toHexString(), "23"));
+
+        assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
+        assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
+        assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
+
+        stateStore.flush(ctx);
+
+        assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
+        assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
+        assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
+
+        stateStore.clearAll(ctx);
+
+        assertThat(stateStore.getCollectedMetrics(ctx)).isEmpty();
+        assertThat(stateStore.getScalingHistory(ctx)).isEmpty();
+        assertThat(stateStore.getParallelismOverrides(ctx)).isEmpty();
+
+        // We haven't flushed the clear operation yet, so the ConfigMap in Kubernetes should not be empty
+        
assertThat(configMapStore.getConfigMapFromKubernetes(ctx).getDataReadOnly()).isNotEmpty();
+
+        stateStore.flush(ctx);
+
+        // Contents should be removed from Kubernetes
+        
assertThat(configMapStore.getConfigMapFromKubernetes(ctx).getDataReadOnly()).isEmpty();
+    }
 }

Reply via email to