GitHub user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4935#discussion_r148442662
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -245,6 +238,23 @@ public void run() {
                                if (records == null) {
                                        try {
                                                records = consumer.poll(pollTimeout);
    +                                           // register Kafka's very own metrics in Flink's metric reporters
    +                                           if (useMetrics && !records.isEmpty()) {
    +                                                   // register Kafka metrics to Flink
    +                                                   Map<MetricName, ? extends Metric> metrics = consumer.metrics();
    +                                                   if (metrics == null) {
    +                                                           // MapR's Kafka implementation returns null here.
    +                                                           log.info("Consumer implementation does not support metrics");
    +                                                   } else {
    +                                                           // we have Kafka metrics, register them
    +                                                           for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
    --- End diff --
    
    I'm really not sure about this.
    This makes every record poll loop over the full set of consumer metrics. AFAIK, the Kafka consumer ships with at least 6~8 metrics out of the box, so this could hurt the consumer's performance.
    
    Is there any way we can avoid that?
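    
    For example, one way around it might be to guard the registration with a once-only flag, so the metrics map is iterated on the first non-empty poll and never again. A rough sketch of the idea (not the PR's actual code; the class, field, and `registerKafkaMetrics` helper are made up for illustration):
    
    import java.util.Map;
    
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    
    // Rough sketch: register Kafka's metrics once, on the first non-empty poll,
    // instead of walking consumer.metrics() on every poll.
    final class OnceOnlyMetricRegistration<K, V> {
    
        private boolean metricsRegistered = false;
    
        void runPollLoop(KafkaConsumer<K, V> consumer, long pollTimeout, boolean useMetrics) {
            while (true) {
                ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
    
                if (useMetrics && !metricsRegistered && !records.isEmpty()) {
                    Map<MetricName, ? extends Metric> metrics = consumer.metrics();
                    if (metrics == null) {
                        // MapR's Kafka implementation returns null here.
                        System.out.println("Consumer implementation does not support metrics");
                    } else {
                        registerKafkaMetrics(metrics);
                    }
                    // set the flag either way, so from now on the hot path
                    // pays only a single boolean check per poll
                    metricsRegistered = true;
                }
    
                // ... emit records downstream ...
            }
        }
    
        // Placeholder for the actual Flink-side registration (wrapping each
        // Kafka metric in a gauge on the operator's metric group).
        private void registerKafkaMetrics(Map<MetricName, ? extends Metric> metrics) {
            for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
                // e.g. kafkaMetricGroup.gauge(metric.getKey().name(), ...);
            }
        }
    }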

