ableegoldman commented on a change in pull request #11787:
URL: https://github.com/apache/kafka/pull/11787#discussion_r812573407
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
##########
@@ -63,8 +68,21 @@ public TaskExecutor(final Tasks tasks, final ProcessingMode processingMode, fina
     int process(final int maxNumRecords, final Time time) {
         int totalProcessed = 0;
-        for (final Task task : tasks.activeTasks()) {
-            totalProcessed += processTask(task, maxNumRecords, time);
+        for (final Map.Entry<String, Set<StreamTask>> topologyEntry : tasks.activeTasksByTopology().entrySet()) {

Review comment:
Guess this was a case of premature optimization. I'll update with your suggestion for this PR and work out a better solution that doesn't skew processing in the later PR where this matters. (For some context, I did this in part because in one of the followups we will back off entire named topologies when one task keeps failing, to avoid getting out of sync, in which case it seemed wasteful to check each task in the topology if we already know it's not ready to process.) But we can revisit this when we get to that PR 🙂
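
For illustration only, here is a minimal sketch of the per-topology backoff idea mentioned above: when a whole named topology is known to be backing off, the loop skips it once instead of checking each of its tasks. Everything in it is a hypothetical stand-in (`TopologyBackoffSketch`, `SketchTask`, the `backedOff` set, the `exampleTasks` helper); none of it is the actual Kafka Streams `TaskExecutor`, `Tasks`, or `StreamTask` code, and how backoff state would really be tracked is left open here.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; not the real Kafka Streams TaskExecutor.
class TopologyBackoffSketch {

    // Hypothetical stand-in for a stream task: a name plus a count of buffered records.
    static final class SketchTask {
        final String name;
        final int buffered;

        SketchTask(final String name, final int buffered) {
            this.name = name;
            this.buffered = buffered;
        }
    }

    // Iterates tasks grouped by topology name and skips every task of a topology
    // that is currently backing off, without inspecting the tasks individually.
    static int process(final Map<String, List<SketchTask>> tasksByTopology,
                       final Set<String> backedOff,
                       final int maxNumRecords) {
        int totalProcessed = 0;
        for (final Map.Entry<String, List<SketchTask>> entry : tasksByTopology.entrySet()) {
            if (backedOff.contains(entry.getKey())) {
                continue; // one check covers the whole topology
            }
            for (final SketchTask task : entry.getValue()) {
                // Assumption: each task processes at most maxNumRecords per call.
                totalProcessed += Math.min(task.buffered, maxNumRecords);
            }
        }
        return totalProcessed;
    }

    // Small fixed example: two topologies with two tasks each.
    static Map<String, List<SketchTask>> exampleTasks() {
        final Map<String, List<SketchTask>> tasks = new LinkedHashMap<>();
        tasks.put("a", List.of(new SketchTask("a-0", 5), new SketchTask("a-1", 2)));
        tasks.put("b", List.of(new SketchTask("b-0", 7), new SketchTask("b-1", 1)));
        return tasks;
    }

    public static void main(final String[] args) {
        // Topology "b" is backing off, so only the tasks of "a" are processed.
        System.out.println(process(exampleTasks(), Set.of("b"), 3)); // prints 5
        // Nothing is backing off, so all four tasks are processed.
        System.out.println(process(exampleTasks(), Set.of(), 3));    // prints 9
    }
}
```

The trade-off raised in the review still applies to a loop shaped like this: processing topology by topology can skew work toward the topologies that come first in iteration order.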