guozhangwang commented on a change in pull request #10137: URL: https://github.com/apache/kafka/pull/10137#discussion_r580782275
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java ##########

@@ -243,6 +244,11 @@
      */
     Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
+    /**
+     * @see KafkaConsumer#currentLag(TopicPartition)
+     */
+    OptionalLong currentLag(TopicPartition topicPartition);

Review comment:
For API calls that may incur a broker round trip, batching partitions makes sense. For this API, I think a single-partition lookup is good enough.

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ##########

@@ -156,24 +134,24 @@ public boolean readyToProcess(final long wallClockTime) {
             final TopicPartition partition = entry.getKey();
             final RecordQueue queue = entry.getValue();
-            final Long nullableFetchedLag = fetchedLags.get(partition);
+            final OptionalLong fetchedLag = lagProvider.apply(partition);

Review comment:
Wearing my paranoid hat here: `readyToProcess` is on the critical path and is called per record, while the underlying lag is updated at most as often as the consumer polls. In practice, we fall into the first condition, `!queue.isEmpty()`, most of the time. On the other hand, the `partitionLag` call on `SubscriptionState` is synchronized and could slow down the fetching thread (well, maybe just a bit). So could we call the provider only when necessary, i.e. when the queue is empty and we need to check whether the lag is 0 or not present?
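The first review comment argues that batching only pays off for calls that may reach the broker. A minimal sketch, assuming lag is derived from locally cached state (end offset minus position, empty when either is not yet known) behind a `synchronized` method, illustrates why a per-partition lookup is cheap: it is a couple of map reads, so a caller needing several partitions can simply loop. `CachedLagSource`, `updatePosition`, and `updateEndOffset` are hypothetical names, and partitions are plain strings instead of `TopicPartition` to keep the example self-contained; this is not Kafka's `SubscriptionState`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical stand-in for consumer-side cached offsets; not Kafka's actual implementation.
public class CachedLagSource {
    private final Map<String, Long> positions = new HashMap<>();
    private final Map<String, Long> endOffsets = new HashMap<>();

    // Assumed to be called by the fetch path whenever the position advances.
    public synchronized void updatePosition(String partition, long position) {
        positions.put(partition, position);
    }

    // Assumed to be called when a fetch response carries a new end offset.
    public synchronized void updateEndOffset(String partition, long endOffset) {
        endOffsets.put(partition, endOffset);
    }

    // Single-partition lookup: local map reads only, no remote call.
    // Empty when either the position or the end offset is not known yet.
    public synchronized OptionalLong currentLag(String partition) {
        Long position = positions.get(partition);
        Long endOffset = endOffsets.get(partition);
        if (position == null || endOffset == null) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(endOffset - position);
    }

    public static void main(String[] args) {
        CachedLagSource source = new CachedLagSource();
        source.updatePosition("input-0", 90L);
        source.updateEndOffset("input-0", 100L);
        source.updatePosition("input-1", 5L); // end offset not known yet

        // A caller that needs several partitions just loops over single lookups.
        for (String p : new String[] {"input-0", "input-1"}) {
            System.out.println(p + " -> " + source.currentLag(p));
        }
    }
}
```

Because each lookup is a constant-time local read, a batched `Collection<TopicPartition>` variant would add API surface without saving any round trips.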
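The second comment's suggestion, consulting the lag provider only for partitions whose queue is empty, can be sketched as follows. This assumes a simplified partition group in which partitions are plain strings, each buffered queue is a `Deque`, and the provider is a `Function<String, OptionalLong>`; `LazyLagReadiness` is a hypothetical name, and task idling (waiting up to `maxTaskIdleMs` on zero lag) is deliberately left out. The only point illustrated is that the common non-empty-queue case never touches the synchronized lag state.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.function.Function;

// Hypothetical, simplified stand-in for PartitionGroup; not the actual Kafka Streams class.
public class LazyLagReadiness {
    private final Map<String, Deque<String>> partitionQueues = new LinkedHashMap<>();
    private final Function<String, OptionalLong> lagProvider;

    public LazyLagReadiness(Function<String, OptionalLong> lagProvider) {
        this.lagProvider = lagProvider;
    }

    public void addPartition(String partition) {
        partitionQueues.put(partition, new ArrayDeque<>());
    }

    public void addRecord(String partition, String record) {
        partitionQueues.get(partition).add(record);
    }

    // True only if every partition has buffered records or is known to have zero lag.
    // Task idling is omitted: zero lag is treated as immediately ready in this sketch.
    public boolean readyToProcess() {
        for (Map.Entry<String, Deque<String>> entry : partitionQueues.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                // Fast path: data is buffered, so the lag provider is never called.
                continue;
            }
            // Only an empty queue needs the (possibly synchronized) lag lookup.
            OptionalLong lag = lagProvider.apply(entry.getKey());
            if (!lag.isPresent() || lag.getAsLong() > 0L) {
                // Lag unknown, or data is still on the broker: wait for the next poll.
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] lookups = {0};
        LazyLagReadiness group = new LazyLagReadiness(p -> {
            lookups[0]++;
            return OptionalLong.of(0L);
        });
        group.addPartition("left-0");
        group.addPartition("right-0");
        group.addRecord("left-0", "a");
        group.addRecord("right-0", "b");
        System.out.println(group.readyToProcess() + ", lag lookups: " + lookups[0]);
    }
}
```

With both queues non-empty, `readyToProcess` returns without a single provider call, which is the per-record case the comment expects to dominate.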