This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 8d307086 [FLINK-32690] report Double.NAN instead of null for missing autoscaler metrics 8d307086 is described below commit 8d307086122bdf4f74aaa9a69b5b8cc9025afdc8 Author: Matyas Orhidi <matyas_orh...@apple.com> AuthorDate: Wed Jun 14 11:44:39 2023 -0700 [FLINK-32690] report Double.NAN instead of null for missing autoscaler metrics --- .../autoscaler/AutoscalerFlinkMetrics.java | 35 ++++- .../operator/autoscaler/JobAutoScalerImpl.java | 20 +-- .../autoscaler/AutoScalerFlinkMetricsTest.java | 161 +++++++++++++++++++++ 3 files changed, 193 insertions(+), 23 deletions(-) diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java index fab3cb32..b443935c 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java @@ -17,6 +17,7 @@ package org.apache.flink.kubernetes.operator.autoscaler; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.metrics.Counter; @@ -32,10 +33,16 @@ import java.util.Optional; import java.util.Set; import java.util.function.Supplier; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM; + /** Autoscaler metrics for observability. */ public class AutoscalerFlinkMetrics { private static final Logger LOG = LoggerFactory.getLogger(AutoscalerFlinkMetrics.class); + @VisibleForTesting static final String CURRENT = "Current"; + @VisibleForTesting static final String AVERAGE = "Average"; + @VisibleForTesting static final String JOB_VERTEX_ID = "jobVertexID"; final Counter numScalings; @@ -66,14 +73,14 @@ public class AutoscalerFlinkMetrics { } LOG.info("Registering scaling metrics for job vertex {}", jobVertexID); var jobVertexMg = - metricGroup.addGroup("jobVertexID", jobVertexID.toHexString()); + metricGroup.addGroup(JOB_VERTEX_ID, jobVertexID.toHexString()); evaluated.forEach( (sm, esm) -> { var smGroup = jobVertexMg.addGroup(sm.name()); smGroup.gauge( - "Current", + CURRENT, () -> Optional.ofNullable( currentVertexMetrics.get()) @@ -82,11 +89,11 @@ public class AutoscalerFlinkMetrics { .map( EvaluatedScalingMetric ::getCurrent) - .orElse(null)); + .orElse(Double.NaN)); if (sm.isCalculateAverage()) { smGroup.gauge( - "Average", + AVERAGE, () -> Optional.ofNullable( currentVertexMetrics @@ -96,9 +103,27 @@ public class AutoscalerFlinkMetrics { .map( EvaluatedScalingMetric ::getAverage) - .orElse(null)); + .orElse(Double.NaN)); } }); }); } + + @VisibleForTesting + static void initRecommendedParallelism( + Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics) { + evaluatedMetrics.forEach( + (jobVertexID, evaluatedScalingMetricMap) -> + evaluatedScalingMetricMap.put( + RECOMMENDED_PARALLELISM, + evaluatedScalingMetricMap.get(PARALLELISM))); + } + + @VisibleForTesting + static void resetRecommendedParallelism( + Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics) { + evaluatedMetrics.forEach( + (jobVertexID, evaluatedScalingMetricMap) -> + evaluatedScalingMetricMap.put(RECOMMENDED_PARALLELISM, null)); + } } diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java index fbefd15a..75ef9cd0 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java @@ -36,9 +36,9 @@ import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import static org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.initRecommendedParallelism; +import static org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.resetRecommendedParallelism; import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM; /** Application and SessionJob autoscaler. */ public class JobAutoScalerImpl implements JobAutoScaler { @@ -191,20 +191,4 @@ public class JobAutoScalerImpl implements JobAutoScaler { new AutoscalerFlinkMetrics( ctx.getResourceMetricGroup().addGroup("AutoScaler"))); } - - private void initRecommendedParallelism( - Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics) { - evaluatedMetrics.forEach( - (jobVertexID, evaluatedScalingMetricMap) -> - evaluatedScalingMetricMap.put( - RECOMMENDED_PARALLELISM, - evaluatedScalingMetricMap.get(PARALLELISM))); - } - - private void resetRecommendedParallelism( - Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics) { - evaluatedMetrics.forEach( - (jobVertexID, evaluatedScalingMetricMap) -> - evaluatedScalingMetricMap.put(RECOMMENDED_PARALLELISM, null)); - } } diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerFlinkMetricsTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerFlinkMetricsTest.java new file mode 100644 index 00000000..b5760ec7 --- /dev/null +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerFlinkMetricsTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.TestUtils; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.kubernetes.operator.metrics.TestingMetricListener; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.AVERAGE; +import static org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.CURRENT; +import static org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.JOB_VERTEX_ID; +import static org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.initRecommendedParallelism; +import static org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.resetRecommendedParallelism; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** {@link AutoscalerFlinkMetrics} tests. */ +public class AutoScalerFlinkMetricsTest { + + private final Configuration configuration = new Configuration(); + private final JobVertexID jobVertexID = new JobVertexID(); + private ResourceID resourceID; + private TestingMetricListener listener; + private AutoscalerFlinkMetrics metrics; + + @BeforeEach + public void init() { + listener = new TestingMetricListener(configuration); + metrics = new AutoscalerFlinkMetrics(listener.getMetricGroup()); + resourceID = ResourceID.fromResource(TestUtils.buildApplicationCluster()); + } + + @Test + public void testMetricsRegistration() { + var evaluatedMetrics = Map.of(jobVertexID, testMetrics()); + var lastEvaluatedMetrics = + new HashMap< + ResourceID, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>(); + initRecommendedParallelism(evaluatedMetrics); + lastEvaluatedMetrics.put(resourceID, evaluatedMetrics); + + metrics.registerScalingMetrics(() -> lastEvaluatedMetrics.get(resourceID)); + metrics.registerScalingMetrics(() -> lastEvaluatedMetrics.get(resourceID)); + + assertEquals(1.0, getCurrentMetricValue(PARALLELISM)); + assertEquals(1.0, getCurrentMetricValue(RECOMMENDED_PARALLELISM)); + assertEquals(1000., getCurrentMetricValue(TRUE_PROCESSING_RATE)); + assertEquals(2000., getAverageMetricValue(TRUE_PROCESSING_RATE)); + } + + @Test + public void testMetricsCleanup() { + var evaluatedMetrics = Map.of(jobVertexID, testMetrics()); + var lastEvaluatedMetrics = + new HashMap< + ResourceID, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>(); + initRecommendedParallelism(evaluatedMetrics); + lastEvaluatedMetrics.put(resourceID, evaluatedMetrics); + metrics.registerScalingMetrics(() -> lastEvaluatedMetrics.get(resourceID)); + + assertEquals(1.0, getCurrentMetricValue(PARALLELISM)); + assertEquals(1.0, getCurrentMetricValue(RECOMMENDED_PARALLELISM)); + assertEquals(1000., getCurrentMetricValue(TRUE_PROCESSING_RATE)); + assertEquals(2000., getAverageMetricValue(TRUE_PROCESSING_RATE)); + + lastEvaluatedMetrics.remove(resourceID); + assertEquals(Double.NaN, getCurrentMetricValue(PARALLELISM)); + assertEquals(Double.NaN, getCurrentMetricValue(RECOMMENDED_PARALLELISM)); + assertEquals(Double.NaN, getCurrentMetricValue(TRUE_PROCESSING_RATE)); + assertEquals(Double.NaN, getAverageMetricValue(TRUE_PROCESSING_RATE)); + } + + @Test + public void testRecommendedParallelismWithinMetricWindow() { + var evaluatedMetrics = Map.of(jobVertexID, testMetrics()); + var lastEvaluatedMetrics = + new HashMap< + ResourceID, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>(); + initRecommendedParallelism(evaluatedMetrics); + resetRecommendedParallelism(evaluatedMetrics); + lastEvaluatedMetrics.put(resourceID, evaluatedMetrics); + + metrics.registerScalingMetrics(() -> lastEvaluatedMetrics.get(resourceID)); + assertEquals(1.0, getCurrentMetricValue(PARALLELISM)); + assertEquals(Double.NaN, getCurrentMetricValue(RECOMMENDED_PARALLELISM)); + assertEquals(1000., getCurrentMetricValue(TRUE_PROCESSING_RATE)); + assertEquals(2000., getAverageMetricValue(TRUE_PROCESSING_RATE)); + } + + @Test + public void testRecommendedParallelismPastMetricWindow() { + var evaluatedMetrics = Map.of(jobVertexID, testMetrics()); + var lastEvaluatedMetrics = + new HashMap< + ResourceID, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>(); + initRecommendedParallelism(evaluatedMetrics); + lastEvaluatedMetrics.put(resourceID, evaluatedMetrics); + + metrics.registerScalingMetrics(() -> lastEvaluatedMetrics.get(resourceID)); + assertEquals(1.0, getCurrentMetricValue(PARALLELISM)); + assertEquals(1.0, getCurrentMetricValue(RECOMMENDED_PARALLELISM)); + assertEquals(1000., getCurrentMetricValue(TRUE_PROCESSING_RATE)); + assertEquals(2000., getAverageMetricValue(TRUE_PROCESSING_RATE)); + } + + private static Map<ScalingMetric, EvaluatedScalingMetric> testMetrics() { + var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>(); + metrics.put(PARALLELISM, EvaluatedScalingMetric.of(1)); + metrics.put(ScalingMetric.TRUE_PROCESSING_RATE, new EvaluatedScalingMetric(1000., 2000.)); + + return metrics; + } + + private Object getCurrentMetricValue(ScalingMetric metric) { + return listener.getGauge(getCurrentMetricId(metric)).orElse(() -> Double.NaN).getValue(); + } + + private Object getAverageMetricValue(ScalingMetric metric) { + return listener.getGauge(getAverageMetricId(metric)).get().getValue(); + } + + private String getCurrentMetricId(ScalingMetric metric) { + return getMetricId(metric, CURRENT); + } + + private String getAverageMetricId(ScalingMetric metric) { + return getMetricId(metric, AVERAGE); + } + + private String getMetricId(ScalingMetric metric, String classifier) { + return listener.getMetricId( + JOB_VERTEX_ID, jobVertexID.toHexString(), metric.name(), classifier); + } +}