This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 02840b96 [FLINK-33527] Simplify state store cleanup logic (#710) 02840b96 is described below commit 02840b96ef3116ea95a440af4f945398900d89df Author: Maximilian Michels <m...@apache.org> AuthorDate: Wed Nov 15 15:16:41 2023 +0100 [FLINK-33527] Simplify state store cleanup logic (#710) --- .../apache/flink/autoscaler/JobAutoScalerImpl.java | 38 +------ .../autoscaler/state/AutoScalerStateStore.java | 9 +- .../state/InMemoryAutoScalerStateStore.java | 7 ++ .../flink/autoscaler/BacklogBasedScalingTest.java | 9 +- .../flink/autoscaler/JobAutoScalerImplTest.java | 71 +++--------- .../autoscaler/RecommendedParallelismTest.java | 9 +- .../state/TestingAutoscalerStateStore.java | 37 ------ .../operator/autoscaler/AutoscalerFactory.java | 2 + .../autoscaler/{ => state}/ConfigMapStore.java | 99 +++++----------- .../operator/autoscaler/state/ConfigMapView.java | 117 +++++++++++++++++++ .../KubernetesAutoScalerStateStore.java | 8 +- .../KubernetesAutoScalerEventHandlerTest.java | 2 + .../autoscaler/{ => state}/ConfigMapStoreTest.java | 126 ++++++++++++++++++--- .../KubernetesAutoScalerStateStoreTest.java | 43 ++++++- 14 files changed, 351 insertions(+), 226 deletions(-) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java index 0e2afb55..1cfdba62 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java @@ -42,7 +42,6 @@ import java.util.concurrent.ConcurrentHashMap; import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism; import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism; -import 
static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.trimScalingHistory; /** The default implementation of {@link JobAutoScaler}. */ public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>> @@ -90,7 +89,8 @@ public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>> try { if (!ctx.getConfiguration().getBoolean(AUTOSCALER_ENABLED)) { LOG.debug("Autoscaler is disabled"); - clearStatesAfterAutoscalerDisabled(ctx); + stateStore.clearAll(ctx); + stateStore.flush(ctx); return; } @@ -120,40 +120,6 @@ public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>> stateStore.removeInfoFromCache(jobKey); } - private void clearStatesAfterAutoscalerDisabled(Context ctx) throws Exception { - var needFlush = false; - var parallelismOverrides = stateStore.getParallelismOverrides(ctx); - if (!parallelismOverrides.isEmpty()) { - needFlush = true; - stateStore.removeParallelismOverrides(ctx); - } - - var collectedMetrics = stateStore.getCollectedMetrics(ctx); - if (!collectedMetrics.isEmpty()) { - needFlush = true; - stateStore.removeCollectedMetrics(ctx); - } - - var scalingHistory = stateStore.getScalingHistory(ctx); - if (!scalingHistory.isEmpty()) { - var trimmedScalingHistory = - trimScalingHistory(clock.instant(), ctx.getConfiguration(), scalingHistory); - if (trimmedScalingHistory.isEmpty()) { - // All scaling histories are trimmed. - needFlush = true; - stateStore.removeScalingHistory(ctx); - } else if (!scalingHistory.equals(trimmedScalingHistory)) { - // Some scaling histories are trimmed. 
- needFlush = true; - stateStore.storeScalingHistory(ctx, trimmedScalingHistory); - } - } - - if (needFlush) { - stateStore.flush(ctx); - } - } - @VisibleForTesting protected Map<String, String> getParallelismOverrides(Context ctx) throws Exception { return stateStore.getParallelismOverrides(ctx); diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java index bb3329a4..6f9e54e2 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java @@ -64,10 +64,13 @@ public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext< void removeParallelismOverrides(Context jobContext) throws Exception; + /** Removes all data from this context. Flush still needs to be called. */ + void clearAll(Context jobContext) throws Exception; + /** - * Flushing is needed because we just save data in cache for all store methods. For less write - * operations, we flush the cached data to the physical storage only after all operations have - * been performed. + * Flushing is needed because we do not persist data for all store methods until this method is + * called. Note: The state store implementation should try to avoid write operations unless data + * was changed through this interface. 
*/ void flush(Context jobContext) throws Exception; diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java index 29176365..caf4e4fd 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java @@ -105,6 +105,13 @@ public class InMemoryAutoScalerStateStore<KEY, Context extends JobAutoScalerCont parallelismOverridesStore.remove(jobContext.getJobKey()); } + @Override + public void clearAll(Context jobContext) { + scalingHistoryStore.remove(jobContext.getJobKey()); + parallelismOverridesStore.remove(jobContext.getJobKey()); + collectedMetricsStore.remove(jobContext.getJobKey()); + } + @Override public void flush(Context jobContext) { // The InMemory state store doesn't persist data. 
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java index 810bffe3..f3198188 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java @@ -24,7 +24,8 @@ import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics; import org.apache.flink.autoscaler.metrics.CollectedMetrics; import org.apache.flink.autoscaler.metrics.FlinkMetric; import org.apache.flink.autoscaler.realizer.TestingScalingRealizer; -import org.apache.flink.autoscaler.state.TestingAutoscalerStateStore; +import org.apache.flink.autoscaler.state.AutoScalerStateStore; +import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore; import org.apache.flink.autoscaler.topology.JobTopology; import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -54,7 +55,7 @@ public class BacklogBasedScalingTest { private JobAutoScalerContext<JobID> context; private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> eventCollector; - private TestingAutoscalerStateStore<JobID, JobAutoScalerContext<JobID>> stateStore; + private AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> stateStore; private TestingMetricsCollector<JobID, JobAutoScalerContext<JobID>> metricsCollector; private ScalingExecutor<JobID, JobAutoScalerContext<JobID>> scalingExecutor; @@ -68,7 +69,7 @@ public class BacklogBasedScalingTest { context = createDefaultJobAutoScalerContext(); eventCollector = new TestingEventCollector<>(); - stateStore = new TestingAutoscalerStateStore<>(); + stateStore = new InMemoryAutoScalerStateStore<>(); scalingExecutor = new ScalingExecutor<>(eventCollector, stateStore); @@ -395,7 +396,7 @@ public class BacklogBasedScalingTest { assertTrue(eventCollector.events.isEmpty()); } 
- private void assertEvaluatedMetricsSize(int expectedSize) { + private void assertEvaluatedMetricsSize(int expectedSize) throws Exception { SortedMap<Instant, CollectedMetrics> evaluatedMetrics = stateStore.getCollectedMetrics(context); assertThat(evaluatedMetrics).hasSize(expectedSize); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java index 7f507dd4..a553c1b7 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java @@ -26,7 +26,8 @@ import org.apache.flink.autoscaler.metrics.CollectedMetrics; import org.apache.flink.autoscaler.metrics.FlinkMetric; import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.autoscaler.realizer.TestingScalingRealizer; -import org.apache.flink.autoscaler.state.TestingAutoscalerStateStore; +import org.apache.flink.autoscaler.state.AutoScalerStateStore; +import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore; import org.apache.flink.autoscaler.topology.JobTopology; import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.client.program.rest.RestClusterClient; @@ -43,16 +44,13 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.time.Clock; import java.time.Duration; import java.time.Instant; -import java.time.ZoneId; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.SortedMap; import java.util.TreeMap; import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext; @@ -61,6 +59,8 @@ import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABL import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_SCALING_HISTORY_AGE; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; /** Tests for JobAutoScalerImpl. */ public class JobAutoScalerImplTest { @@ -68,7 +68,7 @@ public class JobAutoScalerImplTest { private JobAutoScalerContext<JobID> context; private TestingScalingRealizer<JobID, JobAutoScalerContext<JobID>> scalingRealizer; private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> eventCollector; - private TestingAutoscalerStateStore<JobID, JobAutoScalerContext<JobID>> stateStore; + private AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> stateStore; @BeforeEach public void setup() { @@ -77,7 +77,7 @@ public class JobAutoScalerImplTest { scalingRealizer = new TestingScalingRealizer<>(); eventCollector = new TestingEventCollector<>(); - stateStore = new TestingAutoscalerStateStore<>(); + stateStore = new InMemoryAutoScalerStateStore<>(); } @Test @@ -209,10 +209,8 @@ public class JobAutoScalerImplTest { assertThat(autoscaler.getParallelismOverrides(context)).isEmpty(); assertParallelismOverrides(null); - int requestCount = stateStore.getFlushCount(); // Make sure we don't update in kubernetes once removed autoscaler.scale(context); - assertEquals(requestCount, stateStore.getFlushCount()); context.getConfiguration().setString(AUTOSCALER_ENABLED.key(), "true"); autoscaler.applyParallelismOverrides(context); @@ -299,60 +297,25 @@ public class JobAutoScalerImplTest { scalingHistory.put(Instant.ofEpochMilli(100), new ScalingSummary()); scalingHistory.put(Instant.ofEpochMilli(200), new ScalingSummary()); - // Test all scaling aren't expired - getInstantScalingSummaryTreeMap( - scalingHistory, Clock.fixed(Instant.ofEpochMilli(250), ZoneId.systemDefault()), 2); + stateStore.storeScalingHistory(context, 
Map.of(new JobVertexID(), scalingHistory)); + assertFalse(stateStore.getScalingHistory(context).isEmpty()); - // Test one scaling aren't expired - getInstantScalingSummaryTreeMap( - scalingHistory, Clock.fixed(Instant.ofEpochMilli(350), ZoneId.systemDefault()), 1); + stateStore.storeParallelismOverrides(context, Map.of("vertex", "4")); + assertFalse(stateStore.getParallelismOverrides(context).isEmpty()); - // Test all scaling are expired - getInstantScalingSummaryTreeMap( - scalingHistory, Clock.fixed(Instant.ofEpochMilli(450), ZoneId.systemDefault()), 0); - } + TreeMap<Instant, CollectedMetrics> metrics = new TreeMap<>(); + metrics.put(Instant.now(), new CollectedMetrics()); + stateStore.storeCollectedMetrics(context, metrics); + assertFalse(stateStore.getCollectedMetrics(context).isEmpty()); - private void getInstantScalingSummaryTreeMap( - SortedMap<Instant, ScalingSummary> scalingHistoryData, - Clock clock, - int expectedScalingHistorySize) - throws Exception { - stateStore = new TestingAutoscalerStateStore<>(); var autoscaler = new JobAutoScalerImpl<>( null, null, null, eventCollector, scalingRealizer, stateStore); - - enrichStateStore(scalingHistoryData); - stateStore.flush(context); - assertThat(stateStore.getFlushCount()).isEqualTo(1); - - autoscaler.setClock(clock); autoscaler.scale(context); - assertThat(stateStore.getParallelismOverrides(context)).isEmpty(); - assertThat(stateStore.getCollectedMetrics(context)).isEmpty(); - - if (expectedScalingHistorySize > 0) { - Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory = - stateStore.getScalingHistory(context); - assertThat(scalingHistory).isNotEmpty(); - assertThat(scalingHistory.values()) - .allMatch(aa -> aa.size() == expectedScalingHistorySize); - } else { - assertThat(stateStore.getScalingHistory(context)).isEmpty(); - } - assertThat(stateStore.getFlushCount()).isEqualTo(2); - } - - private void enrichStateStore(SortedMap<Instant, ScalingSummary> scalingHistory) { - var v1 = new 
JobVertexID(); - var v2 = new JobVertexID(); - stateStore.storeParallelismOverrides( - context, Map.of(v1.toString(), "1", v2.toString(), "2")); - - var metricHistory = new TreeMap<Instant, CollectedMetrics>(); - stateStore.storeCollectedMetrics(context, metricHistory); - stateStore.storeScalingHistory(context, Map.of(v1, scalingHistory, v2, scalingHistory)); + assertTrue(stateStore.getScalingHistory(context).isEmpty()); + assertTrue(stateStore.getScalingHistory(context).isEmpty()); + assertTrue(stateStore.getParallelismOverrides(context).isEmpty()); } private void assertParallelismOverrides(Map<String, String> expectedOverrides) { diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java index 1884268e..87025b93 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java @@ -25,7 +25,8 @@ import org.apache.flink.autoscaler.metrics.CollectedMetrics; import org.apache.flink.autoscaler.metrics.FlinkMetric; import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.autoscaler.realizer.TestingScalingRealizer; -import org.apache.flink.autoscaler.state.TestingAutoscalerStateStore; +import org.apache.flink.autoscaler.state.AutoScalerStateStore; +import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore; import org.apache.flink.autoscaler.topology.JobTopology; import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -55,7 +56,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class RecommendedParallelismTest { private JobAutoScalerContext<JobID> context; - private TestingAutoscalerStateStore<JobID, JobAutoScalerContext<JobID>> stateStore; + private AutoScalerStateStore<JobID, 
JobAutoScalerContext<JobID>> stateStore; private TestingMetricsCollector<JobID, JobAutoScalerContext<JobID>> metricsCollector; private ScalingExecutor<JobID, JobAutoScalerContext<JobID>> scalingExecutor; @@ -70,7 +71,7 @@ public class RecommendedParallelismTest { TestingEventCollector<JobID, JobAutoScalerContext<JobID>> eventCollector = new TestingEventCollector<>(); - stateStore = new TestingAutoscalerStateStore<>(); + stateStore = new InMemoryAutoScalerStateStore<>(); scalingExecutor = new ScalingExecutor<>(eventCollector, stateStore); @@ -227,7 +228,7 @@ public class RecommendedParallelismTest { assertEquals(4, scaledParallelism.get(sink)); } - private void assertEvaluatedMetricsSize(int expectedSize) { + private void assertEvaluatedMetricsSize(int expectedSize) throws Exception { SortedMap<Instant, CollectedMetrics> evaluatedMetrics = stateStore.getCollectedMetrics(context); assertThat(evaluatedMetrics).hasSize(expectedSize); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/TestingAutoscalerStateStore.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/TestingAutoscalerStateStore.java deleted file mode 100644 index 3b3fafa1..00000000 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/TestingAutoscalerStateStore.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.autoscaler.state; - -import org.apache.flink.autoscaler.JobAutoScalerContext; - -/** Testing {@link AutoScalerStateStore} implementation. */ -public class TestingAutoscalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>> - extends InMemoryAutoScalerStateStore<KEY, Context> { - - private int flushCount; - - @Override - public void flush(Context jobContext) { - super.flush(jobContext); - flushCount++; - } - - public int getFlushCount() { - return flushCount; - } -} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java index 97e70267..dea6df0b 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java @@ -22,6 +22,8 @@ import org.apache.flink.autoscaler.JobAutoScalerImpl; import org.apache.flink.autoscaler.RestApiMetricsCollector; import org.apache.flink.autoscaler.ScalingExecutor; import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.kubernetes.operator.autoscaler.state.ConfigMapStore; +import org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import io.fabric8.kubernetes.client.KubernetesClient; diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStore.java similarity index 52% rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java rename to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStore.java index 5ded9719..b840a292 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStore.java @@ -15,9 +15,10 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler; +package org.apache.flink.kubernetes.operator.autoscaler.state; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext; import org.apache.flink.kubernetes.utils.Constants; import io.fabric8.kubernetes.api.model.ConfigMap; @@ -28,8 +29,6 @@ import io.javaoperatorsdk.operator.processing.event.ResourceID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; - import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -44,13 +43,12 @@ public class ConfigMapStore { private final KubernetesClient kubernetesClient; - // The cache for each resourceId may be in three states: - // 1. The resourceId doesn't exist : ConfigMap isn't loaded from kubernetes, or it's deleted - // 2 Exists, Optional.empty() : The ConfigMap doesn't exist in Kubernetes - // 3. 
Exists, Not Empty : We have loaded the ConfigMap from kubernetes, it may not be the same - // if not flushed already - private final ConcurrentHashMap<ResourceID, Optional<ConfigMap>> cache = - new ConcurrentHashMap<>(); + /** + * Cache for Kubernetes ConfigMaps which reflects the latest state of a ConfigMap for a + * ResourceId. Any changes to the ConfigMap are only reflected in Kubernetes once the flush() + * method is called. + */ + private final ConcurrentHashMap<ResourceID, ConfigMapView> cache = new ConcurrentHashMap<>(); public ConfigMapStore(KubernetesClient kubernetesClient) { this.kubernetesClient = kubernetesClient; @@ -58,34 +56,30 @@ public class ConfigMapStore { protected void putSerializedState( KubernetesJobAutoScalerContext jobContext, String key, String value) { - getOrCreateState(jobContext).put(key, value); + getConfigMap(jobContext).put(key, value); } protected Optional<String> getSerializedState( KubernetesJobAutoScalerContext jobContext, String key) { - return getConfigMap(jobContext).map(configMap -> configMap.getData().get(key)); + return Optional.ofNullable(getConfigMap(jobContext).get(key)); } protected void removeSerializedState(KubernetesJobAutoScalerContext jobContext, String key) { - getConfigMap(jobContext) - .ifPresentOrElse( - configMap -> configMap.getData().remove(key), - () -> { - throw new IllegalStateException( - "The configMap isn't created, so the remove is unavailable."); - }); + getConfigMap(jobContext).removeKey(key); + } + + public void clearAll(KubernetesJobAutoScalerContext jobContext) { + getConfigMap(jobContext).clear(); } public void flush(KubernetesJobAutoScalerContext jobContext) { - Optional<ConfigMap> configMapOpt = cache.get(jobContext.getJobKey()); - if (configMapOpt == null || configMapOpt.isEmpty()) { - LOG.debug("The configMap isn't updated, so skip the flush."); + ConfigMapView configMapView = cache.get(jobContext.getJobKey()); + if (configMapView == null) { + LOG.debug("The configMap doesn't exist, so 
skip the flush."); return; } try { - cache.put( - jobContext.getJobKey(), - Optional.of(kubernetesClient.resource(configMapOpt.get()).update())); + configMapView.flush(); } catch (Exception e) { LOG.error( "Error while updating autoscaler info configmap, invalidating to clear the cache", @@ -99,49 +93,17 @@ public class ConfigMapStore { cache.remove(resourceID); } - private Optional<ConfigMap> getConfigMap(KubernetesJobAutoScalerContext jobContext) { + private ConfigMapView getConfigMap(KubernetesJobAutoScalerContext jobContext) { return cache.computeIfAbsent( jobContext.getJobKey(), (id) -> getConfigMapFromKubernetes(jobContext)); } - private Map<String, String> getOrCreateState(KubernetesJobAutoScalerContext jobContext) { - return cache.compute( - jobContext.getJobKey(), - (id, configMapOpt) -> { - // If in the cache and valid simply return - if (configMapOpt != null && configMapOpt.isPresent()) { - return configMapOpt; - } - // Otherwise get or create - return Optional.of(getOrCreateConfigMapFromKubernetes(jobContext)); - }) - .get() - .getData(); - } - @VisibleForTesting - protected Optional<ConfigMap> getConfigMapFromKubernetes( - KubernetesJobAutoScalerContext jobContext) { + ConfigMapView getConfigMapFromKubernetes(KubernetesJobAutoScalerContext jobContext) { HasMetadata cr = jobContext.getResource(); var meta = createCmObjectMeta(ResourceID.fromResource(cr)); - return getScalingInfoConfigMap(meta); - } - - @Nonnull - private ConfigMap getOrCreateConfigMapFromKubernetes( - KubernetesJobAutoScalerContext jobContext) { - HasMetadata cr = jobContext.getResource(); - var meta = createCmObjectMeta(ResourceID.fromResource(cr)); - return getScalingInfoConfigMap(meta).orElseGet(() -> createConfigMap(cr, meta)); - } - - private ConfigMap createConfigMap(HasMetadata cr, ObjectMeta meta) { - LOG.info("Creating scaling info config map"); - var cm = new ConfigMap(); - cm.setMetadata(meta); - cm.addOwnerReference(cr); - cm.setData(new HashMap<>()); - return 
kubernetesClient.resource(cm).create(); + var configMapSkeleton = buildConfigMap(cr, meta); + return new ConfigMapView(configMapSkeleton, kubernetesClient::resource); } private ObjectMeta createCmObjectMeta(ResourceID uid) { @@ -157,17 +119,16 @@ public class ConfigMapStore { return objectMeta; } - private Optional<ConfigMap> getScalingInfoConfigMap(ObjectMeta objectMeta) { - return Optional.ofNullable( - kubernetesClient - .configMaps() - .inNamespace(objectMeta.getNamespace()) - .withName(objectMeta.getName()) - .get()); + private ConfigMap buildConfigMap(HasMetadata cr, ObjectMeta meta) { + var cm = new ConfigMap(); + cm.setMetadata(meta); + cm.addOwnerReference(cr); + cm.setData(new HashMap<>()); + return cm; } @VisibleForTesting - protected ConcurrentHashMap<ResourceID, Optional<ConfigMap>> getCache() { + protected ConcurrentHashMap<ResourceID, ConfigMapView> getCache() { return cache; } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapView.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapView.java new file mode 100644 index 00000000..8a6a37c6 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapView.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.autoscaler.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.Preconditions; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.client.dsl.Resource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Map; +import java.util.function.Function; + +class ConfigMapView { + + private static final Logger LOG = LoggerFactory.getLogger(ConfigMapView.class); + + enum State { + /** ConfigMap is only stored locally, not created in Kubernetes yet. */ + NEEDS_CREATE, + /** ConfigMap exists in Kubernetes but there are newer local changes. */ + NEEDS_UPDATE, + /** ConfigMap view reflects the actual contents of Kubernetes ConfigMap. 
*/ + UP_TO_DATE + } + + private State state; + + private ConfigMap configMap; + + private final Function<ConfigMap, Resource<ConfigMap>> resourceRetriever; + + public ConfigMapView( + ConfigMap configMapSkeleton, + Function<ConfigMap, Resource<ConfigMap>> resourceRetriever) { + var existingConfigMap = resourceRetriever.apply(configMapSkeleton).get(); + if (existingConfigMap != null) { + refreshConfigMap(existingConfigMap); + } else { + this.configMap = configMapSkeleton; + this.state = State.NEEDS_CREATE; + } + this.resourceRetriever = resourceRetriever; + } + + public String get(String key) { + return configMap.getData().get(key); + } + + public void put(String key, String value) { + configMap.getData().put(key, value); + requireUpdate(); + } + + public void removeKey(String key) { + var oldKey = configMap.getData().remove(key); + if (oldKey != null) { + requireUpdate(); + } + } + + public void clear() { + if (configMap.getData().isEmpty()) { + return; + } + configMap.getData().clear(); + requireUpdate(); + } + + public void flush() { + if (state == State.UP_TO_DATE) { + return; + } + Resource<ConfigMap> resource = resourceRetriever.apply(configMap); + if (state == State.NEEDS_UPDATE) { + refreshConfigMap(resource.update()); + } else if (state == State.NEEDS_CREATE) { + LOG.info("Creating config map {}", configMap.getMetadata().getName()); + refreshConfigMap(resource.create()); + } + } + + private void refreshConfigMap(ConfigMap configMap) { + Preconditions.checkNotNull(configMap); + this.configMap = configMap; + this.state = State.UP_TO_DATE; + } + + private void requireUpdate() { + if (state != State.NEEDS_CREATE) { + state = State.NEEDS_UPDATE; + } + } + + @VisibleForTesting + public Map<String, String> getDataReadOnly() { + return Collections.unmodifiableMap(configMap.getData()); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java similarity index 97% rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java rename to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java index 27e3083f..0e2ce1c6 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler; +package org.apache.flink.kubernetes.operator.autoscaler.state; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.autoscaler.ScalingSummary; @@ -23,6 +23,7 @@ import org.apache.flink.autoscaler.metrics.CollectedMetrics; import org.apache.flink.autoscaler.state.AutoScalerStateStore; import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule; import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext; import org.apache.flink.runtime.jobgraph.JobVertexID; import com.fasterxml.jackson.core.JacksonException; @@ -163,6 +164,11 @@ public class KubernetesAutoScalerStateStore configMapStore.removeSerializedState(jobContext, PARALLELISM_OVERRIDES_KEY); } + @Override + public void clearAll(KubernetesJobAutoScalerContext jobContext) { + configMapStore.clearAll(jobContext); + } + @Override public void flush(KubernetesJobAutoScalerContext jobContext) { trimHistoryToMaxCmSize(jobContext); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java index bed6a6ab..ccee852c 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java @@ -21,6 +21,8 @@ import org.apache.flink.autoscaler.ScalingSummary; import org.apache.flink.autoscaler.event.AutoScalerEventHandler; import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.kubernetes.operator.autoscaler.state.ConfigMapStore; +import org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore; import org.apache.flink.kubernetes.operator.utils.EventCollector; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStoreTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStoreTest.java similarity index 51% rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStoreTest.java rename to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStoreTest.java index 0e42799e..9c6b8fb2 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStoreTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStoreTest.java @@ -15,19 +15,20 @@ * limitations under the License. 
*/ -package org.apache.flink.kubernetes.operator.autoscaler; +package org.apache.flink.kubernetes.operator.autoscaler.state; +import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.mock.Whitebox; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.Optional; - import static org.apache.flink.kubernetes.operator.autoscaler.TestingKubernetesAutoscalerUtils.createContext; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -41,12 +42,21 @@ public class ConfigMapStoreTest { KubernetesMockServer mockWebServer; + ConfigMapStore configMapStore; + + KubernetesJobAutoScalerContext ctx; + + @BeforeEach + public void setup() { + configMapStore = new ConfigMapStore(kubernetesClient); + ctx = createContext("cr1", kubernetesClient); + } + @Test void testCaching() { KubernetesJobAutoScalerContext ctx1 = createContext("cr1", kubernetesClient); KubernetesJobAutoScalerContext ctx2 = createContext("cr2", kubernetesClient); - var configMapStore = new ConfigMapStore(kubernetesClient); assertEquals(0, mockWebServer.getRequestCount()); String key1 = "key1"; @@ -56,6 +66,7 @@ public class ConfigMapStoreTest { String key3 = "key3"; String value3 = "value3"; assertThat(configMapStore.getSerializedState(ctx1, key1)).isEmpty(); + // Retrieve configMap assertEquals(1, mockWebServer.getRequestCount()); // Further gets should not go to K8s @@ -63,18 +74,24 @@ public class ConfigMapStoreTest { assertThat(configMapStore.getSerializedState(ctx1, 
key3)).isEmpty(); assertEquals(1, mockWebServer.getRequestCount()); - assertThat(configMapStore.getConfigMapFromKubernetes(ctx1)).isEmpty(); + // Manually trigger retrieval from Kubernetes + assertThat(configMapStore.getConfigMapFromKubernetes(ctx1).getDataReadOnly()).isEmpty(); assertEquals(2, mockWebServer.getRequestCount()); + // Putting does not go to Kubernetes, unless flushing. configMapStore.putSerializedState(ctx1, key1, value1); - assertEquals(4, mockWebServer.getRequestCount()); + assertEquals(2, mockWebServer.getRequestCount()); // The put just update the data to cache, and shouldn't request kubernetes. configMapStore.putSerializedState(ctx1, key2, value2); - assertEquals(4, mockWebServer.getRequestCount()); + assertEquals(2, mockWebServer.getRequestCount()); - assertThat(configMapStore.getConfigMapFromKubernetes(ctx1)).isPresent(); - assertThat(configMapStore.getConfigMapFromKubernetes(ctx2)).isEmpty(); + // Flush! + configMapStore.flush(ctx1); + assertEquals(3, mockWebServer.getRequestCount()); + + assertThat(configMapStore.getConfigMapFromKubernetes(ctx1).getDataReadOnly()).isNotEmpty(); + assertThat(configMapStore.getConfigMapFromKubernetes(ctx2).getDataReadOnly()).isEmpty(); assertThat(configMapStore.getSerializedState(ctx1, key1)).isPresent(); assertThat(configMapStore.getSerializedState(ctx2, key1)).isEmpty(); @@ -97,21 +114,23 @@ public class ConfigMapStoreTest { void testErrorHandling() { KubernetesJobAutoScalerContext ctx = createContext("cr1", kubernetesClient); - var configMapStore = new ConfigMapStore(kubernetesClient); // Test for the invalid flush. 
configMapStore.flush(ctx); assertEquals(0, mockWebServer.getRequestCount()); configMapStore.putSerializedState(ctx, "a", "1"); - Optional<ConfigMap> configMapOpt = configMapStore.getCache().get(ctx.getJobKey()); - assertThat(configMapOpt).isPresent(); - assertEquals(2, mockWebServer.getRequestCount()); + ConfigMap configMap = + (ConfigMap) + Whitebox.getInternalState( + configMapStore.getCache().get(ctx.getJobKey()), "configMap"); + assertThat(configMap.getData()).isNotEmpty(); + assertEquals(1, mockWebServer.getRequestCount()); // Modify the autoscaler info in the background - var cm = ReconciliationUtils.clone(configMapOpt.get()); + var cm = ReconciliationUtils.clone(configMap); cm.getData().put("a", "2"); - kubernetesClient.resource(cm).update(); + kubernetesClient.resource(cm).create(); // Replace should throw an error due to the modification assertThrows(KubernetesClientException.class, () -> configMapStore.flush(ctx)); @@ -120,4 +139,81 @@ public class ConfigMapStoreTest { // Make sure we can get the new version assertThat(configMapStore.getSerializedState(ctx, "a")).contains("2"); } + + @Test + void testMinimalAmountOfFlushing() { + KubernetesJobAutoScalerContext ctx = createContext("cr1", kubernetesClient); + var key = "key"; + var value = "value"; + + configMapStore.getSerializedState(ctx, key); + assertEquals(1, mockWebServer.getRequestCount()); + + configMapStore.putSerializedState(ctx, key, value); + assertEquals(1, mockWebServer.getRequestCount()); + + configMapStore.flush(ctx); + assertEquals(2, mockWebServer.getRequestCount()); + + // Get from cache + assertThat(configMapStore.getSerializedState(ctx, key)).hasValue(value); + assertEquals(2, mockWebServer.getRequestCount()); + + configMapStore.removeSerializedState(ctx, key); + assertEquals(2, mockWebServer.getRequestCount()); + + configMapStore.flush(ctx); + assertEquals(3, mockWebServer.getRequestCount()); + + // Subsequent flushes do not trigger an API call + configMapStore.flush(ctx); + 
assertEquals(3, mockWebServer.getRequestCount()); + } + + @Test + public void testDiscardAllState() { + configMapStore.putSerializedState( + ctx, KubernetesAutoScalerStateStore.COLLECTED_METRICS_KEY, "state1"); + configMapStore.putSerializedState( + ctx, KubernetesAutoScalerStateStore.SCALING_HISTORY_KEY, "state2"); + configMapStore.putSerializedState( + ctx, KubernetesAutoScalerStateStore.PARALLELISM_OVERRIDES_KEY, "state3"); + + assertThat( + configMapStore.getSerializedState( + ctx, KubernetesAutoScalerStateStore.COLLECTED_METRICS_KEY)) + .isPresent(); + assertThat( + configMapStore.getSerializedState( + ctx, KubernetesAutoScalerStateStore.SCALING_HISTORY_KEY)) + .isPresent(); + assertThat( + configMapStore.getSerializedState( + ctx, KubernetesAutoScalerStateStore.PARALLELISM_OVERRIDES_KEY)) + .isPresent(); + + configMapStore.flush(ctx); + + configMapStore.clearAll(ctx); + + assertThat( + configMapStore.getSerializedState( + ctx, KubernetesAutoScalerStateStore.COLLECTED_METRICS_KEY)) + .isEmpty(); + assertThat( + configMapStore.getSerializedState( + ctx, KubernetesAutoScalerStateStore.SCALING_HISTORY_KEY)) + .isEmpty(); + assertThat( + configMapStore.getSerializedState( + ctx, KubernetesAutoScalerStateStore.PARALLELISM_OVERRIDES_KEY)) + .isEmpty(); + + // We haven't flushed the clear operation, ConfigMap in Kubernetes should not be empty + assertThat(configMapStore.getConfigMapFromKubernetes(ctx).getDataReadOnly()).isNotEmpty(); + + configMapStore.flush(ctx); + + assertThat(configMapStore.getConfigMapFromKubernetes(ctx).getDataReadOnly()).isEmpty(); + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStoreTest.java similarity index 87% rename from 
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java rename to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStoreTest.java index f4faaa55..561e5448 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStoreTest.java @@ -15,13 +15,14 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler; +package org.apache.flink.kubernetes.operator.autoscaler.state; import org.apache.flink.autoscaler.ScalingSummary; import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.autoscaler.metrics.CollectedMetrics; import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext; import org.apache.flink.runtime.jobgraph.JobVertexID; import io.fabric8.kubernetes.client.KubernetesClient; @@ -43,9 +44,9 @@ import java.util.TreeMap; import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore; import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory; import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.updateVertexList; -import static org.apache.flink.kubernetes.operator.autoscaler.KubernetesAutoScalerStateStore.serializeEvaluatedMetrics; -import static org.apache.flink.kubernetes.operator.autoscaler.KubernetesAutoScalerStateStore.serializeScalingHistory; import static org.apache.flink.kubernetes.operator.autoscaler.TestingKubernetesAutoscalerUtils.createContext; +import static 
org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore.serializeEvaluatedMetrics; +import static org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore.serializeScalingHistory; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -330,4 +331,40 @@ public class KubernetesAutoScalerStateStoreTest { ctx, KubernetesAutoScalerStateStore.SCALING_HISTORY_KEY)) .isEmpty(); } + + @Test + public void testDiscardAllState() { + stateStore.storeCollectedMetrics( + ctx, new TreeMap<>(Map.of(Instant.now(), new CollectedMetrics()))); + stateStore.storeScalingHistory( + ctx, + Map.of( + new JobVertexID(), + new TreeMap<>(Map.of(Instant.now(), new ScalingSummary())))); + stateStore.storeParallelismOverrides(ctx, Map.of(new JobVertexID().toHexString(), "23")); + + assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty(); + assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty(); + assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty(); + + stateStore.flush(ctx); + + assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty(); + assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty(); + assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty(); + + stateStore.clearAll(ctx); + + assertThat(stateStore.getCollectedMetrics(ctx)).isEmpty(); + assertThat(stateStore.getScalingHistory(ctx)).isEmpty(); + assertThat(stateStore.getParallelismOverrides(ctx)).isEmpty(); + + // We haven't flushed the clear operation, ConfigMap in Kubernetes should not be empty + assertThat(configMapStore.getConfigMapFromKubernetes(ctx).getDataReadOnly()).isNotEmpty(); + + stateStore.flush(ctx); + + // Contents should be removed from Kubernetes + assertThat(configMapStore.getConfigMapFromKubernetes(ctx).getDataReadOnly()).isEmpty(); + } }