cadonna commented on code in PR #14226:
URL: https://github.com/apache/kafka/pull/14226#discussion_r1324289402


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -590,6 +590,30 @@ public void prepareRecycle() {
         log.info("Closed and recycled state");
     }
 
+    @Override
+    public void preparePoll() {
+        if (state() == State.RUNNING) {
+            // before running a new poll on the main consumer, if a queue's buffered size has been
+            // decreased to the threshold, we can then resume the consumption on this partition
+            final Set<TopicPartition> currentAssignment = mainConsumer.assignment();
+
+            mainConsumer.resume(

Review Comment:
   Why do we not do this in `DefaultTaskManager#unassignTask()`, at least for the new threading model? I guess this is because the consumer should not be accessed by the processing threads. Is this correct?
   
   Alternatively, you could record the partitions that can be resumed in `DefaultTaskManager#unassignTask()` and then pass those partitions to the polling thread.
   
   I am just trying to avoid iterating over all tasks and partitions on each poll.
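   
   A minimal, standalone sketch of the hand-off suggested above, assuming a simple thread-safe buffer between the processing threads and the polling thread; the class and method names below are hypothetical, not the actual `DefaultTaskManager` API:
   
   ```java
   import java.util.HashSet;
   import java.util.Set;
   import java.util.concurrent.ConcurrentLinkedQueue;
   
   import org.apache.kafka.common.TopicPartition;
   
   // Hypothetical hand-off buffer: processing threads record partitions that become
   // resumable when a task is unassigned; the polling thread drains them before poll().
   public class ResumablePartitions {
   
       private final ConcurrentLinkedQueue<TopicPartition> resumable = new ConcurrentLinkedQueue<>();
   
       // called from a processing thread, e.g. from an unassignTask()-like hook
       public void markResumable(final TopicPartition partition) {
           resumable.add(partition);
       }
   
       // called from the polling thread; returns and clears the accumulated partitions
       // so they can be handed to mainConsumer.resume(...) without iterating over all tasks
       public Set<TopicPartition> drain() {
           final Set<TopicPartition> drained = new HashSet<>();
           TopicPartition tp;
           while ((tp = resumable.poll()) != null) {
               drained.add(tp);
           }
           return drained;
       }
   }
   ```
   
   This only illustrates the shape of the hand-off; when a partition actually becomes resumable would still be decided by the buffered-size check in the task.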



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java:
##########
@@ -370,11 +374,30 @@ void clear() {
         nonEmptyQueuesByTime.clear();
         totalBuffered = 0;
         streamTime = RecordQueue.UNKNOWN;
+        fetchedLags.clear();
     }
 
     void close() {
         for (final RecordQueue queue : partitionQueues.values()) {
             queue.close();
         }
     }
+
+    void updateLags() {
+        for (final TopicPartition tp : partitionQueues.keySet()) {
+            final OptionalLong l = lagProvider.apply(tp);
+            if (l.isPresent()) {
+                fetchedLags.put(tp, l.getAsLong());
+                logger.trace("Updated lag for {} to {}", tp, l.getAsLong());
+            } else {
+                fetchedLags.remove(tp);
+            }
+        }
+    }
+
+    // for testing only
+    Map<TopicPartition, Long> fetchedLags() {
+        return fetchedLags;
+    }

Review Comment:
   🙁 
   Isn't there another way to test this? Exposing internal fields only for testing smells.
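   
   One possible direction, sketched below: assert on the interaction with the lag provider (or on whatever behaviour later consumes the lags) rather than reading the internal map. The `LagUpdater` class is a stand-in that only mirrors the loop from the diff; the real `PartitionGroup` has a different constructor and more collaborators, so this is a shape, not a drop-in test:
   
   ```java
   import static org.mockito.ArgumentMatchers.any;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.verify;
   import static org.mockito.Mockito.when;
   
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.OptionalLong;
   import java.util.function.Function;
   
   import org.apache.kafka.common.TopicPartition;
   import org.junit.jupiter.api.Test;
   
   class LagUpdateTest {
   
       // stand-in mirroring the loop from the diff, without a test-only getter
       static class LagUpdater {
           private final List<TopicPartition> partitions;
           private final Function<TopicPartition, OptionalLong> lagProvider;
           private final Map<TopicPartition, Long> fetchedLags = new HashMap<>();
   
           LagUpdater(final List<TopicPartition> partitions,
                      final Function<TopicPartition, OptionalLong> lagProvider) {
               this.partitions = partitions;
               this.lagProvider = lagProvider;
           }
   
           void updateLags() {
               for (final TopicPartition tp : partitions) {
                   final OptionalLong lag = lagProvider.apply(tp);
                   if (lag.isPresent()) {
                       fetchedLags.put(tp, lag.getAsLong());
                   } else {
                       fetchedLags.remove(tp);
                   }
               }
           }
       }
   
       @Test
       void shouldQueryLagProviderForEveryPartition() {
           @SuppressWarnings("unchecked")
           final Function<TopicPartition, OptionalLong> lagProvider = mock(Function.class);
           when(lagProvider.apply(any())).thenReturn(OptionalLong.of(5L));
   
           final TopicPartition tp0 = new TopicPartition("topic", 0);
           final TopicPartition tp1 = new TopicPartition("topic", 1);
   
           new LagUpdater(Arrays.asList(tp0, tp1), lagProvider).updateLags();
   
           // behaviour is asserted through the collaborator instead of an internal getter
           verify(lagProvider).apply(tp0);
           verify(lagProvider).apply(tp1);
       }
   }
   ```
   
   Verifying only the interaction is admittedly weaker than asserting on behaviour that consumes the fetched lags, but it avoids exposing the field.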



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -590,6 +590,30 @@ public void prepareRecycle() {
         log.info("Closed and recycled state");
     }
 
+    @Override
+    public void preparePoll() {

Review Comment:
   nit:
   Could you use a more meaningful name? Something like `resumePollingForPartitionsWithAvailableSpace()` or `resumePollingForPartitionsWithNonFullBuffers()`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -590,6 +590,30 @@ public void prepareRecycle() {
         log.info("Closed and recycled state");
     }
 
+    @Override
+    public void preparePoll() {

Review Comment:
   I could not find a dedicated test for this method. Could you please add one?
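   
   For reference, a rough sketch of the shape such a test could take, exercised against a `MockConsumer` in standalone form, since constructing a real `StreamTask` needs the existing test fixtures; the helper below is a stand-in that mirrors the filter logic from the diff, not the actual method under review:
   
   ```java
   import static org.junit.jupiter.api.Assertions.assertEquals;
   
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;
   import java.util.stream.Collectors;
   
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.clients.consumer.MockConsumer;
   import org.apache.kafka.clients.consumer.OffsetResetStrategy;
   import org.apache.kafka.common.TopicPartition;
   import org.junit.jupiter.api.Test;
   
   class ResumeBelowThresholdTest {
   
       // stand-in for the logic in the diff: resume assigned partitions whose
       // buffered size is at or below the threshold
       static void resumeNonFullPartitions(final Consumer<byte[], byte[]> consumer,
                                           final Map<TopicPartition, Integer> buffered,
                                           final int maxBufferedSize) {
           final Set<TopicPartition> assignment = consumer.assignment();
           consumer.resume(
               buffered.keySet().stream()
                   .filter(assignment::contains)
                   .filter(tp -> buffered.get(tp) <= maxBufferedSize)
                   .collect(Collectors.toList())
           );
       }
   
       @Test
       void shouldResumeOnlyPartitionsAtOrBelowThreshold() {
           final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.NONE);
           final TopicPartition belowThreshold = new TopicPartition("topic", 0);
           final TopicPartition aboveThreshold = new TopicPartition("topic", 1);
           consumer.assign(Arrays.asList(belowThreshold, aboveThreshold));
           consumer.pause(Arrays.asList(belowThreshold, aboveThreshold));
   
           final Map<TopicPartition, Integer> buffered = new HashMap<>();
           buffered.put(belowThreshold, 10);
           buffered.put(aboveThreshold, 1_000);
   
           resumeNonFullPartitions(consumer, buffered, 100);
   
           // only the partition whose buffer exceeds the threshold should remain paused
           assertEquals(Collections.singleton(aboveThreshold), consumer.paused());
       }
   }
   ```
   
   A real test against `StreamTask` would instead construct the task, fill one partition's queue past the threshold, and verify what the mocked main consumer is asked to resume.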



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -590,6 +590,30 @@ public void prepareRecycle() {
         log.info("Closed and recycled state");
     }
 
+    @Override
+    public void preparePoll() {
+        if (state() == State.RUNNING) {
+            // before running a new poll on the main consumer, if a queue's buffered size has been
+            // decreased to the threshold, we can then resume the consumption on this partition
+            final Set<TopicPartition> currentAssignment = mainConsumer.assignment();
+
+            mainConsumer.resume(
+                partitionGroup.partitions()
+                    .stream()
+                    .filter(currentAssignment::contains)
+                    .filter(p -> partitionGroup.numBuffered(p) <= maxBufferedSize)
+                    .collect(Collectors.toList())
+            );
+        }
+    }
+
+    @Override
+    public void postPoll() {

Review Comment:
   nit:
   What about renaming to `updateLags()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
