This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 6a6d4ce7 [FLINK-33097] Cleanup use of Optional for new interfaces (#709) 6a6d4ce7 is described below commit 6a6d4ce7609d35ccc994b1dd20afd2c8ef5835fe Author: Maximilian Michels <m...@apache.org> AuthorDate: Mon Nov 13 05:29:55 2023 -0800 [FLINK-33097] Cleanup use of Optional for new interfaces (#709) The new interfaces make use of Optional in many places where returning the object directly would result in shorter and more readable code. --- .../apache/flink/autoscaler/JobAutoScalerImpl.java | 21 +++++++-------- .../flink/autoscaler/ScalingMetricCollector.java | 3 +-- .../autoscaler/metrics/ScalingHistoryUtils.java | 6 +---- .../autoscaler/state/AutoScalerStateStore.java | 15 ++++++----- .../state/InMemoryAutoScalerStateStore.java | 17 +++++++----- .../flink/autoscaler/BacklogBasedScalingTest.java | 8 +++--- .../flink/autoscaler/JobAutoScalerImplTest.java | 21 +++++++-------- .../MetricsCollectionAndEvaluationTest.java | 12 ++++----- .../autoscaler/RecommendedParallelismTest.java | 8 +++--- .../flink/autoscaler/ScalingExecutorTest.java | 15 ++++------- .../autoscaler/KubernetesAutoScalerStateStore.java | 31 ++++++++++------------ .../KubernetesAutoScalerStateStoreTest.java | 4 +-- 12 files changed, 74 insertions(+), 87 deletions(-) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java index 663a8669..0e2afb55 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java @@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory; import java.time.Clock; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; @@ 
-123,21 +122,20 @@ public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>> private void clearStatesAfterAutoscalerDisabled(Context ctx) throws Exception { var needFlush = false; - var parallelismOverridesOpt = stateStore.getParallelismOverrides(ctx); - if (parallelismOverridesOpt.isPresent()) { + var parallelismOverrides = stateStore.getParallelismOverrides(ctx); + if (!parallelismOverrides.isEmpty()) { needFlush = true; stateStore.removeParallelismOverrides(ctx); } - var collectedMetricsOpt = stateStore.getCollectedMetrics(ctx); - if (collectedMetricsOpt.isPresent()) { + var collectedMetrics = stateStore.getCollectedMetrics(ctx); + if (!collectedMetrics.isEmpty()) { needFlush = true; stateStore.removeCollectedMetrics(ctx); } - var scalingHistoryOpt = stateStore.getScalingHistory(ctx); - if (scalingHistoryOpt.isPresent()) { - var scalingHistory = scalingHistoryOpt.get(); + var scalingHistory = stateStore.getScalingHistory(ctx); + if (!scalingHistory.isEmpty()) { var trimmedScalingHistory = trimScalingHistory(clock.instant(), ctx.getConfiguration(), scalingHistory); if (trimmedScalingHistory.isEmpty()) { @@ -157,7 +155,7 @@ public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>> } @VisibleForTesting - protected Optional<Map<String, String>> getParallelismOverrides(Context ctx) throws Exception { + protected Map<String, String> getParallelismOverrides(Context ctx) throws Exception { return stateStore.getParallelismOverrides(ctx); } @@ -169,11 +167,10 @@ public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>> */ @VisibleForTesting protected void applyParallelismOverrides(Context ctx) throws Exception { - var overridesOpt = getParallelismOverrides(ctx); - if (overridesOpt.isEmpty() || overridesOpt.get().isEmpty()) { + var overrides = getParallelismOverrides(ctx); + if (overrides.isEmpty()) { return; } - Map<String, String> overrides = overridesOpt.get(); LOG.debug("Applying parallelism overrides: {}", 
overrides); var conf = ctx.getConfiguration(); diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java index 51f5931b..4e891f78 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java @@ -58,7 +58,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.SortedMap; -import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -94,7 +93,7 @@ public abstract class ScalingMetricCollector<KEY, Context extends JobAutoScalerC jobKey, (k) -> { try { - return stateStore.getCollectedMetrics(ctx).orElse(new TreeMap<>()); + return stateStore.getCollectedMetrics(ctx); } catch (Exception exception) { throw new RuntimeException( "Get evaluated metrics failed.", exception); diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java index 91aedff4..97736d1d 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java @@ -27,7 +27,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import javax.annotation.Nonnull; import java.time.Instant; -import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.SortedMap; @@ -86,10 +85,7 @@ public class ScalingHistoryUtils { Instant now) throws Exception { var conf = context.getConfiguration(); - return autoScalerStateStore - .getScalingHistory(context) - .map(scalingHistory -> trimScalingHistory(now, conf, scalingHistory)) - .orElse(new 
HashMap<>()); + return trimScalingHistory(now, conf, autoScalerStateStore.getScalingHistory(context)); } public static Map<JobVertexID, SortedMap<Instant, ScalingSummary>> trimScalingHistory( diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java index b0255506..bb3329a4 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java @@ -23,9 +23,10 @@ import org.apache.flink.autoscaler.ScalingSummary; import org.apache.flink.autoscaler.metrics.CollectedMetrics; import org.apache.flink.runtime.jobgraph.JobVertexID; +import javax.annotation.Nonnull; + import java.time.Instant; import java.util.Map; -import java.util.Optional; import java.util.SortedMap; /** @@ -41,23 +42,25 @@ public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext< Context jobContext, Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) throws Exception; - Optional<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>> getScalingHistory( - Context jobContext) throws Exception; + @Nonnull + Map<JobVertexID, SortedMap<Instant, ScalingSummary>> getScalingHistory(Context jobContext) + throws Exception; void removeScalingHistory(Context jobContext) throws Exception; void storeCollectedMetrics(Context jobContext, SortedMap<Instant, CollectedMetrics> metrics) throws Exception; - Optional<SortedMap<Instant, CollectedMetrics>> getCollectedMetrics(Context jobContext) - throws Exception; + @Nonnull + SortedMap<Instant, CollectedMetrics> getCollectedMetrics(Context jobContext) throws Exception; void removeCollectedMetrics(Context jobContext) throws Exception; void storeParallelismOverrides(Context jobContext, Map<String, String> parallelismOverrides) throws Exception; - Optional<Map<String, 
String>> getParallelismOverrides(Context jobContext) throws Exception; + @Nonnull + Map<String, String> getParallelismOverrides(Context jobContext) throws Exception; void removeParallelismOverrides(Context jobContext) throws Exception; diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java index 1cb74d44..29176365 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java @@ -23,9 +23,11 @@ import org.apache.flink.autoscaler.metrics.CollectedMetrics; import org.apache.flink.runtime.jobgraph.JobVertexID; import java.time.Instant; +import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; /** @@ -58,9 +60,10 @@ public class InMemoryAutoScalerStateStore<KEY, Context extends JobAutoScalerCont } @Override - public Optional<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>> getScalingHistory( + public Map<JobVertexID, SortedMap<Instant, ScalingSummary>> getScalingHistory( Context jobContext) { - return Optional.ofNullable(scalingHistoryStore.get(jobContext.getJobKey())); + return Optional.ofNullable(scalingHistoryStore.get(jobContext.getJobKey())) + .orElse(new HashMap<>()); } @Override @@ -75,8 +78,9 @@ public class InMemoryAutoScalerStateStore<KEY, Context extends JobAutoScalerCont } @Override - public Optional<SortedMap<Instant, CollectedMetrics>> getCollectedMetrics(Context jobContext) { - return Optional.ofNullable(collectedMetricsStore.get(jobContext.getJobKey())); + public SortedMap<Instant, CollectedMetrics> getCollectedMetrics(Context jobContext) { + return Optional.ofNullable(collectedMetricsStore.get(jobContext.getJobKey())) + 
.orElse(new TreeMap<>()); } @Override @@ -91,8 +95,9 @@ public class InMemoryAutoScalerStateStore<KEY, Context extends JobAutoScalerCont } @Override - public Optional<Map<String, String>> getParallelismOverrides(Context jobContext) { - return Optional.ofNullable(parallelismOverridesStore.get(jobContext.getJobKey())); + public Map<String, String> getParallelismOverrides(Context jobContext) { + return Optional.ofNullable(parallelismOverridesStore.get(jobContext.getJobKey())) + .orElse(new HashMap<>()); } @Override diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java index 3e5c6fa1..810bffe3 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java @@ -39,7 +39,6 @@ import java.time.Duration; import java.time.Instant; import java.time.ZoneId; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.SortedMap; @@ -368,7 +367,7 @@ public class BacklogBasedScalingTest { "", Double.NaN, Double.NaN, Double.NaN, 500.)))); autoscaler.scale(context); - assertFalse(stateStore.getCollectedMetrics(context).get().isEmpty()); + assertFalse(stateStore.getCollectedMetrics(context).isEmpty()); } @Test @@ -397,10 +396,9 @@ public class BacklogBasedScalingTest { } private void assertEvaluatedMetricsSize(int expectedSize) { - Optional<SortedMap<Instant, CollectedMetrics>> evaluatedMetricsOpt = + SortedMap<Instant, CollectedMetrics> evaluatedMetrics = stateStore.getCollectedMetrics(context); - assertThat(evaluatedMetricsOpt).isPresent(); - assertThat(evaluatedMetricsOpt.get()).hasSize(expectedSize); + assertThat(evaluatedMetrics).hasSize(expectedSize); } private void setClocksTo(Instant time) { diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java index fd61ccdb..7f507dd4 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java @@ -51,7 +51,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -201,7 +200,7 @@ public class JobAutoScalerImplTest { autoscaler.applyParallelismOverrides(context); assertParallelismOverrides(Map.of(v1, "1", v2, "2")); - assertThat(stateStore.getParallelismOverrides(context)).hasValue(Map.of(v1, "1", v2, "2")); + assertThat(stateStore.getParallelismOverrides(context)).isEqualTo(Map.of(v1, "1", v2, "2")); // Disabling autoscaler should clear overrides context.getConfiguration().setString(AUTOSCALER_ENABLED.key(), "false"); @@ -225,13 +224,13 @@ public class JobAutoScalerImplTest { stateStore.flush(context); autoscaler.applyParallelismOverrides(context); - assertThat(autoscaler.getParallelismOverrides(context)).hasValue(Map.of(v1, "1", v2, "2")); + assertThat(autoscaler.getParallelismOverrides(context)).isEqualTo(Map.of(v1, "1", v2, "2")); assertParallelismOverrides(Map.of(v1, "1", v2, "2")); context.getConfiguration().setString(SCALING_ENABLED.key(), "false"); autoscaler.applyParallelismOverrides(context); - assertThat(autoscaler.getParallelismOverrides(context)).hasValue(Map.of(v1, "1", v2, "2")); + assertThat(autoscaler.getParallelismOverrides(context)).isEqualTo(Map.of(v1, "1", v2, "2")); assertParallelismOverrides(Map.of(v1, "1", v2, "2")); // Test error handling @@ -247,9 +246,9 @@ public class JobAutoScalerImplTest { var autoscaler = new JobAutoScalerImpl<>( null, null, null, eventCollector, scalingRealizer, 
stateStore) { - public Optional<Map<String, String>> getParallelismOverrides( + public Map<String, String> getParallelismOverrides( JobAutoScalerContext<JobID> ctx) { - return Optional.of(new HashMap<>(overrides)); + return new HashMap<>(overrides); } }; @@ -314,7 +313,7 @@ public class JobAutoScalerImplTest { } private void getInstantScalingSummaryTreeMap( - SortedMap<Instant, ScalingSummary> scalingHistory, + SortedMap<Instant, ScalingSummary> scalingHistoryData, Clock clock, int expectedScalingHistorySize) throws Exception { @@ -323,7 +322,7 @@ public class JobAutoScalerImplTest { new JobAutoScalerImpl<>( null, null, null, eventCollector, scalingRealizer, stateStore); - enrichStateStore(scalingHistory); + enrichStateStore(scalingHistoryData); stateStore.flush(context); assertThat(stateStore.getFlushCount()).isEqualTo(1); @@ -334,10 +333,10 @@ public class JobAutoScalerImplTest { assertThat(stateStore.getCollectedMetrics(context)).isEmpty(); if (expectedScalingHistorySize > 0) { - Optional<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>> scalingHistoryOpt = + Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory = stateStore.getScalingHistory(context); - assertThat(scalingHistoryOpt).isPresent(); - assertThat(scalingHistoryOpt.get().values()) + assertThat(scalingHistory).isNotEmpty(); + assertThat(scalingHistory.values()) .allMatch(aa -> aa.size() == expectedScalingHistorySize); } else { assertThat(stateStore.getScalingHistory(context)).isEmpty(); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java index d216ddfa..bd8562cc 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java @@ -568,11 +568,11 @@ public class 
MetricsCollectionAndEvaluationTest { metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(50), ZoneId.systemDefault())); assertTrue( metricsCollector.updateMetrics(context, stateStore).getMetricHistory().isEmpty()); - assertEquals(1, stateStore.getCollectedMetrics(context).get().size()); + assertEquals(1, stateStore.getCollectedMetrics(context).size()); metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(60), ZoneId.systemDefault())); assertTrue( metricsCollector.updateMetrics(context, stateStore).getMetricHistory().isEmpty()); - assertEquals(2, stateStore.getCollectedMetrics(context).get().size()); + assertEquals(2, stateStore.getCollectedMetrics(context).size()); testTolerateMetricsMissingDuringStabilizationPhase(topology); @@ -580,18 +580,18 @@ public class MetricsCollectionAndEvaluationTest { metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(150), ZoneId.systemDefault())); assertEquals( 3, metricsCollector.updateMetrics(context, stateStore).getMetricHistory().size()); - assertEquals(3, stateStore.getCollectedMetrics(context).get().size()); + assertEquals(3, stateStore.getCollectedMetrics(context).size()); metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(180), ZoneId.systemDefault())); assertEquals( 4, metricsCollector.updateMetrics(context, stateStore).getMetricHistory().size()); - assertEquals(4, stateStore.getCollectedMetrics(context).get().size()); + assertEquals(4, stateStore.getCollectedMetrics(context).size()); // Once we reach full time we trim the stabilization metrics metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(260), ZoneId.systemDefault())); assertEquals( 2, metricsCollector.updateMetrics(context, stateStore).getMetricHistory().size()); - assertEquals(2, stateStore.getCollectedMetrics(context).get().size()); + assertEquals(2, stateStore.getCollectedMetrics(context).size()); } private void testTolerateMetricsMissingDuringStabilizationPhase(JobTopology topology) { @@ -610,7 +610,7 @@ public class 
MetricsCollectionAndEvaluationTest { collectorWithMissingMetrics.setJobUpdateTs(startTime); Supplier<Integer> numCollectedMetricsSupplier = - () -> stateStore.getCollectedMetrics(context).get().size(); + () -> stateStore.getCollectedMetrics(context).size(); int numCollectedMetricsBeforeTest = numCollectedMetricsSupplier.get(); assertThrows( diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java index c4b714ef..1884268e 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java @@ -39,7 +39,6 @@ import java.time.Duration; import java.time.Instant; import java.time.ZoneId; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.SortedMap; @@ -205,7 +204,7 @@ public class RecommendedParallelismTest { // after restart while the job is not running the evaluated metrics are gone autoscaler.scale(context); - assertEquals(3, stateStore.getCollectedMetrics(context).get().size()); + assertEquals(3, stateStore.getCollectedMetrics(context).size()); assertNull(autoscaler.lastEvaluatedMetrics.get(context.getJobKey())); scaledParallelism = ScalingExecutorTest.getScaledParallelism(stateStore, context); assertEquals(4, scaledParallelism.get(source)); @@ -229,10 +228,9 @@ public class RecommendedParallelismTest { } private void assertEvaluatedMetricsSize(int expectedSize) { - Optional<SortedMap<Instant, CollectedMetrics>> evaluatedMetricsOpt = + SortedMap<Instant, CollectedMetrics> evaluatedMetrics = stateStore.getCollectedMetrics(context); - assertThat(evaluatedMetricsOpt).isPresent(); - assertThat(evaluatedMetricsOpt.get()).hasSize(expectedSize); + assertThat(evaluatedMetrics).hasSize(expectedSize); } private Double getCurrentMetricValue(JobVertexID jobVertexID, 
ScalingMetric scalingMetric) { diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java index bae6f36b..3265ba25 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java @@ -247,15 +247,10 @@ public class ScalingExecutorTest { Map<JobVertexID, Integer> getScaledParallelism( AutoScalerStateStore<KEY, Context> stateStore, Context context) throws Exception { - return stateStore - .getParallelismOverrides(context) - .map( - overrides -> - overrides.entrySet().stream() - .collect( - Collectors.toMap( - e -> JobVertexID.fromHexString(e.getKey()), - e -> Integer.valueOf(e.getValue())))) - .orElse(Map.of()); + return stateStore.getParallelismOverrides(context).entrySet().stream() + .collect( + Collectors.toMap( + e -> JobVertexID.fromHexString(e.getKey()), + e -> Integer.valueOf(e.getValue()))); } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java index 5f45f368..27e3083f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java @@ -43,9 +43,11 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.Base64; +import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.SortedMap; +import java.util.TreeMap; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -84,21 +86,21 @@ public class 
KubernetesAutoScalerStateStore } @Override - public Optional<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>> getScalingHistory( + public Map<JobVertexID, SortedMap<Instant, ScalingSummary>> getScalingHistory( KubernetesJobAutoScalerContext jobContext) { Optional<String> serializedScalingHistory = configMapStore.getSerializedState(jobContext, SCALING_HISTORY_KEY); if (serializedScalingHistory.isEmpty()) { - return Optional.empty(); + return new HashMap<>(); } try { - return Optional.of(deserializeScalingHistory(serializedScalingHistory.get())); + return deserializeScalingHistory(serializedScalingHistory.get()); } catch (JacksonException e) { LOG.error( "Could not deserialize scaling history, possibly the format changed. Discarding...", e); configMapStore.removeSerializedState(jobContext, SCALING_HISTORY_KEY); - return Optional.empty(); + return new HashMap<>(); } } @@ -116,21 +118,21 @@ public class KubernetesAutoScalerStateStore } @Override - public Optional<SortedMap<Instant, CollectedMetrics>> getCollectedMetrics( + public SortedMap<Instant, CollectedMetrics> getCollectedMetrics( KubernetesJobAutoScalerContext jobContext) { Optional<String> serializedEvaluatedMetricsOpt = configMapStore.getSerializedState(jobContext, COLLECTED_METRICS_KEY); if (serializedEvaluatedMetricsOpt.isEmpty()) { - return Optional.empty(); + return new TreeMap<>(); } try { - return Optional.of(deserializeEvaluatedMetrics(serializedEvaluatedMetricsOpt.get())); + return deserializeEvaluatedMetrics(serializedEvaluatedMetricsOpt.get()); } catch (JacksonException e) { LOG.error( "Could not deserialize metric history, possibly the format changed. 
Discarding...", e); configMapStore.removeSerializedState(jobContext, COLLECTED_METRICS_KEY); - return Optional.empty(); + return new TreeMap<>(); } } @@ -149,11 +151,11 @@ public class KubernetesAutoScalerStateStore } @Override - public Optional<Map<String, String>> getParallelismOverrides( - KubernetesJobAutoScalerContext jobContext) { + public Map<String, String> getParallelismOverrides(KubernetesJobAutoScalerContext jobContext) { return configMapStore .getSerializedState(jobContext, PARALLELISM_OVERRIDES_KEY) - .map(KubernetesAutoScalerStateStore::deserializeParallelismOverrides); + .map(KubernetesAutoScalerStateStore::deserializeParallelismOverrides) + .orElse(new HashMap<>()); } @Override @@ -216,12 +218,7 @@ public class KubernetesAutoScalerStateStore .map(String::length) .orElse(0); - Optional<SortedMap<Instant, CollectedMetrics>> evaluatedMetricsOpt = - getCollectedMetrics(context); - if (evaluatedMetricsOpt.isEmpty()) { - return; - } - SortedMap<Instant, CollectedMetrics> metricHistory = evaluatedMetricsOpt.get(); + SortedMap<Instant, CollectedMetrics> metricHistory = getCollectedMetrics(context); while (scalingHistorySize + metricHistorySize > MAX_CM_BYTES) { if (metricHistory.isEmpty()) { return; diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java index 21f0c7c3..f4faaa55 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java @@ -218,7 +218,7 @@ public class KubernetesAutoScalerStateStoreTest { var now = Instant.now(); Assertions.assertEquals(scalingHistory, getTrimmedScalingHistory(stateStore, ctx, now)); - 
assertThat(stateStore.getCollectedMetrics(ctx)).hasValue(metricHistory); + assertThat(stateStore.getCollectedMetrics(ctx)).isEqualTo(metricHistory); // Override with compressed data var newTs = Instant.now(); @@ -228,7 +228,7 @@ public class KubernetesAutoScalerStateStoreTest { // Make sure we can still access everything Assertions.assertEquals(scalingHistory, getTrimmedScalingHistory(stateStore, ctx, newTs)); - assertThat(stateStore.getCollectedMetrics(ctx)).hasValue(metricHistory); + assertThat(stateStore.getCollectedMetrics(ctx)).isEqualTo(metricHistory); } @Test