ableegoldman commented on a change in pull request #11787:
URL: https://github.com/apache/kafka/pull/11787#discussion_r812573407



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
##########
@@ -63,8 +68,21 @@ public TaskExecutor(final Tasks tasks, final ProcessingMode processingMode, fina
     int process(final int maxNumRecords, final Time time) {
         int totalProcessed = 0;
 
-        for (final Task task : tasks.activeTasks()) {
-            totalProcessed += processTask(task, maxNumRecords, time);
+        for (final Map.Entry<String, Set<StreamTask>> topologyEntry : tasks.activeTasksByTopology().entrySet()) {

Review comment:
       Guess this was a case of premature optimization. I'll update with your
suggestion for this PR and work out a better solution that doesn't skew
processing in the later PR where this matters. For some context, I did this in
part because in one of the followups we will back off entire named topologies
when one task keeps failing, to avoid getting out of sync. In that case it
seemed wasteful to check each task in the topology if we already know the
topology is not ready to process.
   
   But we can revisit this when we get to that PR 🙂 
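
   For reference, a rough sketch of the kind of topology-level backoff check
   described above. Everything in it is hypothetical (`TopologyBackoffSketch`,
   `SketchTask`, `backedOffTopologies`) and is not the actual `TaskExecutor`
   code. It only illustrates skipping a whole named topology with one check
   instead of testing each of its tasks individually.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch only: none of these names exist in Kafka Streams.
public class TopologyBackoffSketch {

    // Stand-in for a stream task; bufferedRecords is how many records it could process.
    public static final class SketchTask {
        final String id;
        final int bufferedRecords;

        public SketchTask(final String id, final int bufferedRecords) {
            this.id = id;
            this.bufferedRecords = bufferedRecords;
        }
    }

    // Processes up to maxNumRecords per task, iterating topology by topology.
    // Ids of tasks that were actually visited are appended to visitedTaskIds.
    public static int process(final Map<String, List<SketchTask>> tasksByTopology,
                              final Set<String> backedOffTopologies,
                              final int maxNumRecords,
                              final List<String> visitedTaskIds) {
        int totalProcessed = 0;

        for (final Map.Entry<String, List<SketchTask>> topologyEntry : tasksByTopology.entrySet()) {
            // A single lookup skips every task of a topology that is backing off.
            if (backedOffTopologies.contains(topologyEntry.getKey())) {
                continue;
            }
            for (final SketchTask task : topologyEntry.getValue()) {
                totalProcessed += Math.min(task.bufferedRecords, maxNumRecords);
                visitedTaskIds.add(task.id);
            }
        }
        return totalProcessed;
    }
}
```

   Note that iterating in topology order is exactly what can skew processing
   toward earlier topologies, which is why this stays out of the current PR.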




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

