Valentina Predtechenskaya created FLINK-28790:
-------------------------------------------------
Summary: Incorrect KafkaProducer metrics initialization
Key: FLINK-28790
URL: https://issues.apache.org/jira/browse/FLINK-28790
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.15.1, 1.14.4
Reporter: Valentina Predtechenskaya
Problem
KafkaProducer metrics in Flink behave unpredictably because broker and topic
metrics are registered concurrently with Flink's initialization of its metric
wrappers.
Reproducing
We first noticed the problem on our Flink cluster: the metric
KafkaProducer.outgoing-byte-rate was periodically missing (equal to zero or
near zero) on several subtasks, while on other subtasks the same metric was
fine. The actual outgoing rate was the same on all subtasks, as was clear from,
for example, the KafkaProducer.records-send-rate metric, which was correct on
every subtask, so the problem was definitely with the metric itself.
After a long investigation, we found the root cause of this behavior:
* KafkaWriter creates an instance of FlinkKafkaInternalProducer and then
[initializes|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L327-L330]
Flink metric wrappers (gauges) over the KafkaProducer metrics that exist at
that moment
* KafkaProducer, in its constructor, [creates the Sender|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L458-L460]
used to communicate with the brokers, starts a thread
(kafka-producer-network-thread), and runs the Sender in that separate thread
* After the Sender starts, the topic and broker metrics are registered with
some delay. If they are registered quickly, KafkaWriter sees them before its
initialization finishes and wraps them as Flink gauges. Otherwise, they are
not wrapped.
* [Some KafkaProducer metrics|https://docs.confluent.io/platform/current/kafka/monitoring.html#producer-metrics]
at the producer level and at the broker level have the same names, for example
outgoing-byte-rate
* If two metrics have the same name, Flink's KafkaWriter
[overwrites|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L359-L360]
the metric in the wrapper, so only one of them is exposed
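To make the interaction above concrete, here is a minimal Java sketch that simulates it without the real Kafka or Flink classes. KafkaMetricStub, wrapByNameOnly and the metric values are hypothetical; the only behavior taken from the issue is that wrapping happens once, keyed by metric name, over whatever metrics exist at that moment.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical simulation of the root cause. KafkaMetricStub stands in for a
 * Kafka metric (name, group and current value); it is NOT the real Kafka or
 * Flink API.
 */
public class MetricCollisionDemo {

    record KafkaMetricStub(String name, String group, double value) {}

    // Wraps a snapshot of metrics keyed by metric name only: a later metric
    // with the same name replaces the earlier one (assumed to mirror the
    // overwrite described in the issue).
    static Map<String, KafkaMetricStub> wrapByNameOnly(List<KafkaMetricStub> snapshot) {
        Map<String, KafkaMetricStub> wrapped = new LinkedHashMap<>();
        for (KafkaMetricStub m : snapshot) {
            wrapped.put(m.name(), m);
        }
        return wrapped;
    }

    public static void main(String[] args) {
        // Snapshot taken before the Sender thread has registered any
        // broker-level (node) metrics: only the producer-level one exists.
        List<KafkaMetricStub> early = List.of(
                new KafkaMetricStub("outgoing-byte-rate", "producer-metrics", 1200.0));

        // Snapshot taken after a broker-level metric with the same name has
        // been registered (illustrative value for a mostly idle connection).
        List<KafkaMetricStub> late = List.of(
                new KafkaMetricStub("outgoing-byte-rate", "producer-metrics", 1200.0),
                new KafkaMetricStub("outgoing-byte-rate", "producer-node-metrics", 0.0));

        // Same producer, different timing, different exposed value.
        System.out.println(wrapByNameOnly(early).get("outgoing-byte-rate").value()); // 1200.0
        System.out.println(wrapByNameOnly(late).get("outgoing-byte-rate").value());  // 0.0
    }
}
```

In this sketch the last entry in list order wins. In the real producer, which metric wins presumably depends on the iteration order of the producer's metrics map, which is not specified, and that matches the "periodically missing on some subtasks" symptom.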
So, to reproduce this bug, it is enough to run any job with a Kafka Sink and
look at the KafkaProducer metrics: some of them will be absent (broker or topic
metrics), or some of them will be overwritten (like outgoing-byte-rate in the
example above).
I see at least two ways to fix it:
1. Add the metric group (producer-metrics, producer-node-metrics, etc.) to
Flink's metric names
2. Use only metrics from the producer-metrics group and ignore all other
groups, i.e. do not consider broker and topic metrics
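Both proposals can be sketched in the same simplified model. MetricStub, wrapWithGroupPrefix and wrapProducerMetricsOnly are hypothetical names, not Flink or Kafka APIs; only the group names producer-metrics and producer-node-metrics come from the Kafka producer metrics.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of the two proposed fixes. MetricStub stands in for a
 * Kafka metric (name, group and current value); it is NOT the real Kafka API.
 */
public class MetricFixSketch {

    record MetricStub(String name, String group, double value) {}

    // Option 1: qualify the wrapped name with the metric group, so that
    // producer-level and node-level metrics no longer share a key.
    // Note: with several brokers, node-level metrics would still collide on
    // this key unless a tag such as node-id were also included.
    static Map<String, MetricStub> wrapWithGroupPrefix(List<MetricStub> metrics) {
        Map<String, MetricStub> wrapped = new LinkedHashMap<>();
        for (MetricStub m : metrics) {
            wrapped.put(m.group() + "." + m.name(), m);
        }
        return wrapped;
    }

    // Option 2: wrap only producer-level metrics and ignore node-level and
    // topic-level metrics entirely.
    static Map<String, MetricStub> wrapProducerMetricsOnly(List<MetricStub> metrics) {
        Map<String, MetricStub> wrapped = new LinkedHashMap<>();
        for (MetricStub m : metrics) {
            if ("producer-metrics".equals(m.group())) {
                wrapped.put(m.name(), m);
            }
        }
        return wrapped;
    }

    public static void main(String[] args) {
        List<MetricStub> metrics = List.of(
                new MetricStub("outgoing-byte-rate", "producer-metrics", 1200.0),
                new MetricStub("outgoing-byte-rate", "producer-node-metrics", 0.0));

        // [producer-metrics.outgoing-byte-rate, producer-node-metrics.outgoing-byte-rate]
        System.out.println(wrapWithGroupPrefix(metrics).keySet());

        // 1200.0: the producer-level value is kept regardless of timing
        System.out.println(wrapProducerMetricsOnly(metrics).get("outgoing-byte-rate").value());
    }
}
```

Option 2 is also insensitive to when the Sender registers its metrics, since producer-level metrics are the only ones considered; option 1 keeps more information but changes the exposed metric names.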
--
This message was sent by Atlassian Jira
(v8.20.10#820010)