[ https://issues.apache.org/jira/browse/FLINK-8419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346633#comment-16346633 ]

ASF GitHub Bot commented on FLINK-8419:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5336#discussion_r165022738
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -92,21 +93,19 @@ public Kafka09Fetcher(
                                watermarksPunctuated,
                                processingTimeProvider,
                                autoWatermarkInterval,
    -                           userCodeClassLoader,
    +                           userCodeClassLoader.getParent(),
    +                           consumerMetricGroup,
                                useMetrics);
     
                this.deserializer = deserializer;
                this.handover = new Handover();
     
    -           final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
    -           addOffsetStateGauge(kafkaMetricGroup);
    -
                this.consumerThread = new KafkaConsumerThread(
                                LOG,
                                handover,
                                kafkaProperties,
                                unassignedPartitionsQueue,
    -                           kafkaMetricGroup,
    +                           subtaskMetricGroup, // TODO: the thread should expose Kafka-shipped metrics through the consumer metric group, not subtask metric group
    --- End diff --
    
    Because changing that now would break compatibility with the previously
    registered metrics.
    
    On the other hand, it might make sense to additionally register the
    Kafka-shipped metrics under `consumerMetricGroup` now, so that we can
    eventually resolve this TODO. What do you think?
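
To make the double-registration idea above concrete, here is a minimal sketch. It does not use Flink's real metrics API: `SimpleMetricGroup` and `registerKafkaShippedMetrics` are hypothetical stand-ins invented for illustration, and the metric name in `main` is only a placeholder. The point is that the same gauge is exposed under the old scope (for compatibility) and under the new consumer scope (so the old one could be dropped later).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class DualMetricRegistration {

    /** Hypothetical stand-in for a metric group: a named scope holding gauges. */
    static final class SimpleMetricGroup {
        final String scope;
        final Map<String, Supplier<Double>> gauges = new LinkedHashMap<>();

        SimpleMetricGroup(String scope) {
            this.scope = scope;
        }

        void gauge(String name, Supplier<Double> gauge) {
            gauges.put(name, gauge);
        }
    }

    /**
     * Registers every Kafka-shipped metric under both groups. The legacy group
     * keeps existing metric identifiers working; the consumer group is the
     * scope the TODO wants to move to.
     */
    static void registerKafkaShippedMetrics(
            Map<String, Supplier<Double>> kafkaMetrics,
            SimpleMetricGroup legacyGroup,
            SimpleMetricGroup consumerGroup) {
        for (Map.Entry<String, Supplier<Double>> entry : kafkaMetrics.entrySet()) {
            legacyGroup.gauge(entry.getKey(), entry.getValue());
            consumerGroup.gauge(entry.getKey(), entry.getValue());
        }
    }

    public static void main(String[] args) {
        SimpleMetricGroup subtaskGroup = new SimpleMetricGroup("subtask");
        SimpleMetricGroup consumerGroup = new SimpleMetricGroup("subtask.KafkaConsumer");

        // Placeholder metric name and value, not taken from the pull request.
        Map<String, Supplier<Double>> kafkaMetrics = new LinkedHashMap<>();
        kafkaMetrics.put("example-metric", () -> 42.0);

        registerKafkaShippedMetrics(kafkaMetrics, subtaskGroup, consumerGroup);

        System.out.println(subtaskGroup.scope + " -> " + subtaskGroup.gauges.keySet());
        System.out.println(consumerGroup.scope + " -> " + consumerGroup.gauges.keySet());
    }
}
```

Both groups share the same supplier, so the two registrations always report the same value; removing the legacy registration later would be a one-line change.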


> Kafka consumer's offset metrics are not registered for dynamically discovered 
> partitions
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-8419
>                 URL: https://issues.apache.org/jira/browse/FLINK-8419
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Metrics
>    Affects Versions: 1.4.0, 1.5.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.5.0, 1.4.1
>
>
> Currently, the per-partition offset metrics are registered via the 
> {{AbstractFetcher#addOffsetStateGauge}} method. That method is only ever 
> called for the initial startup partitions, and not for dynamically discovered 
> partitions.
> We should consider adding some unit tests to make sure that metrics are 
> properly registered for all partitions. That would also safeguard us from 
> accidentally removing metrics.
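
The problem described above, and the kind of unit check the issue asks for, can be sketched roughly as follows. This is not the actual `AbstractFetcher` code: `Fetcher`, `PartitionState`, `addDiscoveredPartitions`, and the gauge naming scheme are hypothetical stand-ins. The sketch only shows the idea of routing both the startup partitions and later-discovered partitions through the same registration path, so that no partition is left without an offset gauge.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class PartitionOffsetMetrics {

    /** Hypothetical per-partition state holding the latest consumed offset. */
    static final class PartitionState {
        final String topic;
        final int partition;
        volatile long currentOffset = -1L;

        PartitionState(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        String gaugeName() {
            return topic + "-" + partition + ".currentOffset";
        }
    }

    /** Hypothetical fetcher that registers one offset gauge per partition. */
    static final class Fetcher {
        final Map<String, Supplier<Long>> gauges = new HashMap<>();
        final List<PartitionState> subscribed = new ArrayList<>();

        Fetcher(List<PartitionState> initialPartitions) {
            subscribe(initialPartitions);
        }

        /** Called when partition discovery finds new partitions at runtime. */
        void addDiscoveredPartitions(List<PartitionState> discovered) {
            subscribe(discovered);
        }

        // Single code path for startup and discovered partitions, so the
        // gauge registration cannot be skipped for either kind.
        private void subscribe(List<PartitionState> partitions) {
            for (PartitionState state : partitions) {
                subscribed.add(state);
                gauges.put(state.gaugeName(), () -> state.currentOffset);
            }
        }
    }

    public static void main(String[] args) {
        PartitionState p0 = new PartitionState("topic", 0);
        PartitionState p1 = new PartitionState("topic", 1);

        Fetcher fetcher = new Fetcher(Collections.singletonList(p0));
        fetcher.addDiscoveredPartitions(Arrays.asList(p1));

        // The check a unit test would make: every subscribed partition has a gauge.
        for (PartitionState state : fetcher.subscribed) {
            if (!fetcher.gauges.containsKey(state.gaugeName())) {
                throw new IllegalStateException("missing gauge for " + state.gaugeName());
            }
        }
        System.out.println("gauges registered: " + fetcher.gauges.size());
    }
}
```

A test built this way would also fail if a later change dropped the gauge registration, which matches the safeguard mentioned in the issue.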



