This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 51b3752ac32 [FLINK-28488][kafka] Only forward measurable Kafka metrics and ignore others
51b3752ac32 is described below

commit 51b3752ac320a45a20738fea28c8fdd923061f1c
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Wed Aug 17 18:01:34 2022 +0200

    [FLINK-28488][kafka] Only forward measurable Kafka metrics and ignore others
---
 .../internals/metrics/KafkaMetricWrapper.java      |  7 +++++-
 .../metrics/KafkaMetricMutableWrapperTest.java     | 29 ++++++++++++++--------
 2 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
index 23617b544c9..1ab41ce9c9b 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
@@ -32,6 +32,11 @@ public class KafkaMetricWrapper implements Gauge<Double> {
 
     @Override
     public Double getValue() {
-        return (Double) kafkaMetric.metricValue();
+        final Object metricValue = kafkaMetric.metricValue();
+        // Previously KafkaMetric supported KafkaMetric#value that always returned a Double value.
+        // Since this method has been deprecated and is removed in future releases we have to
+        // manually check if the returned value is Double. Internally, KafkaMetric#value also
+        // returned 0.0 for all not "measurable" values, so we restored the original behavior.
+        return metricValue instanceof Double ? (Double) metricValue : 0.0;
     }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapperTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapperTest.java
index 818794ef6fa..952f0fdc411 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapperTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapperTest.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.streaming.connectors.kafka.internals.metrics;
 
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.junit.jupiter.api.Test;
@@ -37,6 +39,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Stream;
 
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 import static org.apache.flink.util.DockerImageVersions.KAFKA;
@@ -56,22 +60,27 @@ class KafkaMetricMutableWrapperTest {
                     .withNetwork(NETWORK)
                     .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
 
+    @Test
+    void testOnlyMeasurableMetricsAreRegisteredWithMutableWrapper() {
+        testOnlyMeasurableMetricsAreRegistered(KafkaMetricMutableWrapper::new);
+    }
+
     @Test
     void testOnlyMeasurableMetricsAreRegistered() {
-        final Collection<KafkaMetricMutableWrapper> metricWrappers = new ArrayList<>();
+        testOnlyMeasurableMetricsAreRegistered(KafkaMetricWrapper::new);
+    }
+
+    private static void testOnlyMeasurableMetricsAreRegistered(
+            Function<Metric, Gauge<Double>> wrapperFactory) {
+        final Collection<Gauge<Double>> metricWrappers = new ArrayList<>();
         final KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(getKafkaClientConfiguration());
         final KafkaProducer<?, ?> producer = new KafkaProducer<>(getKafkaClientConfiguration());
-        consumer.metrics()
-                .forEach(
-                        (name, metric) ->
-                                metricWrappers.add(new KafkaMetricMutableWrapper(metric)));
-        producer.metrics()
-                .forEach(
-                        (name, metric) ->
-                                metricWrappers.add(new KafkaMetricMutableWrapper(metric)));
+        Stream.concat(consumer.metrics().values().stream(), producer.metrics().values().stream())
+                .map(wrapperFactory::apply)
+                .forEach(metricWrappers::add);
 
         // Ensure that all values are accessible and return valid double values
-        metricWrappers.forEach(KafkaMetricMutableWrapper::getValue);
+        metricWrappers.forEach(Gauge::getValue);
     }
 
     private static Properties getKafkaClientConfiguration() {

Reply via email to