Hi, I suggest creating a ticket for it at
https://issues.apache.org/jira/projects/FLINK/summary.


Best,
Zhanghao Chen
________________________________
From: Valentina Predtechenskaya <vgpredtechensk...@avito.ru>
Sent: Wednesday, August 3, 2022 1:32
To: user@flink.apache.org <user@flink.apache.org>
Subject: (Possible) bug in flink-kafka-connector (metrics rewriting)



Hello!


I would like to report a bug in metric registration during KafkaProducer 
initialization.

We first noticed the problem on our Flink cluster: the metric 
KafkaProducer.outgoing-byte-rate periodically went missing (it was zero or 
near zero) on several subtasks, while other subtasks reported it correctly at 
the same time. The actual outgoing rate was the same across subtasks; this was 
clear from, for example, the KafkaProducer.records-send-rate metric, which was 
fine on every subtask. So the problem was definitely with the metric itself.


After a long investigation we found the root cause of this behavior:

  *   KafkaWriter creates an instance of FlinkKafkaInternalProducer and then 
initializes metric wrappers over the existing KafkaProducer metrics (gauges) - 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L327-L330
  *   In its constructor, KafkaProducer creates a Sender to access the brokers, 
starts a thread (kafka-producer-network-thread) and runs the Sender in this 
separate thread - 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L458-L460
  *   After the Sender starts, the metrics related to topics and brokers take 
some time to register. If they register quickly, KafkaWriter sees them before 
the end of its initialization and wraps them as Flink gauges. Otherwise, it 
does not.
  *   Some KafkaProducer metrics at the producer level and at the broker level 
have the same name, for example outgoing-byte-rate - 
https://docs.confluent.io/platform/current/kafka/monitoring.html#producer-metrics
  *   When two metrics have the same name, Flink's KafkaWriter overwrites the 
metric held in the wrapper - 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L359-L360
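
To illustrate the collision described in the list above, here is a minimal, self-contained sketch. It does not use the real Flink or Kafka classes: the Metric class, the registerByName method and the sample values are hypothetical stand-ins, and the only assumption carried over from the report is that metrics are keyed by name alone, so a later metric with the same name replaces the earlier one.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MetricNameCollision {

    /** Hypothetical stand-in for a Kafka metric: group, name and current value. */
    static final class Metric {
        final String group;
        final String name;
        final double value;

        Metric(String group, String name, double value) {
            this.group = group;
            this.name = name;
            this.value = value;
        }
    }

    /**
     * Registers metrics keyed by name only (assumed behavior, per the report).
     * A later metric with an already-seen name replaces the earlier one.
     */
    static Map<String, Metric> registerByName(List<Metric> metrics) {
        Map<String, Metric> registry = new LinkedHashMap<>();
        for (Metric metric : metrics) {
            registry.put(metric.name, metric);
        }
        return registry;
    }

    public static void main(String[] args) {
        // The broker-level metric has already registered when the loop runs,
        // which corresponds to the "fast cluster" case from the report.
        List<Metric> metrics = Arrays.asList(
                new Metric("producer-metrics", "outgoing-byte-rate", 1000.0),
                new Metric("producer-node-metrics", "outgoing-byte-rate", 0.0));

        Metric exposed = registerByName(metrics).get("outgoing-byte-rate");
        // Prints: producer-node-metrics 0.0
        System.out.println(exposed.group + " " + exposed.value);
    }
}
```

In this model the producer-level value is lost as soon as a broker-level metric with the same name is seen, which matches the symptom of outgoing-byte-rate dropping to zero on some subtasks only.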

I have debugged these libraries extensively and I am sure of this behavior. For 
example, if we patch flink-kafka-connector with a condition that skips 
initializing a metric when 
"producer-node-metrics".equals(metric.metricName().group()), all our metrics 
are fine (outgoing-byte-rate is not 0).
Also, the bug does not reproduce if the cluster is not very fast (for example, 
on a local machine) and data from the brokers arrives only after all metrics 
have been initialized in KafkaWriter.
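
The patch condition mentioned above can be sketched as follows. This is again a simplified model and not the connector code: the Metric class, shouldWrap and registerFiltered are hypothetical names, and only the group string "producer-node-metrics" comes from the patch described in this message.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FilteredMetricRegistration {

    /** Hypothetical stand-in for a Kafka metric: group, name and current value. */
    static final class Metric {
        final String group;
        final String name;
        final double value;

        Metric(String group, String name, double value) {
            this.group = group;
            this.name = name;
            this.value = value;
        }
    }

    /** Patch condition: skip broker-level (per-node) metrics. */
    static boolean shouldWrap(String group) {
        return !"producer-node-metrics".equals(group);
    }

    /** Registers metrics by name, ignoring every metric that fails shouldWrap. */
    static Map<String, Metric> registerFiltered(List<Metric> metrics) {
        Map<String, Metric> registry = new LinkedHashMap<>();
        for (Metric metric : metrics) {
            if (shouldWrap(metric.group)) {
                registry.put(metric.name, metric);
            }
        }
        return registry;
    }

    public static void main(String[] args) {
        List<Metric> metrics = Arrays.asList(
                new Metric("producer-metrics", "outgoing-byte-rate", 1000.0),
                new Metric("producer-node-metrics", "outgoing-byte-rate", 0.0));

        Metric exposed = registerFiltered(metrics).get("outgoing-byte-rate");
        // Prints: producer-metrics 1000.0
        System.out.println(exposed.group + " " + exposed.value);
    }
}
```

With the filter in place the result no longer depends on whether the broker-level metrics register before or after initialization, at the cost of not exposing the per-node metrics at all.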

I suppose this is not the expected behavior, but the code is the same even in 
the latest version of flink-kafka-connector. Is there a fix for this, or 
perhaps a workaround? To be honest, I would rather not use my own patched 
version of the connector.

Thank you!


________________________________
“This message contains confidential information/commercial secret. If you are 
not the intended addressee of this message you may not copy, save, print or 
forward it to any third party and you are kindly requested to destroy this 
message and notify the sender thereof by email.”
