This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a6d4ce7 [FLINK-33097] Cleanup use of Optional for new interfaces 
(#709)
6a6d4ce7 is described below

commit 6a6d4ce7609d35ccc994b1dd20afd2c8ef5835fe
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Mon Nov 13 05:29:55 2023 -0800

    [FLINK-33097] Cleanup use of Optional for new interfaces (#709)
    
    The new interfaces make use of Optional in many places where returning the
    object directly would result in less and more readable code.
---
 .../apache/flink/autoscaler/JobAutoScalerImpl.java | 21 +++++++--------
 .../flink/autoscaler/ScalingMetricCollector.java   |  3 +--
 .../autoscaler/metrics/ScalingHistoryUtils.java    |  6 +----
 .../autoscaler/state/AutoScalerStateStore.java     | 15 ++++++-----
 .../state/InMemoryAutoScalerStateStore.java        | 17 +++++++-----
 .../flink/autoscaler/BacklogBasedScalingTest.java  |  8 +++---
 .../flink/autoscaler/JobAutoScalerImplTest.java    | 21 +++++++--------
 .../MetricsCollectionAndEvaluationTest.java        | 12 ++++-----
 .../autoscaler/RecommendedParallelismTest.java     |  8 +++---
 .../flink/autoscaler/ScalingExecutorTest.java      | 15 ++++-------
 .../autoscaler/KubernetesAutoScalerStateStore.java | 31 ++++++++++------------
 .../KubernetesAutoScalerStateStoreTest.java        |  4 +--
 12 files changed, 74 insertions(+), 87 deletions(-)

diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index 663a8669..0e2afb55 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
 import java.time.Clock;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
@@ -123,21 +122,20 @@ public class JobAutoScalerImpl<KEY, Context extends 
JobAutoScalerContext<KEY>>
 
     private void clearStatesAfterAutoscalerDisabled(Context ctx) throws 
Exception {
         var needFlush = false;
-        var parallelismOverridesOpt = stateStore.getParallelismOverrides(ctx);
-        if (parallelismOverridesOpt.isPresent()) {
+        var parallelismOverrides = stateStore.getParallelismOverrides(ctx);
+        if (!parallelismOverrides.isEmpty()) {
             needFlush = true;
             stateStore.removeParallelismOverrides(ctx);
         }
 
-        var collectedMetricsOpt = stateStore.getCollectedMetrics(ctx);
-        if (collectedMetricsOpt.isPresent()) {
+        var collectedMetrics = stateStore.getCollectedMetrics(ctx);
+        if (!collectedMetrics.isEmpty()) {
             needFlush = true;
             stateStore.removeCollectedMetrics(ctx);
         }
 
-        var scalingHistoryOpt = stateStore.getScalingHistory(ctx);
-        if (scalingHistoryOpt.isPresent()) {
-            var scalingHistory = scalingHistoryOpt.get();
+        var scalingHistory = stateStore.getScalingHistory(ctx);
+        if (!scalingHistory.isEmpty()) {
             var trimmedScalingHistory =
                     trimScalingHistory(clock.instant(), 
ctx.getConfiguration(), scalingHistory);
             if (trimmedScalingHistory.isEmpty()) {
@@ -157,7 +155,7 @@ public class JobAutoScalerImpl<KEY, Context extends 
JobAutoScalerContext<KEY>>
     }
 
     @VisibleForTesting
-    protected Optional<Map<String, String>> getParallelismOverrides(Context 
ctx) throws Exception {
+    protected Map<String, String> getParallelismOverrides(Context ctx) throws 
Exception {
         return stateStore.getParallelismOverrides(ctx);
     }
 
@@ -169,11 +167,10 @@ public class JobAutoScalerImpl<KEY, Context extends 
JobAutoScalerContext<KEY>>
      */
     @VisibleForTesting
     protected void applyParallelismOverrides(Context ctx) throws Exception {
-        var overridesOpt = getParallelismOverrides(ctx);
-        if (overridesOpt.isEmpty() || overridesOpt.get().isEmpty()) {
+        var overrides = getParallelismOverrides(ctx);
+        if (overrides.isEmpty()) {
             return;
         }
-        Map<String, String> overrides = overridesOpt.get();
         LOG.debug("Applying parallelism overrides: {}", overrides);
 
         var conf = ctx.getConfiguration();
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
index 51f5931b..4e891f78 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
@@ -58,7 +58,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
-import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -94,7 +93,7 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
                         jobKey,
                         (k) -> {
                             try {
-                                return 
stateStore.getCollectedMetrics(ctx).orElse(new TreeMap<>());
+                                return stateStore.getCollectedMetrics(ctx);
                             } catch (Exception exception) {
                                 throw new RuntimeException(
                                         "Get evaluated metrics failed.", 
exception);
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java
index 91aedff4..97736d1d 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import javax.annotation.Nonnull;
 
 import java.time.Instant;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
@@ -86,10 +85,7 @@ public class ScalingHistoryUtils {
                     Instant now)
                     throws Exception {
         var conf = context.getConfiguration();
-        return autoScalerStateStore
-                .getScalingHistory(context)
-                .map(scalingHistory -> trimScalingHistory(now, conf, 
scalingHistory))
-                .orElse(new HashMap<>());
+        return trimScalingHistory(now, conf, 
autoScalerStateStore.getScalingHistory(context));
     }
 
     public static Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
trimScalingHistory(
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
index b0255506..bb3329a4 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
@@ -23,9 +23,10 @@ import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.metrics.CollectedMetrics;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
+import javax.annotation.Nonnull;
+
 import java.time.Instant;
 import java.util.Map;
-import java.util.Optional;
 import java.util.SortedMap;
 
 /**
@@ -41,23 +42,25 @@ public interface AutoScalerStateStore<KEY, Context extends 
JobAutoScalerContext<
             Context jobContext, Map<JobVertexID, SortedMap<Instant, 
ScalingSummary>> scalingHistory)
             throws Exception;
 
-    Optional<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>> 
getScalingHistory(
-            Context jobContext) throws Exception;
+    @Nonnull
+    Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
getScalingHistory(Context jobContext)
+            throws Exception;
 
     void removeScalingHistory(Context jobContext) throws Exception;
 
     void storeCollectedMetrics(Context jobContext, SortedMap<Instant, 
CollectedMetrics> metrics)
             throws Exception;
 
-    Optional<SortedMap<Instant, CollectedMetrics>> getCollectedMetrics(Context 
jobContext)
-            throws Exception;
+    @Nonnull
+    SortedMap<Instant, CollectedMetrics> getCollectedMetrics(Context 
jobContext) throws Exception;
 
     void removeCollectedMetrics(Context jobContext) throws Exception;
 
     void storeParallelismOverrides(Context jobContext, Map<String, String> 
parallelismOverrides)
             throws Exception;
 
-    Optional<Map<String, String>> getParallelismOverrides(Context jobContext) 
throws Exception;
+    @Nonnull
+    Map<String, String> getParallelismOverrides(Context jobContext) throws 
Exception;
 
     void removeParallelismOverrides(Context jobContext) throws Exception;
 
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
index 1cb74d44..29176365 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
@@ -23,9 +23,11 @@ import org.apache.flink.autoscaler.metrics.CollectedMetrics;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import java.time.Instant;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -58,9 +60,10 @@ public class InMemoryAutoScalerStateStore<KEY, Context 
extends JobAutoScalerCont
     }
 
     @Override
-    public Optional<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>> 
getScalingHistory(
+    public Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
getScalingHistory(
             Context jobContext) {
-        return 
Optional.ofNullable(scalingHistoryStore.get(jobContext.getJobKey()));
+        return 
Optional.ofNullable(scalingHistoryStore.get(jobContext.getJobKey()))
+                .orElse(new HashMap<>());
     }
 
     @Override
@@ -75,8 +78,9 @@ public class InMemoryAutoScalerStateStore<KEY, Context 
extends JobAutoScalerCont
     }
 
     @Override
-    public Optional<SortedMap<Instant, CollectedMetrics>> 
getCollectedMetrics(Context jobContext) {
-        return 
Optional.ofNullable(collectedMetricsStore.get(jobContext.getJobKey()));
+    public SortedMap<Instant, CollectedMetrics> getCollectedMetrics(Context 
jobContext) {
+        return 
Optional.ofNullable(collectedMetricsStore.get(jobContext.getJobKey()))
+                .orElse(new TreeMap<>());
     }
 
     @Override
@@ -91,8 +95,9 @@ public class InMemoryAutoScalerStateStore<KEY, Context 
extends JobAutoScalerCont
     }
 
     @Override
-    public Optional<Map<String, String>> getParallelismOverrides(Context 
jobContext) {
-        return 
Optional.ofNullable(parallelismOverridesStore.get(jobContext.getJobKey()));
+    public Map<String, String> getParallelismOverrides(Context jobContext) {
+        return 
Optional.ofNullable(parallelismOverridesStore.get(jobContext.getJobKey()))
+                .orElse(new HashMap<>());
     }
 
     @Override
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
index 3e5c6fa1..810bffe3 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
@@ -39,7 +39,6 @@ import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 
@@ -368,7 +367,7 @@ public class BacklogBasedScalingTest {
                                         "", Double.NaN, Double.NaN, 
Double.NaN, 500.))));
 
         autoscaler.scale(context);
-        assertFalse(stateStore.getCollectedMetrics(context).get().isEmpty());
+        assertFalse(stateStore.getCollectedMetrics(context).isEmpty());
     }
 
     @Test
@@ -397,10 +396,9 @@ public class BacklogBasedScalingTest {
     }
 
     private void assertEvaluatedMetricsSize(int expectedSize) {
-        Optional<SortedMap<Instant, CollectedMetrics>> evaluatedMetricsOpt =
+        SortedMap<Instant, CollectedMetrics> evaluatedMetrics =
                 stateStore.getCollectedMetrics(context);
-        assertThat(evaluatedMetricsOpt).isPresent();
-        assertThat(evaluatedMetricsOpt.get()).hasSize(expectedSize);
+        assertThat(evaluatedMetrics).hasSize(expectedSize);
     }
 
     private void setClocksTo(Instant time) {
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
index fd61ccdb..7f507dd4 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
@@ -51,7 +51,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -201,7 +200,7 @@ public class JobAutoScalerImplTest {
         autoscaler.applyParallelismOverrides(context);
         assertParallelismOverrides(Map.of(v1, "1", v2, "2"));
 
-        
assertThat(stateStore.getParallelismOverrides(context)).hasValue(Map.of(v1, 
"1", v2, "2"));
+        
assertThat(stateStore.getParallelismOverrides(context)).isEqualTo(Map.of(v1, 
"1", v2, "2"));
 
         // Disabling autoscaler should clear overrides
         context.getConfiguration().setString(AUTOSCALER_ENABLED.key(), 
"false");
@@ -225,13 +224,13 @@ public class JobAutoScalerImplTest {
         stateStore.flush(context);
         autoscaler.applyParallelismOverrides(context);
 
-        
assertThat(autoscaler.getParallelismOverrides(context)).hasValue(Map.of(v1, 
"1", v2, "2"));
+        
assertThat(autoscaler.getParallelismOverrides(context)).isEqualTo(Map.of(v1, 
"1", v2, "2"));
         assertParallelismOverrides(Map.of(v1, "1", v2, "2"));
 
         context.getConfiguration().setString(SCALING_ENABLED.key(), "false");
 
         autoscaler.applyParallelismOverrides(context);
-        
assertThat(autoscaler.getParallelismOverrides(context)).hasValue(Map.of(v1, 
"1", v2, "2"));
+        
assertThat(autoscaler.getParallelismOverrides(context)).isEqualTo(Map.of(v1, 
"1", v2, "2"));
         assertParallelismOverrides(Map.of(v1, "1", v2, "2"));
 
         // Test error handling
@@ -247,9 +246,9 @@ public class JobAutoScalerImplTest {
         var autoscaler =
                 new JobAutoScalerImpl<>(
                         null, null, null, eventCollector, scalingRealizer, 
stateStore) {
-                    public Optional<Map<String, String>> 
getParallelismOverrides(
+                    public Map<String, String> getParallelismOverrides(
                             JobAutoScalerContext<JobID> ctx) {
-                        return Optional.of(new HashMap<>(overrides));
+                        return new HashMap<>(overrides);
                     }
                 };
 
@@ -314,7 +313,7 @@ public class JobAutoScalerImplTest {
     }
 
     private void getInstantScalingSummaryTreeMap(
-            SortedMap<Instant, ScalingSummary> scalingHistory,
+            SortedMap<Instant, ScalingSummary> scalingHistoryData,
             Clock clock,
             int expectedScalingHistorySize)
             throws Exception {
@@ -323,7 +322,7 @@ public class JobAutoScalerImplTest {
                 new JobAutoScalerImpl<>(
                         null, null, null, eventCollector, scalingRealizer, 
stateStore);
 
-        enrichStateStore(scalingHistory);
+        enrichStateStore(scalingHistoryData);
         stateStore.flush(context);
         assertThat(stateStore.getFlushCount()).isEqualTo(1);
 
@@ -334,10 +333,10 @@ public class JobAutoScalerImplTest {
         assertThat(stateStore.getCollectedMetrics(context)).isEmpty();
 
         if (expectedScalingHistorySize > 0) {
-            Optional<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>> 
scalingHistoryOpt =
+            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory =
                     stateStore.getScalingHistory(context);
-            assertThat(scalingHistoryOpt).isPresent();
-            assertThat(scalingHistoryOpt.get().values())
+            assertThat(scalingHistory).isNotEmpty();
+            assertThat(scalingHistory.values())
                     .allMatch(aa -> aa.size() == expectedScalingHistorySize);
         } else {
             assertThat(stateStore.getScalingHistory(context)).isEmpty();
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index d216ddfa..bd8562cc 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -568,11 +568,11 @@ public class MetricsCollectionAndEvaluationTest {
         metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(50), 
ZoneId.systemDefault()));
         assertTrue(
                 metricsCollector.updateMetrics(context, 
stateStore).getMetricHistory().isEmpty());
-        assertEquals(1, stateStore.getCollectedMetrics(context).get().size());
+        assertEquals(1, stateStore.getCollectedMetrics(context).size());
         metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(60), 
ZoneId.systemDefault()));
         assertTrue(
                 metricsCollector.updateMetrics(context, 
stateStore).getMetricHistory().isEmpty());
-        assertEquals(2, stateStore.getCollectedMetrics(context).get().size());
+        assertEquals(2, stateStore.getCollectedMetrics(context).size());
 
         testTolerateMetricsMissingDuringStabilizationPhase(topology);
 
@@ -580,18 +580,18 @@ public class MetricsCollectionAndEvaluationTest {
         metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(150), 
ZoneId.systemDefault()));
         assertEquals(
                 3, metricsCollector.updateMetrics(context, 
stateStore).getMetricHistory().size());
-        assertEquals(3, stateStore.getCollectedMetrics(context).get().size());
+        assertEquals(3, stateStore.getCollectedMetrics(context).size());
 
         metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(180), 
ZoneId.systemDefault()));
         assertEquals(
                 4, metricsCollector.updateMetrics(context, 
stateStore).getMetricHistory().size());
-        assertEquals(4, stateStore.getCollectedMetrics(context).get().size());
+        assertEquals(4, stateStore.getCollectedMetrics(context).size());
 
         // Once we reach full time we trim the stabilization metrics
         metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(260), 
ZoneId.systemDefault()));
         assertEquals(
                 2, metricsCollector.updateMetrics(context, 
stateStore).getMetricHistory().size());
-        assertEquals(2, stateStore.getCollectedMetrics(context).get().size());
+        assertEquals(2, stateStore.getCollectedMetrics(context).size());
     }
 
     private void 
testTolerateMetricsMissingDuringStabilizationPhase(JobTopology topology) {
@@ -610,7 +610,7 @@ public class MetricsCollectionAndEvaluationTest {
         collectorWithMissingMetrics.setJobUpdateTs(startTime);
 
         Supplier<Integer> numCollectedMetricsSupplier =
-                () -> stateStore.getCollectedMetrics(context).get().size();
+                () -> stateStore.getCollectedMetrics(context).size();
 
         int numCollectedMetricsBeforeTest = numCollectedMetricsSupplier.get();
         assertThrows(
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
index c4b714ef..1884268e 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
@@ -39,7 +39,6 @@ import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 
@@ -205,7 +204,7 @@ public class RecommendedParallelismTest {
 
         // after restart while the job is not running the evaluated metrics 
are gone
         autoscaler.scale(context);
-        assertEquals(3, stateStore.getCollectedMetrics(context).get().size());
+        assertEquals(3, stateStore.getCollectedMetrics(context).size());
         assertNull(autoscaler.lastEvaluatedMetrics.get(context.getJobKey()));
         scaledParallelism = 
ScalingExecutorTest.getScaledParallelism(stateStore, context);
         assertEquals(4, scaledParallelism.get(source));
@@ -229,10 +228,9 @@ public class RecommendedParallelismTest {
     }
 
     private void assertEvaluatedMetricsSize(int expectedSize) {
-        Optional<SortedMap<Instant, CollectedMetrics>> evaluatedMetricsOpt =
+        SortedMap<Instant, CollectedMetrics> evaluatedMetrics =
                 stateStore.getCollectedMetrics(context);
-        assertThat(evaluatedMetricsOpt).isPresent();
-        assertThat(evaluatedMetricsOpt.get()).hasSize(expectedSize);
+        assertThat(evaluatedMetrics).hasSize(expectedSize);
     }
 
     private Double getCurrentMetricValue(JobVertexID jobVertexID, 
ScalingMetric scalingMetric) {
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index bae6f36b..3265ba25 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -247,15 +247,10 @@ public class ScalingExecutorTest {
             Map<JobVertexID, Integer> getScaledParallelism(
                     AutoScalerStateStore<KEY, Context> stateStore, Context 
context)
                     throws Exception {
-        return stateStore
-                .getParallelismOverrides(context)
-                .map(
-                        overrides ->
-                                overrides.entrySet().stream()
-                                        .collect(
-                                                Collectors.toMap(
-                                                        e -> 
JobVertexID.fromHexString(e.getKey()),
-                                                        e -> 
Integer.valueOf(e.getValue()))))
-                .orElse(Map.of());
+        return stateStore.getParallelismOverrides(context).entrySet().stream()
+                .collect(
+                        Collectors.toMap(
+                                e -> JobVertexID.fromHexString(e.getKey()),
+                                e -> Integer.valueOf(e.getValue())));
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java
index 5f45f368..27e3083f 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java
@@ -43,9 +43,11 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.time.Instant;
 import java.util.Base64;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -84,21 +86,21 @@ public class KubernetesAutoScalerStateStore
     }
 
     @Override
-    public Optional<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>> 
getScalingHistory(
+    public Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
getScalingHistory(
             KubernetesJobAutoScalerContext jobContext) {
         Optional<String> serializedScalingHistory =
                 configMapStore.getSerializedState(jobContext, 
SCALING_HISTORY_KEY);
         if (serializedScalingHistory.isEmpty()) {
-            return Optional.empty();
+            return new HashMap<>();
         }
         try {
-            return 
Optional.of(deserializeScalingHistory(serializedScalingHistory.get()));
+            return deserializeScalingHistory(serializedScalingHistory.get());
         } catch (JacksonException e) {
             LOG.error(
                     "Could not deserialize scaling history, possibly the 
format changed. Discarding...",
                     e);
             configMapStore.removeSerializedState(jobContext, 
SCALING_HISTORY_KEY);
-            return Optional.empty();
+            return new HashMap<>();
         }
     }
 
@@ -116,21 +118,21 @@ public class KubernetesAutoScalerStateStore
     }
 
     @Override
-    public Optional<SortedMap<Instant, CollectedMetrics>> getCollectedMetrics(
+    public SortedMap<Instant, CollectedMetrics> getCollectedMetrics(
             KubernetesJobAutoScalerContext jobContext) {
         Optional<String> serializedEvaluatedMetricsOpt =
                 configMapStore.getSerializedState(jobContext, 
COLLECTED_METRICS_KEY);
         if (serializedEvaluatedMetricsOpt.isEmpty()) {
-            return Optional.empty();
+            return new TreeMap<>();
         }
         try {
-            return 
Optional.of(deserializeEvaluatedMetrics(serializedEvaluatedMetricsOpt.get()));
+            return 
deserializeEvaluatedMetrics(serializedEvaluatedMetricsOpt.get());
         } catch (JacksonException e) {
             LOG.error(
                     "Could not deserialize metric history, possibly the format 
changed. Discarding...",
                     e);
             configMapStore.removeSerializedState(jobContext, 
COLLECTED_METRICS_KEY);
-            return Optional.empty();
+            return new TreeMap<>();
         }
     }
 
@@ -149,11 +151,11 @@ public class KubernetesAutoScalerStateStore
     }
 
     @Override
-    public Optional<Map<String, String>> getParallelismOverrides(
-            KubernetesJobAutoScalerContext jobContext) {
+    public Map<String, String> 
getParallelismOverrides(KubernetesJobAutoScalerContext jobContext) {
         return configMapStore
                 .getSerializedState(jobContext, PARALLELISM_OVERRIDES_KEY)
-                
.map(KubernetesAutoScalerStateStore::deserializeParallelismOverrides);
+                
.map(KubernetesAutoScalerStateStore::deserializeParallelismOverrides)
+                .orElse(new HashMap<>());
     }
 
     @Override
@@ -216,12 +218,7 @@ public class KubernetesAutoScalerStateStore
                         .map(String::length)
                         .orElse(0);
 
-        Optional<SortedMap<Instant, CollectedMetrics>> evaluatedMetricsOpt =
-                getCollectedMetrics(context);
-        if (evaluatedMetricsOpt.isEmpty()) {
-            return;
-        }
-        SortedMap<Instant, CollectedMetrics> metricHistory = 
evaluatedMetricsOpt.get();
+        SortedMap<Instant, CollectedMetrics> metricHistory = 
getCollectedMetrics(context);
         while (scalingHistorySize + metricHistorySize > MAX_CM_BYTES) {
             if (metricHistory.isEmpty()) {
                 return;
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java
index 21f0c7c3..f4faaa55 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java
@@ -218,7 +218,7 @@ public class KubernetesAutoScalerStateStoreTest {
         var now = Instant.now();
 
         Assertions.assertEquals(scalingHistory, 
getTrimmedScalingHistory(stateStore, ctx, now));
-        
assertThat(stateStore.getCollectedMetrics(ctx)).hasValue(metricHistory);
+        
assertThat(stateStore.getCollectedMetrics(ctx)).isEqualTo(metricHistory);
 
         // Override with compressed data
         var newTs = Instant.now();
@@ -228,7 +228,7 @@ public class KubernetesAutoScalerStateStoreTest {
 
         // Make sure we can still access everything
         Assertions.assertEquals(scalingHistory, 
getTrimmedScalingHistory(stateStore, ctx, newTs));
-        
assertThat(stateStore.getCollectedMetrics(ctx)).hasValue(metricHistory);
+        
assertThat(stateStore.getCollectedMetrics(ctx)).isEqualTo(metricHistory);
     }
 
     @Test

Reply via email to