[ https://issues.apache.org/jira/browse/FLINK-6109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350468#comment-16350468 ]
aitozi commented on FLINK-6109:
-------------------------------

[~tzulitai] you mentioned that calculating the lag is overwhelming, but since Kafka 0.10.2 we can fetch the lag directly from Kafka and don't need to calculate it ourselves, as I mentioned in https://github.com/apache/flink/pull/4935

> Add "consumer lag" report metric to FlinkKafkaConsumer
> ------------------------------------------------------
>
>                 Key: FLINK-6109
>                 URL: https://issues.apache.org/jira/browse/FLINK-6109
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: aitozi
>            Priority: Major
>
> This is a feature discussed in this ML:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Telling-if-a-job-has-caught-up-with-Kafka-td12261.html.
> As discussed, we can expose two kinds of "consumer lag" metrics for this:
>  - *current consumer lag per partition:* the current difference between the
> latest offset and the offset of the last collected record. This metric is
> calculated and updated at a configurable interval. It serves as an
> indicator of how well the consumer is keeping up with the head of the
> partitions. I propose to name this {{currentOffsetLag}}.
>  - *consumer lag of last checkpoint per partition:* the difference between
> the latest offset and the offset stored in the checkpoint. This metric is
> only updated when checkpoints are completed. It serves as an indicator of how
> much data may need to be replayed in case of a failure. I propose to name
> this {{lastCheckpointedOffsetLag}}.
> I don't think it is reasonable to define a metric of whether or not a
> consumer has "caught up" with the HEAD. That would imply a threshold for the
> offset difference. We should probably leave this "caught up" logic for
> users to determine themselves when they query this metric.
> The granularity of the metric is per-FlinkKafkaConsumer, and independent of
> the consumer group.id used (the offset used to calculate consumer lag is the
> internal offset state of the FlinkKafkaConsumer, not the consumer group's
> committed offsets in Kafka).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
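
The two proposed metrics share the same arithmetic: the partition's latest offset minus a consumer-side offset. Only the consumer-side offset differs (last collected record versus last completed checkpoint). The following is a minimal sketch of that arithmetic in plain Java. It does not use Flink or Kafka APIs. The class name `ConsumerLagSketch`, the method names, the integer partition ids and the sample offsets are all hypothetical, and flooring the lag at zero is an assumption that the issue does not specify.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; not part of Flink or the Kafka client.
class ConsumerLagSketch {

    // Lag = latest offset of the partition minus a consumer-side offset.
    // Assumption: clamp at zero in case the consumer-side offset is newer
    // than a stale "latest offset" reading.
    static long offsetLag(long latestOffset, long consumerOffset) {
        return Math.max(0L, latestOffset - consumerOffset);
    }

    // Computes the lag for every partition present in both maps.
    // Keys are partition ids, values are offsets.
    static Map<Integer, Long> lagPerPartition(
            Map<Integer, Long> latestOffsets, Map<Integer, Long> consumerOffsets) {
        Map<Integer, Long> lags = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : latestOffsets.entrySet()) {
            Long consumerOffset = consumerOffsets.get(entry.getKey());
            if (consumerOffset != null) {
                lags.put(entry.getKey(), offsetLag(entry.getValue(), consumerOffset));
            }
        }
        return lags;
    }

    public static void main(String[] args) {
        // Made-up offsets for two partitions.
        Map<Integer, Long> latest = new HashMap<>();
        latest.put(0, 1_000L);
        latest.put(1, 500L);

        // Internal offset state of the consumer (last collected record).
        Map<Integer, Long> lastCollected = new HashMap<>();
        lastCollected.put(0, 990L);
        lastCollected.put(1, 500L);

        // Offsets stored in the last completed checkpoint.
        Map<Integer, Long> lastCheckpointed = new HashMap<>();
        lastCheckpointed.put(0, 900L);
        lastCheckpointed.put(1, 450L);

        System.out.println("currentOffsetLag = " + lagPerPartition(latest, lastCollected));
        System.out.println("lastCheckpointedOffsetLag = " + lagPerPartition(latest, lastCheckpointed));
    }
}
```

In line with the description, the sketch reports only the raw offset difference. Whether a consumer counts as "caught up" is left to whoever queries the metric and applies their own threshold.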