GitHub user Aitozi commented on the issue:

https://github.com/apache/flink/pull/4935

Hi @tzulitai,

After reading the KafkaConsumer code again, I found that the per-partition Kafka lag metric is registered in the method `FetchManagerMetrics#recordPartitionLag`. However, when a single poll collects as many records as `max.poll.records` allows, the client returns those records early, leaving some partitions for which `sendFetches` has not been called yet. The lag metrics for those partitions are therefore missing.

In my test, if we poll only once and then register the Kafka metrics, and the topic has many partitions (around 100), some of the partition lag metrics are lost.

So I think that, with a configurable property, users can opt in when they have many partitions, and it should do little harm to performance.

Please let me know what you think, thanks.
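
To make the scenario concrete, here is a toy model in plain Java. It does not use the Kafka client at all: the class `LagMetricSketch`, the method `registeredAfterPolls`, and the assumption that every partition has the same number of records waiting are all hypothetical and exist only for illustration. The model assumes a partition's lag metric appears the first time a poll hands out records from that partition, and that each poll stops as soon as it has collected `maxPollRecords` records.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Toy model only: NOT the Kafka client, just the early-return behaviour described above.
class LagMetricSketch {

    /**
     * Returns the partitions that would have a lag metric after `polls` polls,
     * assuming a metric is created when a partition's records are first handed out.
     */
    static Set<Integer> registeredAfterPolls(int numPartitions,
                                             int recordsPerPartition,
                                             int maxPollRecords,
                                             int polls) {
        int[] remaining = new int[numPartitions];
        Arrays.fill(remaining, recordsPerPartition);

        Set<Integer> registered = new LinkedHashSet<>();
        int next = 0; // next partition that still has records to hand out

        for (int p = 0; p < polls; p++) {
            int budget = maxPollRecords; // a poll returns once this is used up
            while (budget > 0 && next < numPartitions) {
                registered.add(next); // metric appears for this partition
                int taken = Math.min(budget, remaining[next]);
                remaining[next] -= taken;
                budget -= taken;
                if (remaining[next] == 0) {
                    next++; // partition drained, move on to the next one
                }
            }
        }
        return registered;
    }

    public static void main(String[] args) {
        // 100 partitions, 50 records each, max.poll.records = 500, one poll.
        System.out.println(registeredAfterPolls(100, 50, 500, 1).size()); // prints 10
    }
}
```

In this model a single poll over 100 partitions creates metrics for only 10 of them, so code that collects the metrics right after the first poll would miss the other 90. After enough polls (10 with these numbers) every partition has been visited.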
---