Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5335#discussion_r165022709 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java --- @@ -95,21 +95,19 @@ public Kafka09Fetcher( watermarksPunctuated, processingTimeProvider, autoWatermarkInterval, - userCodeClassLoader, + userCodeClassLoader.getParent(), + consumerMetricGroup, useMetrics); this.deserializer = deserializer; this.handover = new Handover(); - final MetricGroup kafkaMetricGroup = metricGroup.addGroup(KAFKA_CONSUMER_METRICS_GROUP); - addOffsetStateGauge(kafkaMetricGroup); - this.consumerThread = new KafkaConsumerThread( LOG, handover, kafkaProperties, unassignedPartitionsQueue, - kafkaMetricGroup, + subtaskMetricGroup, // TODO: the thread should expose Kafka-shipped metrics through the consumer metric group, not subtask metric group --- End diff -- so why aren't we passing the consumerMetricGroup here?
---