Valentina Predtechenskaya created FLINK-28790:
-------------------------------------------------
Summary: Incorrect KafkaProducer metrics initialization
Key: FLINK-28790
URL: https://issues.apache.org/jira/browse/FLINK-28790
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.15.1, 1.14.4
Reporter: Valentina Predtechenskaya

Problem

KafkaProducer Flink metrics behave unpredictably because the broker and topic metrics are initialized concurrently with Flink's metric registration.

Reproducing

We first found the problem on our Flink cluster: the metric KafkaProducer.outgoing-byte-rate was periodically missing (equal to zero or near zero) on several subtasks, while other subtasks reported it correctly. The actual outgoing rate was the same on all subtasks, as other metrics such as KafkaProducer.records-send-rate showed (it was fine on every subtask), so the problem was definitely in the metric itself.

After a long investigation we found the root cause of this behavior:

* KafkaWriter creates an instance of FlinkKafkaInternalProducer and then [initializes|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L327-L330] metric wrappers (gauges) over the existing KafkaProducer metrics
* In its constructor, KafkaProducer [creates a Sender|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L458-L460] to communicate with the brokers, starts a separate thread (kafka-producer-network-thread) and runs the Sender in that thread
* After the Sender starts, the topic and broker metrics take some time to be registered. If they are registered quickly, KafkaWriter sees them before its initialization finishes and wraps them as Flink gauges; otherwise they are not wrapped.
* [Some KafkaProducer metrics|https://docs.confluent.io/platform/current/kafka/monitoring.html#producer-metrics] exist both at the producer level and per broker under the same name, for example outgoing-byte-rate
* If two metrics have the same name, Flink's KafkaWriter [overwrites|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L359-L360] the metric inside the wrapper

So, to reproduce this bug it is enough to run any job with a Kafka sink and look at the KafkaProducer metrics: some of them (broker or topic metrics) will be absent, or some will be overwritten (like outgoing-byte-rate in the example above).

I see at least two ways to fix it:
1. Add the metric group (producer-metrics, producer-node-metrics, etc.) to Flink's metric names
2. Use only metrics from the producer-metrics group and ignore all other groups, i.e. do not expose broker and topic metrics
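The name collision and the two proposed fixes can be illustrated with a small, self-contained Java model. This is a sketch under stated assumptions, not Flink's or Kafka's actual code: `MetricEntry` is a hypothetical stand-in for Kafka's `MetricName` that keeps only the metric name and group, the metric values are illustrative, and the three `wrap*` methods are hypothetical simplifications of how KafkaWriter maps producer metrics to gauges. The group names producer-metrics and producer-node-metrics are the groups the Kafka producer uses for producer-level and per-broker metrics.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KafkaMetricNamingSketch {

    // Hypothetical stand-in for Kafka's MetricName: only the name and the group are kept.
    static final class MetricEntry {
        final String name;
        final String group;
        final double value;

        MetricEntry(String name, String group, double value) {
            this.name = name;
            this.group = group;
            this.value = value;
        }
    }

    // Behaviour described in the report: gauges are keyed by metric name only,
    // so a later entry with the same name replaces the earlier one.
    static Map<String, Double> wrapByNameOnly(List<MetricEntry> metrics) {
        Map<String, Double> gauges = new LinkedHashMap<>();
        for (MetricEntry m : metrics) {
            gauges.put(m.name, m.value);
        }
        return gauges;
    }

    // Fix 1: prefix the gauge name with the metric group so the names no longer collide.
    static Map<String, Double> wrapWithGroup(List<MetricEntry> metrics) {
        Map<String, Double> gauges = new LinkedHashMap<>();
        for (MetricEntry m : metrics) {
            gauges.put(m.group + "." + m.name, m.value);
        }
        return gauges;
    }

    // Fix 2: expose only producer-level metrics and skip broker (node) and topic groups.
    static Map<String, Double> wrapProducerMetricsOnly(List<MetricEntry> metrics) {
        Map<String, Double> gauges = new LinkedHashMap<>();
        for (MetricEntry m : metrics) {
            if ("producer-metrics".equals(m.group)) {
                gauges.put(m.name, m.value);
            }
        }
        return gauges;
    }

    public static void main(String[] args) {
        List<MetricEntry> metrics = new ArrayList<>();
        // Illustrative producer-level rate.
        metrics.add(new MetricEntry("outgoing-byte-rate", "producer-metrics", 1000.0));
        // Illustrative per-broker metric with the same name that the network thread
        // registered before the wrappers were created; its rate is zero here.
        metrics.add(new MetricEntry("outgoing-byte-rate", "producer-node-metrics", 0.0));

        System.out.println(wrapByNameOnly(metrics));
        // prints {outgoing-byte-rate=0.0}
        System.out.println(wrapWithGroup(metrics));
        // prints {producer-metrics.outgoing-byte-rate=1000.0, producer-node-metrics.outgoing-byte-rate=0.0}
        System.out.println(wrapProducerMetricsOnly(metrics));
        // prints {outgoing-byte-rate=1000.0}
    }
}
```

Keying by name alone keeps whichever entry is seen last, so the reported value depends on whether, and in which order, the broker metrics were registered before the wrappers were created. With several brokers, per-broker metrics share both the group and the name and differ only in their node-id tag (topic metrics differ in their topic tag), so a complete version of option 1 would likely need those tags in the gauge name as well.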