Hi,

I suggest creating a ticket for it at https://issues.apache.org/jira/projects/FLINK/summary.
Best,
Zhanghao Chen

________________________________
From: Valentina Predtechenskaya <vgpredtechensk...@avito.ru>
Sent: Wednesday, August 3, 2022 1:32
To: user@flink.apache.org <user@flink.apache.org>
Subject: (Possible) bug in flink-kafka-connector (metrics rewriting)

Hello!

I would like to report a bug with metrics registration on KafkaProducer initialization.

We first noticed the problem on our Flink cluster: the metric KafkaProducer.outgoing-byte-rate was periodically missing (zero or near zero) on several subtasks, while at the same time other subtasks reported it correctly. The actual outgoing rate was the same across subtasks. That was clear from, for example, the KafkaProducer.records-send-rate metric, which was fine on every subtask, so the problem was certainly with the metric itself.

After a long investigation we found the root cause of this behavior:

* KafkaWriter creates an instance of FlinkKafkaInternalProducer and then initializes metric wrappers over the existing KafkaProducer metrics (gauges): https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L327-L330
* KafkaProducer itself, in its constructor, creates a Sender to access the brokers, starts a thread (kafka-producer-network-thread) and runs the Sender in this separate thread: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L458-L460
* After the Sender starts, the metrics related to topics and brokers take some time to register. If they register quickly, KafkaWriter sees them before the end of its initialization and wraps them as Flink gauges. Otherwise, it does not.
* Some KafkaProducer metrics at the producer level and at the broker level have the same name, for example outgoing-byte-rate: https://docs.confluent.io/platform/current/kafka/monitoring.html#producer-metrics
* When two metrics have the same name, Flink's KafkaWriter overwrites the metric held in the wrapper: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L359-L360

I have debugged these libraries a lot and I am sure of this behavior. If, for example, I patch flink-kafka-connector with a condition not to initialize a metric when "producer-node-metrics".equals(metric.metricName().group()), all our metrics are fine (outgoing-byte-rate is not 0). Also, the bug does not reproduce when the cluster is not very fast (for example, on a local machine) and data from the brokers arrives only after all metrics have been initialized in KafkaWriter.

I suppose this is not the expected behavior, but the code is the same even in the latest version of flink-kafka-connector. Is there a fix for this, or perhaps a workaround? To be honest, I would rather not run my own patched version of the connector.

Thank you!
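
For anyone following the thread, the name collision described above can be reproduced in isolation. The following is only a minimal sketch and not the connector's actual code: the `Metric` class, the method names, and the sample values are hypothetical stand-ins, and it assumes gauges are keyed by metric name alone, so that a broker-level metric registered later replaces the producer-level one. The second method applies the same kind of group filter as the patch mentioned in the message.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MetricCollisionSketch {

    /** Hypothetical stand-in for a Kafka metric: a name, a group, and a current value. */
    public static final class Metric {
        final String name;
        final String group;
        final double value;

        public Metric(String name, String group, double value) {
            this.name = name;
            this.group = group;
            this.value = value;
        }
    }

    /** Keys gauges by metric name only, so a later metric with the same name replaces the earlier one. */
    public static Map<String, Double> registerByNameOnly(List<Metric> metrics) {
        Map<String, Double> gauges = new LinkedHashMap<>();
        for (Metric m : metrics) {
            gauges.put(m.name, m.value); // the group is ignored, which is where the collision comes from
        }
        return gauges;
    }

    /** Same registration, but skips the per-broker group, mirroring the patch described in the message. */
    public static Map<String, Double> registerSkippingNodeGroup(List<Metric> metrics) {
        Map<String, Double> gauges = new LinkedHashMap<>();
        for (Metric m : metrics) {
            if ("producer-node-metrics".equals(m.group)) {
                continue; // do not let broker-level metrics shadow producer-level ones
            }
            gauges.put(m.name, m.value);
        }
        return gauges;
    }

    public static void main(String[] args) {
        List<Metric> metrics = new ArrayList<>();
        // Illustrative values only: a busy producer and a broker-level metric that currently reads zero.
        metrics.add(new Metric("outgoing-byte-rate", "producer-metrics", 1000.0));
        metrics.add(new Metric("outgoing-byte-rate", "producer-node-metrics", 0.0));

        System.out.println(registerByNameOnly(metrics));        // {outgoing-byte-rate=0.0}
        System.out.println(registerSkippingNodeGroup(metrics)); // {outgoing-byte-rate=1000.0}
    }
}
```

In this sketch the outcome depends only on whether the broker-level metric is present when registration runs, which matches the timing dependence described in the message. Keying the gauges by group and name together would be another way to avoid the overwrite, though whether that suits the connector is for the maintainers to judge.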