Valentina Predtechenskaya created FLINK-28790:
-------------------------------------------------

             Summary: Incorrect KafkaProducer metrics initialization
                 Key: FLINK-28790
                 URL: https://issues.apache.org/jira/browse/FLINK-28790
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.15.1, 1.14.4
            Reporter: Valentina Predtechenskaya


Problem

KafkaProducer Flink metrics behave unpredictably because the broker and topic 
metrics are initialized concurrently with the Flink metric wrappers.

Reproducing

We first found the problem on our Flink cluster: the metric 
KafkaProducer.outgoing-byte-rate was periodically missing (equal to zero or 
near zero) on several subtasks, while on other subtasks the same metric was 
fine. The actual outgoing rate was the same on all subtasks, as was clear from, 
for example, the KafkaProducer.record-send-rate metric, which was correct on 
every subtask, so the problem was definitely with the metric itself.

After a long investigation, we found the root cause of this behavior:
 
* KafkaWriter creates an instance of FlinkKafkaInternalProducer and then 
[initializes|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L327-L330]
 Flink metric wrappers (gauges) over the KafkaProducer metrics that exist at that moment
* In its constructor, KafkaProducer itself [creates 
Sender|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L458-L460]
 to access the brokers, starts a thread (kafka-producer-network-thread) and runs 
the Sender in this separate thread
* After the Sender starts, the metrics related to topics and brokers are 
registered with some delay. If they are registered quickly enough, KafkaWriter 
sees them before it finishes its initialization and wraps them as Flink gauges. 
Otherwise, they are not wrapped.
* [Some KafkaProducer 
metrics|https://docs.confluent.io/platform/current/kafka/monitoring.html#producer-metrics]
 at the producer level and at the broker (node) level have the same name, for example outgoing-byte-rate
* If two metrics have the same name, Flink KafkaWriter 
[overwrites|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L359-L360]
 the metric in the wrapper, so only one of them is exposed
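
To illustrate the name collision in isolation, here is a minimal Java sketch. It does not use the real Kafka or Flink classes: MetricId is a hypothetical stand-in for Kafka's MetricName (name, group, tags), wrapByNameOnly only mimics the idea of keying gauges by the bare metric name (it is not KafkaWriter's actual code), and the metric values are made up.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MetricNameCollision {

    // Hypothetical stand-in for org.apache.kafka.common.MetricName plus a current value.
    record MetricId(String name, String group, Map<String, String> tags, double value) {}

    // Mimics wrapping every producer metric into a gauge keyed ONLY by its name.
    // When two metrics share a name, the later put() silently replaces the earlier one.
    static Map<String, Double> wrapByNameOnly(List<MetricId> metrics) {
        Map<String, Double> gauges = new LinkedHashMap<>();
        for (MetricId m : metrics) {
            gauges.put(m.name(), m.value());
        }
        return gauges;
    }

    public static void main(String[] args) {
        List<MetricId> metrics = List.of(
                // Producer-wide rate (illustrative value).
                new MetricId("outgoing-byte-rate", "producer-metrics",
                        Map.of("client-id", "p1"), 1000.0),
                // Per-broker rate for a mostly idle node (illustrative value).
                new MetricId("outgoing-byte-rate", "producer-node-metrics",
                        Map.of("client-id", "p1", "node-id", "node-1"), 0.0));

        // Only one gauge survives, and it reports the node metric's value.
        System.out.println(wrapByNameOnly(metrics));
    }
}
```

This matches the symptom above: the exposed outgoing-byte-rate drops to zero or near zero whenever the per-broker metric wins the collision.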

So, to reproduce this bug, it is enough to run any job with a Kafka Sink and 
look at the KafkaProducer metrics: some of them (broker or topic metrics) will 
be absent, or some will be overwritten (like outgoing-byte-rate in the 
example above).
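
The timing dependence can be simulated deterministically. In this sketch (all names are hypothetical, not Kafka's internals), producer-level metrics exist as soon as the "producer" is constructed, while a separate thread registers node and topic metrics later. A latch plays the role of the unpredictable delay, and the "writer" copies the registry exactly once.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

public class MetricRegistrationRace {

    // Simulates one producer start-up and returns the metric names the "writer" wraps.
    // senderIsFast == true: the network thread registers its metrics before the snapshot.
    // senderIsFast == false: the snapshot is taken first, the metrics appear afterwards.
    static Set<String> snapshotAfterStartup(boolean senderIsFast) {
        // Shared registry, written by the network thread and read by the writer.
        Set<String> registry = ConcurrentHashMap.newKeySet();
        // Producer-level metrics exist once the constructor returns.
        registry.add("producer-metrics/outgoing-byte-rate");

        CountDownLatch mayRegister = new CountDownLatch(1); // stands in for the unpredictable delay
        CountDownLatch registered = new CountDownLatch(1);
        Thread sender = new Thread(() -> {
            try {
                mayRegister.await();
                registry.add("producer-node-metrics/outgoing-byte-rate");
                registry.add("producer-topic-metrics/record-send-rate");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                registered.countDown();
            }
        }, "kafka-producer-network-thread");
        sender.start();

        try {
            if (senderIsFast) {
                mayRegister.countDown();
                registered.await();
            }
            // The writer copies whatever is registered at this moment, exactly once.
            Set<String> wrapped = new TreeSet<>(registry);
            mayRegister.countDown(); // a slow sender registers its metrics only now
            sender.join();
            return wrapped;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while simulating start-up", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("fast sender: " + snapshotAfterStartup(true));
        System.out.println("slow sender: " + snapshotAfterStartup(false));
    }
}
```

In a real job nothing controls the delay, so which case a given subtask hits is effectively random, which fits the per-subtask differences we observed.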

I suppose there are at least two ways to fix it:
1. Add the Kafka metric group (producer-metrics, producer-node-metrics, etc.) to the Flink metric name
2. Use only metrics in the producer-metrics group and ignore all other groups, 
i.e. do not expose the broker and topic metrics at all
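
Both options can be sketched on the same hypothetical MetricId stand-in (again, not the real Kafka or Flink API). For option 1, the group alone would still collide between brokers, since every broker has its own outgoing-byte-rate in producer-node-metrics, so this sketch also folds the tag values (client-id, node-id, ...) into the name. Option 2 simply keeps the producer-metrics group.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MetricWrappingFixes {

    // Hypothetical stand-in for org.apache.kafka.common.MetricName plus a current value.
    record MetricId(String name, String group, Map<String, String> tags, double value) {}

    // Option 1: build a unique name from group, tag values and metric name,
    // e.g. "producer-node-metrics.p1.node-1.outgoing-byte-rate".
    static String qualifiedName(MetricId m) {
        StringBuilder sb = new StringBuilder(m.group());
        // Sort tags by key so the same metric always gets the same name.
        for (String tagValue : new TreeMap<>(m.tags()).values()) {
            sb.append('.').append(tagValue);
        }
        return sb.append('.').append(m.name()).toString();
    }

    static Map<String, Double> wrapQualified(List<MetricId> metrics) {
        Map<String, Double> gauges = new LinkedHashMap<>();
        for (MetricId m : metrics) {
            gauges.put(qualifiedName(m), m.value());
        }
        return gauges;
    }

    // Option 2: keep only producer-level metrics; names are unique within that group.
    static Map<String, Double> wrapProducerLevelOnly(List<MetricId> metrics) {
        Map<String, Double> gauges = new LinkedHashMap<>();
        for (MetricId m : metrics) {
            if ("producer-metrics".equals(m.group())) {
                gauges.put(m.name(), m.value());
            }
        }
        return gauges;
    }

    public static void main(String[] args) {
        List<MetricId> metrics = List.of(
                new MetricId("outgoing-byte-rate", "producer-metrics",
                        Map.of("client-id", "p1"), 1000.0),
                new MetricId("outgoing-byte-rate", "producer-node-metrics",
                        Map.of("client-id", "p1", "node-id", "node-1"), 0.0));

        System.out.println("option 1: " + wrapQualified(metrics));
        System.out.println("option 2: " + wrapProducerLevelOnly(metrics));
    }
}
```

Option 2 is simpler and no longer depends on whether the broker and topic metrics have been registered yet, but it drops them entirely. Option 1 keeps them, although broker and topic metrics registered after KafkaWriter's initialization would presumably still be missing.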

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
