Github user Aitozi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4935#discussion_r148530767
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -245,6 +238,23 @@ public void run() {
                                if (records == null) {
                                        try {
                                                records = consumer.poll(pollTimeout);
    +                                           // register Kafka's very own metrics in Flink's metric reporters
    +                                           if (useMetrics && !records.isEmpty()) {
    +                                                   // register Kafka metrics to Flink
    +                                                   Map<MetricName, ? extends Metric> metrics = consumer.metrics();
    +                                                   if (metrics == null) {
    +                                                           // MapR's Kafka implementation returns null here.
    +                                                           log.info("Consumer implementation does not support metrics");
    +                                                   } else {
    +                                                           // we have Kafka metrics, register them
    +                                                           for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
    --- End diff --
    
    Yes, I agree with you that this is not the best way to solve it. What do you 
think about trying to register the Kafka metrics at the beginning of the job, 
for a limited number of attempts that can be configured through `properties`? 
Once that count is exceeded, we would no longer run the registration in the loop.
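
    To make the idea concrete, here is a rough sketch of what I mean. It is only 
an illustration, not code from this PR: the class `BoundedMetricRegistration`, 
the property key `metrics.register.max-attempts` and its default of 3 are all 
made up, and the generic `registrar` callback stands in for whatever adds a 
metric to Flink's metric group.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.BiConsumer;

/** Hypothetical helper: registers metrics only during the first few polls. */
final class BoundedMetricRegistration<K, V> {

    // Hypothetical property key, not an existing Flink or Kafka option.
    static final String MAX_ATTEMPTS_KEY = "metrics.register.max-attempts";

    private final int maxAttempts;
    private final Set<K> registered = new HashSet<>();
    private int attempts;

    BoundedMetricRegistration(Properties props) {
        // Assumed default of 3 attempts when the property is not set.
        this.maxAttempts = Integer.parseInt(props.getProperty(MAX_ATTEMPTS_KEY, "3"));
    }

    /** True while the fetch loop should still call tryRegister. */
    boolean isActive() {
        return attempts < maxAttempts;
    }

    /** Registers metrics not seen before; does nothing once the budget is used up. */
    void tryRegister(Map<K, ? extends V> metrics, BiConsumer<K, V> registrar) {
        if (!isActive()) {
            return;
        }
        attempts++;
        if (metrics == null) {
            // The consumer implementation does not expose metrics, so stop trying.
            attempts = maxAttempts;
            return;
        }
        for (Map.Entry<K, ? extends V> entry : metrics.entrySet()) {
            // Set.add returns false for keys that were registered on an earlier attempt.
            if (registered.add(entry.getKey())) {
                registrar.accept(entry.getKey(), entry.getValue());
            }
        }
    }

    int registeredCount() {
        return registered.size();
    }
}
```

    In the fetch loop this would be roughly `if (useMetrics && reg.isActive()) 
{ reg.tryRegister(consumer.metrics(), ...); }` right after `consumer.poll(...)`, 
so after the configured number of polls the check costs a single integer 
comparison.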

