chia7712 commented on a change in pull request #10137: URL: https://github.com/apache/kafka/pull/10137#discussion_r580741128
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -2219,6 +2221,25 @@ public void resume(Collection<TopicPartition> partitions) {
         }
     }
 
+    /**
+     * Get the consumer's current lag on the partition. Returns an "empty" {@link OptionalLong} if the lag is not known,
+     * for example if there is no position yet, or if the end offset is not known yet.
+     *
+     * <p>
+     * This method uses locally cached metadata and never makes a remote call.
+     *
+     * @param topicPartition The partition to get the lag for.
+     *
+     * @return This {@code Consumer} instance's current lag for the given partition.
+     *
+     * @throws IllegalStateException if the {@code topicPartition} is not assigned
+     **/
+    @Override
+    public OptionalLong currentLag(TopicPartition topicPartition) {
+        final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);

Review comment:
Other methods call `acquireAndEnsureOpen()` first and then call `release()` in a `finally` block. Should this new method follow the same pattern?
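For illustration, here is a minimal, self-contained sketch of the acquire/release pattern the comment refers to. It does not reproduce `KafkaConsumer`'s actual code: the class `GuardedConsumerSketch` and its `cachedLag` field are hypothetical stand-ins (the latter for the value returned by `subscriptions.partitionLag(...)`), and the guard logic is a simplified assumption of how such a single-thread, must-be-open check might look.

```java
import java.util.ConcurrentModificationException;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a consumer; NOT the real KafkaConsumer. */
class GuardedConsumerSketch {
    // Thread currently inside a guarded method, or null if none.
    private final AtomicReference<Thread> owner = new AtomicReference<>(null);
    // Re-entrancy counter so nested guarded calls on one thread are allowed.
    private final AtomicInteger refCount = new AtomicInteger(0);
    private volatile boolean closed = false;
    // Assumed stand-in for the locally cached lag; null means "not known".
    private final Long cachedLag;

    GuardedConsumerSketch(Long cachedLag) {
        this.cachedLag = cachedLag;
    }

    /** Claims the consumer for the calling thread and rejects use after close. */
    private void acquireAndEnsureOpen() {
        Thread me = Thread.currentThread();
        if (owner.get() != me && !owner.compareAndSet(null, me)) {
            throw new ConcurrentModificationException("Consumer is not safe for multi-threaded access");
        }
        refCount.incrementAndGet();
        if (closed) {
            release();
            throw new IllegalStateException("This consumer has already been closed.");
        }
    }

    /** Gives up the claim once the outermost guarded call returns. */
    private void release() {
        if (refCount.decrementAndGet() == 0) {
            owner.set(null);
        }
    }

    /** Same shape as the other public methods: acquire first, release in finally. */
    public OptionalLong currentLag() {
        acquireAndEnsureOpen();
        try {
            return cachedLag == null ? OptionalLong.empty() : OptionalLong.of(cachedLag);
        } finally {
            release();
        }
    }

    public void close() {
        closed = true;
    }
}
```

With this shape, calling `currentLag()` on a closed instance fails fast with `IllegalStateException` instead of silently reading cached state, and the `finally` block guarantees the claim is released even if the lookup throws.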