This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d307086 [FLINK-32690] report Double.NaN instead of null for missing autoscaler metrics
8d307086 is described below

commit 8d307086122bdf4f74aaa9a69b5b8cc9025afdc8
Author: Matyas Orhidi <matyas_orh...@apple.com>
AuthorDate: Wed Jun 14 11:44:39 2023 -0700

    [FLINK-32690] report Double.NaN instead of null for missing autoscaler metrics
---
 .../autoscaler/AutoscalerFlinkMetrics.java         |  35 ++++-
 .../operator/autoscaler/JobAutoScalerImpl.java     |  20 +--
 .../autoscaler/AutoScalerFlinkMetricsTest.java     | 161 +++++++++++++++++++++
 3 files changed, 193 insertions(+), 23 deletions(-)

diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
index fab3cb32..b443935c 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.metrics.Counter;
@@ -32,10 +33,16 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.function.Supplier;
 
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
+
 /** Autoscaler metrics for observability. */
 public class AutoscalerFlinkMetrics {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AutoscalerFlinkMetrics.class);
+    @VisibleForTesting static final String CURRENT = "Current";
+    @VisibleForTesting static final String AVERAGE = "Average";
+    @VisibleForTesting static final String JOB_VERTEX_ID = "jobVertexID";
 
     final Counter numScalings;
 
@@ -66,14 +73,14 @@ public class AutoscalerFlinkMetrics {
                             }
                             LOG.info("Registering scaling metrics for job 
vertex {}", jobVertexID);
                             var jobVertexMg =
-                                    metricGroup.addGroup("jobVertexID", 
jobVertexID.toHexString());
+                                    metricGroup.addGroup(JOB_VERTEX_ID, 
jobVertexID.toHexString());
 
                             evaluated.forEach(
                                     (sm, esm) -> {
                                         var smGroup = 
jobVertexMg.addGroup(sm.name());
 
                                         smGroup.gauge(
-                                                "Current",
+                                                CURRENT,
                                                 () ->
                                                         Optional.ofNullable(
                                                                         
currentVertexMetrics.get())
@@ -82,11 +89,11 @@ public class AutoscalerFlinkMetrics {
                                                                 .map(
                                                                         
EvaluatedScalingMetric
                                                                                
 ::getCurrent)
-                                                                .orElse(null));
+                                                                
.orElse(Double.NaN));
 
                                         if (sm.isCalculateAverage()) {
                                             smGroup.gauge(
-                                                    "Average",
+                                                    AVERAGE,
                                                     () ->
                                                             
Optional.ofNullable(
                                                                             
currentVertexMetrics
@@ -96,9 +103,27 @@ public class AutoscalerFlinkMetrics {
                                                                     .map(
                                                                             
EvaluatedScalingMetric
                                                                                
     ::getAverage)
-                                                                    
.orElse(null));
+                                                                    
.orElse(Double.NaN));
                                         }
                                     });
                         });
     }
+
+    @VisibleForTesting
+    static void initRecommendedParallelism(
+            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
evaluatedMetrics) {
+        evaluatedMetrics.forEach(
+                (jobVertexID, evaluatedScalingMetricMap) ->
+                        evaluatedScalingMetricMap.put(
+                                RECOMMENDED_PARALLELISM,
+                                evaluatedScalingMetricMap.get(PARALLELISM)));
+    }
+
+    @VisibleForTesting
+    static void resetRecommendedParallelism(
+            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
evaluatedMetrics) {
+        evaluatedMetrics.forEach(
+                (jobVertexID, evaluatedScalingMetricMap) ->
+                        evaluatedScalingMetricMap.put(RECOMMENDED_PARALLELISM, 
null));
+    }
 }
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index fbefd15a..75ef9cd0 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -36,9 +36,9 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static 
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.resetRecommendedParallelism;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
 
 /** Application and SessionJob autoscaler. */
 public class JobAutoScalerImpl implements JobAutoScaler {
@@ -191,20 +191,4 @@ public class JobAutoScalerImpl implements JobAutoScaler {
                         new AutoscalerFlinkMetrics(
                                 
ctx.getResourceMetricGroup().addGroup("AutoScaler")));
     }
-
-    private void initRecommendedParallelism(
-            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
evaluatedMetrics) {
-        evaluatedMetrics.forEach(
-                (jobVertexID, evaluatedScalingMetricMap) ->
-                        evaluatedScalingMetricMap.put(
-                                RECOMMENDED_PARALLELISM,
-                                evaluatedScalingMetricMap.get(PARALLELISM)));
-    }
-
-    private void resetRecommendedParallelism(
-            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
evaluatedMetrics) {
-        evaluatedMetrics.forEach(
-                (jobVertexID, evaluatedScalingMetricMap) ->
-                        evaluatedScalingMetricMap.put(RECOMMENDED_PARALLELISM, 
null));
-    }
 }
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerFlinkMetricsTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerFlinkMetricsTest.java
new file mode 100644
index 00000000..b5760ec7
--- /dev/null
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerFlinkMetricsTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.metrics.TestingMetricListener;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.AVERAGE;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.CURRENT;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.JOB_VERTEX_ID;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.resetRecommendedParallelism;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** {@link AutoscalerFlinkMetrics} tests. */
+public class AutoScalerFlinkMetricsTest {
+
+    private final Configuration configuration = new Configuration();
+    private final JobVertexID jobVertexID = new JobVertexID();
+    private ResourceID resourceID;
+    private TestingMetricListener listener;
+    private AutoscalerFlinkMetrics metrics;
+
+    @BeforeEach
+    public void init() {
+        listener = new TestingMetricListener(configuration);
+        metrics = new AutoscalerFlinkMetrics(listener.getMetricGroup());
+        resourceID = 
ResourceID.fromResource(TestUtils.buildApplicationCluster());
+    }
+
+    @Test
+    public void testMetricsRegistration() {
+        var evaluatedMetrics = Map.of(jobVertexID, testMetrics());
+        var lastEvaluatedMetrics =
+                new HashMap<
+                        ResourceID, Map<JobVertexID, Map<ScalingMetric, 
EvaluatedScalingMetric>>>();
+        initRecommendedParallelism(evaluatedMetrics);
+        lastEvaluatedMetrics.put(resourceID, evaluatedMetrics);
+
+        metrics.registerScalingMetrics(() -> 
lastEvaluatedMetrics.get(resourceID));
+        metrics.registerScalingMetrics(() -> 
lastEvaluatedMetrics.get(resourceID));
+
+        assertEquals(1.0, getCurrentMetricValue(PARALLELISM));
+        assertEquals(1.0, getCurrentMetricValue(RECOMMENDED_PARALLELISM));
+        assertEquals(1000., getCurrentMetricValue(TRUE_PROCESSING_RATE));
+        assertEquals(2000., getAverageMetricValue(TRUE_PROCESSING_RATE));
+    }
+
+    @Test
+    public void testMetricsCleanup() {
+        var evaluatedMetrics = Map.of(jobVertexID, testMetrics());
+        var lastEvaluatedMetrics =
+                new HashMap<
+                        ResourceID, Map<JobVertexID, Map<ScalingMetric, 
EvaluatedScalingMetric>>>();
+        initRecommendedParallelism(evaluatedMetrics);
+        lastEvaluatedMetrics.put(resourceID, evaluatedMetrics);
+        metrics.registerScalingMetrics(() -> 
lastEvaluatedMetrics.get(resourceID));
+
+        assertEquals(1.0, getCurrentMetricValue(PARALLELISM));
+        assertEquals(1.0, getCurrentMetricValue(RECOMMENDED_PARALLELISM));
+        assertEquals(1000., getCurrentMetricValue(TRUE_PROCESSING_RATE));
+        assertEquals(2000., getAverageMetricValue(TRUE_PROCESSING_RATE));
+
+        lastEvaluatedMetrics.remove(resourceID);
+        assertEquals(Double.NaN, getCurrentMetricValue(PARALLELISM));
+        assertEquals(Double.NaN, 
getCurrentMetricValue(RECOMMENDED_PARALLELISM));
+        assertEquals(Double.NaN, getCurrentMetricValue(TRUE_PROCESSING_RATE));
+        assertEquals(Double.NaN, getAverageMetricValue(TRUE_PROCESSING_RATE));
+    }
+
+    @Test
+    public void testRecommendedParallelismWithinMetricWindow() {
+        var evaluatedMetrics = Map.of(jobVertexID, testMetrics());
+        var lastEvaluatedMetrics =
+                new HashMap<
+                        ResourceID, Map<JobVertexID, Map<ScalingMetric, 
EvaluatedScalingMetric>>>();
+        initRecommendedParallelism(evaluatedMetrics);
+        resetRecommendedParallelism(evaluatedMetrics);
+        lastEvaluatedMetrics.put(resourceID, evaluatedMetrics);
+
+        metrics.registerScalingMetrics(() -> 
lastEvaluatedMetrics.get(resourceID));
+        assertEquals(1.0, getCurrentMetricValue(PARALLELISM));
+        assertEquals(Double.NaN, 
getCurrentMetricValue(RECOMMENDED_PARALLELISM));
+        assertEquals(1000., getCurrentMetricValue(TRUE_PROCESSING_RATE));
+        assertEquals(2000., getAverageMetricValue(TRUE_PROCESSING_RATE));
+    }
+
+    @Test
+    public void testRecommendedParallelismPastMetricWindow() {
+        var evaluatedMetrics = Map.of(jobVertexID, testMetrics());
+        var lastEvaluatedMetrics =
+                new HashMap<
+                        ResourceID, Map<JobVertexID, Map<ScalingMetric, 
EvaluatedScalingMetric>>>();
+        initRecommendedParallelism(evaluatedMetrics);
+        lastEvaluatedMetrics.put(resourceID, evaluatedMetrics);
+
+        metrics.registerScalingMetrics(() -> 
lastEvaluatedMetrics.get(resourceID));
+        assertEquals(1.0, getCurrentMetricValue(PARALLELISM));
+        assertEquals(1.0, getCurrentMetricValue(RECOMMENDED_PARALLELISM));
+        assertEquals(1000., getCurrentMetricValue(TRUE_PROCESSING_RATE));
+        assertEquals(2000., getAverageMetricValue(TRUE_PROCESSING_RATE));
+    }
+
+    private static Map<ScalingMetric, EvaluatedScalingMetric> testMetrics() {
+        var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
+        metrics.put(PARALLELISM, EvaluatedScalingMetric.of(1));
+        metrics.put(ScalingMetric.TRUE_PROCESSING_RATE, new 
EvaluatedScalingMetric(1000., 2000.));
+
+        return metrics;
+    }
+
+    private Object getCurrentMetricValue(ScalingMetric metric) {
+        return listener.getGauge(getCurrentMetricId(metric)).orElse(() -> 
Double.NaN).getValue();
+    }
+
+    private Object getAverageMetricValue(ScalingMetric metric) {
+        return listener.getGauge(getAverageMetricId(metric)).get().getValue();
+    }
+
+    private String getCurrentMetricId(ScalingMetric metric) {
+        return getMetricId(metric, CURRENT);
+    }
+
+    private String getAverageMetricId(ScalingMetric metric) {
+        return getMetricId(metric, AVERAGE);
+    }
+
+    private String getMetricId(ScalingMetric metric, String classifier) {
+        return listener.getMetricId(
+                JOB_VERTEX_ID, jobVertexID.toHexString(), metric.name(), 
classifier);
+    }
+}

Reply via email to