Github user Aitozi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4935#discussion_r148530767

    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -245,6 +238,23 @@ public void run() {
     				if (records == null) {
     					try {
     						records = consumer.poll(pollTimeout);
    +						// register Kafka's very own metrics in Flink's metric reporters
    +						if (useMetrics && !records.isEmpty()) {
    +							// register Kafka metrics to Flink
    +							Map<MetricName, ? extends Metric> metrics = consumer.metrics();
    +							if (metrics == null) {
    +								// MapR's Kafka implementation returns null here.
    +								log.info("Consumer implementation does not support metrics");
    +							} else {
    +								// we have Kafka metrics, register them
    +								for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
    --- End diff --

    Yes, I agree with you that this is not the best way to solve it. What do you think about trying to register the Kafka metrics at the beginning of the job, for a number of attempts that can be configured through `properties`? Once that count is exceeded, we would no longer run the registration in the loop.
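
    To make the proposal concrete, here is a rough sketch of the bounded-attempt idea. It is only an illustration, not code from the PR: the class `BoundedMetricRegistration`, the property key `metrics.registration.max-attempts`, and the default of 3 are all made up for this example, and the Kafka consumer and Flink metric group are stood in for by a plain `Supplier` and `BiConsumer` so the snippet compiles on its own.

```java
import java.util.Map;
import java.util.Properties;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/** Hypothetical helper: tries to register metrics for a bounded number of attempts. */
final class BoundedMetricRegistration {
    // Hypothetical property key; not an existing Flink or Kafka option.
    static final String MAX_ATTEMPTS_KEY = "metrics.registration.max-attempts";
    // Assumed default, chosen only for this sketch.
    static final int DEFAULT_MAX_ATTEMPTS = 3;

    private final int maxAttempts;
    private int attempts;
    private boolean registered;

    BoundedMetricRegistration(Properties props) {
        this.maxAttempts = Integer.parseInt(
            props.getProperty(MAX_ATTEMPTS_KEY, String.valueOf(DEFAULT_MAX_ATTEMPTS)));
    }

    /** True while another attempt is allowed, so the poll loop can skip the work cheaply. */
    boolean shouldAttempt() {
        return !registered && attempts < maxAttempts;
    }

    /**
     * Makes one registration attempt.
     * The source stands in for consumer.metrics(); the registrar stands in for the metric group.
     * Returns true only if this call registered the metrics.
     */
    <M> boolean attempt(Supplier<Map<String, M>> source, BiConsumer<String, M> registrar) {
        if (!shouldAttempt()) {
            return false;
        }
        attempts++;
        Map<String, M> metrics = source.get();
        if (metrics == null || metrics.isEmpty()) {
            // Nothing to register yet (or the implementation returns null); retry on a later poll.
            return false;
        }
        metrics.forEach(registrar);
        registered = true;
        return true;
    }
}
```

    In the poll loop this would replace the `!records.isEmpty()` check with something like `if (useMetrics && registration.shouldAttempt())`, so that after a successful registration, or after the configured number of attempts, the loop no longer touches the metrics at all.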
---