chia7712 commented on a change in pull request #10137: URL: https://github.com/apache/kafka/pull/10137#discussion_r583423738
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ########## @@ -156,50 +134,53 @@ public boolean readyToProcess(final long wallClockTime) { final TopicPartition partition = entry.getKey(); final RecordQueue queue = entry.getValue(); - final Long nullableFetchedLag = fetchedLags.get(partition); if (!queue.isEmpty()) { // this partition is ready for processing idlePartitionDeadlines.remove(partition); queued.add(partition); - } else if (nullableFetchedLag == null) { - // must wait to fetch metadata for the partition - idlePartitionDeadlines.remove(partition); - logger.trace("Waiting to fetch data for {}", partition); - return false; - } else if (nullableFetchedLag > 0L) { - // must wait to poll the data we know to be on the broker - idlePartitionDeadlines.remove(partition); - logger.trace( - "Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.", - partition, - nullableFetchedLag - ); - return false; } else { - // p is known to have zero lag. wait for maxTaskIdleMs to see if more data shows up. - // One alternative would be to set the deadline to nullableMetadata.receivedTimestamp + maxTaskIdleMs - // instead. That way, we would start the idling timer as of the freshness of our knowledge about the zero - // lag instead of when we happen to run this method, but realistically it's probably a small difference - // and using wall clock time seems more intuitive for users, - // since the log message will be as of wallClockTime. - idlePartitionDeadlines.putIfAbsent(partition, wallClockTime + maxTaskIdleMs); - final long deadline = idlePartitionDeadlines.get(partition); - if (wallClockTime < deadline) { + final OptionalLong fetchedLag = lagProvider.apply(partition); Review comment: It was nullable before (when the partition had not been fetched yet). With this new API, getting the lag for such a partition can produce an exception. Is this a potential bug? 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org