lucasbru commented on code in PR #14226: URL: https://github.com/apache/kafka/pull/14226#discussion_r1326421083
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java: ########## @@ -590,6 +590,30 @@ public void prepareRecycle() { log.info("Closed and recycled state"); } + @Override + public void preparePoll() { + if (state() == State.RUNNING) { + // before running a new poll on the main consumer, if a queue's buffered size has been + // decreased to the threshold, we can then resume the consumption on this partition + final Set<TopicPartition> currentAssignment = mainConsumer.assignment(); + + mainConsumer.resume( Review Comment: Good point, I improved the implementation by adding some internal state to `StreamTask`, that stores the partitions that need to be resumed. That actually simplifies the code so it's probably the right thing to do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org