chia7712 commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r583423738



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##########
@@ -156,50 +134,53 @@ public boolean readyToProcess(final long wallClockTime) {
             final TopicPartition partition = entry.getKey();
             final RecordQueue queue = entry.getValue();
 
-            final Long nullableFetchedLag = fetchedLags.get(partition);
 
             if (!queue.isEmpty()) {
                 // this partition is ready for processing
                 idlePartitionDeadlines.remove(partition);
                 queued.add(partition);
-            } else if (nullableFetchedLag == null) {
-                // must wait to fetch metadata for the partition
-                idlePartitionDeadlines.remove(partition);
-                logger.trace("Waiting to fetch data for {}", partition);
-                return false;
-            } else if (nullableFetchedLag > 0L) {
-                // must wait to poll the data we know to be on the broker
-                idlePartitionDeadlines.remove(partition);
-                logger.trace(
-                    "Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.",
-                    partition,
-                    nullableFetchedLag
-                );
-                return false;
             } else {
-                // p is known to have zero lag. wait for maxTaskIdleMs to see if more data shows up.
-                // One alternative would be to set the deadline to nullableMetadata.receivedTimestamp + maxTaskIdleMs
-                // instead. That way, we would start the idling timer as of the freshness of our knowledge about the zero
-                // lag instead of when we happen to run this method, but realistically it's probably a small difference
-                // and using wall clock time seems more intuitive for users,
-                // since the log message will be as of wallClockTime.
-                idlePartitionDeadlines.putIfAbsent(partition, wallClockTime + maxTaskIdleMs);
-                final long deadline = idlePartitionDeadlines.get(partition);
-                if (wallClockTime < deadline) {
+                final OptionalLong fetchedLag = lagProvider.apply(partition);

Review comment:
       It was nullable before (when the partition had not been fetched yet).
With this new API, getting the lag for such a partition can throw an
exception. Is this a potential bug?
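
To make the concern concrete, here is a minimal, self-contained sketch of one way the caller could guard against it. It is not the code in this PR: `LagSketch`, `safeLag`, `classify`, `demoProvider` and the `String` partition names are hypothetical stand-ins, and the choice of `IllegalStateException` is an assumption about how a lag provider might fail for a partition it does not know. The idea is to map "provider threw" to "lag unknown", so the three branches removed in the hunk above (unknown lag, positive lag, zero lag) are still reachable.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.function.Function;

class LagSketch {
    // Mirrors the three outcomes of the removed null / >0 / ==0 branches.
    enum Readiness { WAIT_FOR_METADATA, WAIT_FOR_POLL, START_IDLING }

    // Hypothetical guard: a throwing provider is treated as "lag unknown".
    // Assumption: the provider signals an unknown partition with IllegalStateException.
    static OptionalLong safeLag(Function<String, OptionalLong> lagProvider, String partition) {
        try {
            return lagProvider.apply(partition);
        } catch (IllegalStateException e) {
            return OptionalLong.empty();
        }
    }

    // Decide what to do for a partition whose local queue is empty.
    static Readiness classify(OptionalLong lag) {
        if (!lag.isPresent()) {
            return Readiness.WAIT_FOR_METADATA; // previously: nullableFetchedLag == null
        }
        if (lag.getAsLong() > 0L) {
            return Readiness.WAIT_FOR_POLL;     // previously: nullableFetchedLag > 0L
        }
        return Readiness.START_IDLING;          // previously: zero lag, start the idle timer
    }

    // Toy provider used only for illustration.
    static Function<String, OptionalLong> demoProvider() {
        Map<String, Long> known = new HashMap<>();
        known.put("topic-0", 0L);
        known.put("topic-1", 5L);
        return partition -> {
            if (partition.equals("unassigned-0")) {
                throw new IllegalStateException("not assigned: " + partition);
            }
            Long lag = known.get(partition);
            return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
        };
    }

    public static void main(String[] args) {
        Function<String, OptionalLong> provider = demoProvider();
        for (String p : new String[] {"topic-0", "topic-1", "unknown-9", "unassigned-0"}) {
            System.out.println(p + " -> " + classify(safeLag(provider, p)));
        }
    }
}
```

Whether swallowing the exception is appropriate depends on what the real provider guarantees. If it can only throw for partitions the task never owns, letting the exception propagate may be the better signal.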




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

