lucasbru commented on code in PR #14226:
URL: https://github.com/apache/kafka/pull/14226#discussion_r1326415866


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -590,6 +590,30 @@ public void prepareRecycle() {
         log.info("Closed and recycled state");
     }
 
+    @Override
+    public void preparePoll() {

Review Comment:
   Name: good idea. I wanted to use this as a "bag of things to do 
before/after" the poll, as we do with prepareCommit / postCommit, but you are 
right that it's not necessarily a good pattern, and we only have one thing in 
the bag, so let's use a specific name.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -590,6 +590,30 @@ public void prepareRecycle() {
         log.info("Closed and recycled state");
     }
 
+    @Override
+    public void preparePoll() {
+        if (state() == State.RUNNING) {
+            // before running a new poll on the main consumer, if a queue's 
buffered size has been
+            // decreased to the threshold, we can then resume the consumption 
on this partition
+            final Set<TopicPartition> currentAssignment = 
mainConsumer.assignment();
+
+            mainConsumer.resume(
+                partitionGroup.partitions()
+                    .stream()
+                    .filter(currentAssignment::contains)
+                    .filter(p -> partitionGroup.numBuffered(p) <= 
maxBufferedSize)
+                    .collect(Collectors.toList())
+            );
+        }
+    }
+
+    @Override
+    public void postPoll() {

Review Comment:
   Good idea. Done. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to