[ https://issues.apache.org/jira/browse/FLINK-8419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346633#comment-16346633 ]
ASF GitHub Bot commented on FLINK-8419: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5336#discussion_r165022738 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java --- @@ -92,21 +93,19 @@ public Kafka09Fetcher( watermarksPunctuated, processingTimeProvider, autoWatermarkInterval, - userCodeClassLoader, + userCodeClassLoader.getParent(), + consumerMetricGroup, useMetrics); this.deserializer = deserializer; this.handover = new Handover(); - final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer"); - addOffsetStateGauge(kafkaMetricGroup); - this.consumerThread = new KafkaConsumerThread( LOG, handover, kafkaProperties, unassignedPartitionsQueue, - kafkaMetricGroup, + subtaskMetricGroup, // TODO: the thread should expose Kafka-shipped metrics through the consumer metric group, not subtask metric group --- End diff -- Because that would break compatibility of previous metrics. On the other hand, it might make sense that we also additionally register the Kafka-shipped metrics under `consumerMetricGroup` now, if we want to be able to eventually resolve this TODO in the future. What do you think? > Kafka consumer's offset metrics are not registered for dynamically discovered > partitions > ---------------------------------------------------------------------------------------- > > Key: FLINK-8419 > URL: https://issues.apache.org/jira/browse/FLINK-8419 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Metrics > Affects Versions: 1.4.0, 1.5.0 > Reporter: Tzu-Li (Gordon) Tai > Assignee: Tzu-Li (Gordon) Tai > Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > > Currently, the per-partition offset metrics are registered via the > {{AbstractFetcher#addOffsetStateGauge}} method. That method is only ever > called for the initial startup partitions, and not for dynamically discovered > partitions. > We should consider adding some unit tests to make sure that metrics are > properly registered for all partitions. That would also safeguard us from > accidentally removing metrics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)