lucasbru commented on code in PR #14226:
URL: https://github.com/apache/kafka/pull/14226#discussion_r1326421083


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -590,6 +590,30 @@ public void prepareRecycle() {
         log.info("Closed and recycled state");
     }
 
+    @Override
+    public void preparePoll() {
+        if (state() == State.RUNNING) {
+            // before running a new poll on the main consumer, if a queue's buffered size has been
+            // decreased to the threshold, we can then resume the consumption on this partition
+            final Set<TopicPartition> currentAssignment = mainConsumer.assignment();
+
+            mainConsumer.resume(

Review Comment:
   Good point. I improved the implementation by adding some internal state to 
`StreamTask` that stores the partitions that need to be resumed. That actually 
simplifies the code, so it's probably the right thing to do.
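
A minimal sketch of the idea described in the comment, not the code from the PR: the task remembers which partitions dropped back to the buffer threshold while records were being processed, and resumes only those before the next poll. `ResumableConsumer`, `ResumeTrackingTask`, `onRecordProcessed`, and the use of plain strings for partitions are hypothetical stand-ins; the real `StreamTask` works with `TopicPartition` and the main consumer, and also checks the task state, which is omitted here.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the main consumer; only the resume call is modeled.
interface ResumableConsumer {
    void resume(Collection<String> partitions);
}

// Hypothetical sketch of a task that tracks partitions to resume.
class ResumeTrackingTask {
    private final ResumableConsumer consumer;
    private final int maxBufferedSize;
    // Partitions whose buffered size fell back to the threshold since the last poll.
    private final Set<String> partitionsToResume = new HashSet<>();

    ResumeTrackingTask(final ResumableConsumer consumer, final int maxBufferedSize) {
        this.consumer = consumer;
        this.maxBufferedSize = maxBufferedSize;
    }

    // Called after a record has been taken from a partition's queue.
    void onRecordProcessed(final String partition, final int remainingBuffered) {
        if (remainingBuffered == maxBufferedSize) {
            partitionsToResume.add(partition);
        }
    }

    // Called before the next poll: resume only the recorded partitions,
    // then forget them so they are not resumed again.
    void preparePoll() {
        if (partitionsToResume.isEmpty()) {
            return;
        }
        consumer.resume(new ArrayList<>(partitionsToResume));
        partitionsToResume.clear();
    }
}
```

With this shape, `preparePoll()` no longer needs to scan the full assignment and compare buffer sizes on every poll; it only hands over the set collected during processing.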



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.