guozhangwang commented on code in PR #12743: URL: https://github.com/apache/kafka/pull/12743#discussion_r995080928
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ########## @@ -177,6 +177,14 @@ void handleRebalanceComplete() { // before then the assignment has not been updated yet. mainConsumer.pause(mainConsumer.assignment()); + if (stateUpdater != null) { + // All tasks that need restoration are now owned by the state updater. + // All tasks that are owned by the task manager are ready and can be resumed immediately. + for (final Task t : tasks.allTasks()) { + mainConsumer.resume(t.inputPartitions()); + } + } Review Comment: +1. The consumer.pause/resume calls are not completely free so it's better to batch them. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ########## @@ -177,6 +177,14 @@ void handleRebalanceComplete() { // before then the assignment has not been updated yet. mainConsumer.pause(mainConsumer.assignment()); + if (stateUpdater != null) { + // All tasks that need restoration are now owned by the state updater. + // All tasks that are owned by the task manager are ready and can be resumed immediately. + for (final Task t : tasks.allTasks()) { + mainConsumer.resume(t.inputPartitions()); + } + } Review Comment: Personally I prefer the second, i.e. just pause what's necessary; in the past we simply pause all since all of them need to go through a restoration phase. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org