lucasbru commented on code in PR #14226:
URL: https://github.com/apache/kafka/pull/14226#discussion_r1326423158

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java:
##########
@@ -370,11 +374,30 @@ void clear() {
         nonEmptyQueuesByTime.clear();
         totalBuffered = 0;
         streamTime = RecordQueue.UNKNOWN;
+        fetchedLags.clear();
     }
 
     void close() {
         for (final RecordQueue queue : partitionQueues.values()) {
             queue.close();
         }
     }
+
+    void updateLags() {
+        for (final TopicPartition tp : partitionQueues.keySet()) {
+            final OptionalLong l = lagProvider.apply(tp);
+            if (l.isPresent()) {
+                fetchedLags.put(tp, l.getAsLong());
+                logger.trace("Updated lag for {} to {}", tp, l.getAsLong());
+            } else {
+                fetchedLags.remove(tp);
+            }
+        }
+    }
+
+    // for testing only
+    Map<TopicPartition, Long> fetchedLags() {
+        return fetchedLags;
+    }

Review Comment:
   The problem is that `readyForProcessing` only returns a boolean, so what you can test from the outside is limited without this helper method. I eliminated the helper method by checking for the `TRACE`-level log messages produced by `readyForProcessing` instead. I would say that is slightly less smelly than the helper method, and the same approach is already used in other test methods of the same class.
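
For illustration, here is a minimal, self-contained sketch of the log-capture approach described in the comment: the lag map stays private, and the test asserts on the trace messages instead of calling a test-only getter. It is not the code from the PR. It uses `java.util.logging` (with `FINEST` standing in for `TRACE`) rather than Kafka's logging and test utilities, represents partitions as plain strings instead of `TopicPartition`, and the names `LagTracker`, `CapturingHandler`, and `captureUpdateLagMessages` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.function.Function;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

final class LagLogCaptureSketch {

    // Hypothetical stand-in for the class under test. The lag map is private;
    // the only externally visible sign of an update is a FINEST log message.
    static final class LagTracker {
        static final Logger LOG = Logger.getLogger(LagTracker.class.getName());

        private final Map<String, Long> fetchedLags = new HashMap<>();
        private final Function<String, OptionalLong> lagProvider;

        LagTracker(final Function<String, OptionalLong> lagProvider) {
            this.lagProvider = lagProvider;
        }

        void updateLags(final List<String> partitions) {
            for (final String tp : partitions) {
                final OptionalLong lag = lagProvider.apply(tp);
                if (lag.isPresent()) {
                    fetchedLags.put(tp, lag.getAsLong());
                    LOG.finest("Updated lag for " + tp + " to " + lag.getAsLong());
                } else {
                    fetchedLags.remove(tp);
                }
            }
        }
    }

    // Collects the message of every record published to it.
    static final class CapturingHandler extends Handler {
        final List<String> messages = new ArrayList<>();

        @Override
        public void publish(final LogRecord record) {
            messages.add(record.getMessage());
        }

        @Override
        public void flush() { }

        @Override
        public void close() { }
    }

    // Runs updateLags with a capturing handler attached and returns the
    // messages that were logged, restoring the logger state afterwards.
    static List<String> captureUpdateLagMessages(final Function<String, OptionalLong> lagProvider,
                                                 final List<String> partitions) {
        final Logger log = LagTracker.LOG;
        final Level previousLevel = log.getLevel();
        final boolean previousUseParent = log.getUseParentHandlers();
        final CapturingHandler handler = new CapturingHandler();
        log.setLevel(Level.FINEST);        // enable the trace-like level for this run
        log.setUseParentHandlers(false);   // keep the capture isolated from other handlers
        log.addHandler(handler);
        try {
            new LagTracker(lagProvider).updateLags(partitions);
        } finally {
            log.removeHandler(handler);
            log.setUseParentHandlers(previousUseParent);
            log.setLevel(previousLevel);
        }
        return handler.messages;
    }

    public static void main(final String[] args) {
        // Only "topic-0" has a known lag, so only one update should be logged.
        final List<String> messages = captureUpdateLagMessages(
            tp -> tp.equals("topic-0") ? OptionalLong.of(5L) : OptionalLong.empty(),
            Arrays.asList("topic-0", "topic-1"));
        System.out.println(messages);
    }
}
```

The trade-off is the one the comment points at: the production class exposes no extra accessor, but the test becomes coupled to the exact wording of the log message.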