chia7712 commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r580741128



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -2219,6 +2221,25 @@ public void resume(Collection<TopicPartition> 
partitions) {
         }
     }
 
+    /**
+     * Get the consumer's current lag on the partition. Returns an "empty" 
{@link OptionalLong} if the lag is not known,
+     * for example if there is no position yet, or if the end offset is not 
known yet.
+     *
+     * <p>
+     * This method uses locally cached metadata and never makes a remote call.
+     *
+     * @param topicPartition The partition to get the lag for.
+     *
+     * @return This {@code Consumer} instance's current lag for the given 
partition.
+     *
+     * @throws IllegalStateException if the {@code topicPartition} is not 
assigned
+     **/
+    @Override
+    public OptionalLong currentLag(TopicPartition topicPartition) {
+        final Long lag = subscriptions.partitionLag(topicPartition, 
isolationLevel);

Review comment:
       Other methods call `acquireAndEnsureOpen();` first and then call 
`release()` in the finally block. Should this new method follow same pattern?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to