This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push: new 51b3752ac32 [FLINK-28488][kafka] Only forward measurable Kafka metrics and ignore others 51b3752ac32 is described below commit 51b3752ac320a45a20738fea28c8fdd923061f1c Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Wed Aug 17 18:01:34 2022 +0200 [FLINK-28488][kafka] Only forward measurable Kafka metrics and ignore others --- .../internals/metrics/KafkaMetricWrapper.java | 7 +++++- .../metrics/KafkaMetricMutableWrapperTest.java | 29 ++++++++++++++-------- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java index 23617b544c9..1ab41ce9c9b 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java @@ -32,6 +32,11 @@ public class KafkaMetricWrapper implements Gauge<Double> { @Override public Double getValue() { - return (Double) kafkaMetric.metricValue(); + final Object metricValue = kafkaMetric.metricValue(); + // Previously KafkaMetric supported KafkaMetric#value that always returned a Double value. + // Since this method has been deprecated and is removed in future releases we have to + // manually check if the returned value is Double. Internally, KafkaMetric#value also + // returned 0.0 for all not "measurable" values, so we restored the original behavior. + return metricValue instanceof Double ? (Double) metricValue : 0.0; } } diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapperTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapperTest.java index 818794ef6fa..952f0fdc411 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapperTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapperTest.java @@ -18,10 +18,12 @@ package org.apache.flink.streaming.connectors.kafka.internals.metrics; +import org.apache.flink.metrics.Gauge; import org.apache.flink.util.TestLoggerExtension; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.Metric; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.junit.jupiter.api.Test; @@ -37,6 +39,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Properties; import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Stream; import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer; import static org.apache.flink.util.DockerImageVersions.KAFKA; @@ -56,22 +60,27 @@ class KafkaMetricMutableWrapperTest { .withNetwork(NETWORK) .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS); + @Test + void testOnlyMeasurableMetricsAreRegisteredWithMutableWrapper() { + testOnlyMeasurableMetricsAreRegistered(KafkaMetricMutableWrapper::new); + } + @Test void testOnlyMeasurableMetricsAreRegistered() { - final Collection<KafkaMetricMutableWrapper> metricWrappers = new ArrayList<>(); + testOnlyMeasurableMetricsAreRegistered(KafkaMetricWrapper::new); + } + + private static void testOnlyMeasurableMetricsAreRegistered( + Function<Metric, Gauge<Double>> wrapperFactory) { + final Collection<Gauge<Double>> metricWrappers = new ArrayList<>(); final KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(getKafkaClientConfiguration()); final KafkaProducer<?, ?> producer = new KafkaProducer<>(getKafkaClientConfiguration()); - consumer.metrics() - .forEach( - (name, metric) -> - metricWrappers.add(new KafkaMetricMutableWrapper(metric))); - producer.metrics() - .forEach( - (name, metric) -> - metricWrappers.add(new KafkaMetricMutableWrapper(metric))); + Stream.concat(consumer.metrics().values().stream(), producer.metrics().values().stream()) + .map(wrapperFactory::apply) + .forEach(metricWrappers::add); // Ensure that all values are accessible and return valid double values - metricWrappers.forEach(KafkaMetricMutableWrapper::getValue); + metricWrappers.forEach(Gauge::getValue); } private static Properties getKafkaClientConfiguration() {