Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5335#discussion_r165022709
  
    --- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
    @@ -95,21 +95,19 @@ public Kafka09Fetcher(
                                watermarksPunctuated,
                                processingTimeProvider,
                                autoWatermarkInterval,
    -                           userCodeClassLoader,
    +                           userCodeClassLoader.getParent(),
    +                           consumerMetricGroup,
                                useMetrics);
     
                this.deserializer = deserializer;
                this.handover = new Handover();
     
    -           final MetricGroup kafkaMetricGroup = 
metricGroup.addGroup(KAFKA_CONSUMER_METRICS_GROUP);
    -           addOffsetStateGauge(kafkaMetricGroup);
    -
                this.consumerThread = new KafkaConsumerThread(
                                LOG,
                                handover,
                                kafkaProperties,
                                unassignedPartitionsQueue,
    -                           kafkaMetricGroup,
    +                           subtaskMetricGroup, // TODO: the thread should 
expose Kafka-shipped metrics through the consumer metric group, not subtask 
metric group
    --- End diff --
    
    so why aren't we passing the consumerMetricGroup here?


---

Reply via email to