lucasbru commented on code in PR #14226:
URL: https://github.com/apache/kafka/pull/14226#discussion_r1326423158


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java:
##########
@@ -370,11 +374,30 @@ void clear() {
         nonEmptyQueuesByTime.clear();
         totalBuffered = 0;
         streamTime = RecordQueue.UNKNOWN;
+        fetchedLags.clear();
     }
 
     void close() {
         for (final RecordQueue queue : partitionQueues.values()) {
             queue.close();
         }
     }
+
+    void updateLags() {
+        for (final TopicPartition tp : partitionQueues.keySet()) {
+            final OptionalLong l = lagProvider.apply(tp);
+            if (l.isPresent()) {
+                fetchedLags.put(tp, l.getAsLong());
+                logger.trace("Updated lag for {} to {}", tp, l.getAsLong());
+            } else {
+                fetchedLags.remove(tp);
+            }
+        }
+    }
+
+    // for testing only
+    Map<TopicPartition, Long> fetchedLags() {
+        return fetchedLags;
+    }

Review Comment:
   The problem is that we only get a boolean returned from `readyForProcessing` 
so it's a bit limited what you can test from the outside without this helper 
method. I eliminated the helper method by checking for the `TRACE` level log 
messages produced by `readyForProcessing`, which is slightly less smelly than 
the helper method, I would say, and is also already used in other test methods 
of the same class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to