guozhangwang commented on code in PR #12743:
URL: https://github.com/apache/kafka/pull/12743#discussion_r995080928


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -177,6 +177,14 @@ void handleRebalanceComplete() {
         // before then the assignment has not been updated yet.
         mainConsumer.pause(mainConsumer.assignment());
 
+        if (stateUpdater != null) {
+            // All tasks that need restoration are now owned by the state updater.
+            // All tasks that are owned by the task manager are ready and can be resumed immediately.
+            for (final Task t : tasks.allTasks()) {
+                mainConsumer.resume(t.inputPartitions());
+            }
+        }

Review Comment:
   +1. The consumer.pause/resume calls are not completely free, so it's better to batch them.
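   A minimal sketch of what batching could look like, assuming the partitions of all ready tasks are collected first and passed to a single resume call. `PartitionResumer`, `resumeAll`, and the use of plain strings for partitions are hypothetical stand-ins for illustration; they are not the `TaskManager` or consumer code from this PR.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class BatchedResumeSketch {
    // Hypothetical stand-in for a consumer's resume method; partitions are plain strings here.
    interface PartitionResumer {
        void resume(Collection<String> partitions);
    }

    // Collects the input partitions of every task and resumes them with one call
    // instead of one call per task.
    static void resumeAll(PartitionResumer consumer, List<Set<String>> inputPartitionsPerTask) {
        Set<String> toResume = new HashSet<>();
        for (Set<String> partitions : inputPartitionsPerTask) {
            toResume.addAll(partitions);
        }
        if (!toResume.isEmpty()) {
            consumer.resume(toResume);
        }
    }

    public static void main(String[] args) {
        // Record every resume call so the number of calls can be inspected.
        List<Collection<String>> calls = new ArrayList<>();
        resumeAll(calls::add, List.of(Set.of("topicA-0"), Set.of("topicB-0", "topicB-1")));
        System.out.println("resume calls: " + calls.size());
    }
}
```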



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -177,6 +177,14 @@ void handleRebalanceComplete() {
         // before then the assignment has not been updated yet.
         mainConsumer.pause(mainConsumer.assignment());
 
+        if (stateUpdater != null) {
+            // All tasks that need restoration are now owned by the state updater.
+            // All tasks that are owned by the task manager are ready and can be resumed immediately.
+            for (final Task t : tasks.allTasks()) {
+                mainConsumer.resume(t.inputPartitions());
+            }
+        }

Review Comment:
   Personally I prefer the second, i.e. just pause what's necessary; in the
past we simply paused all partitions since all of them needed to go through a
restoration phase.
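   One way to read "just pause what's necessary" is sketched below, assuming the set of partitions belonging to ready tasks is known when the rebalance completes. `PartitionPauser`, `pauseOnlyRestoring`, and the string partition names are hypothetical and only illustrate the idea; they are not taken from the PR.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

class SelectivePauseSketch {
    // Hypothetical stand-in for a consumer's pause method; partitions are plain strings here.
    interface PartitionPauser {
        void pause(Collection<String> partitions);
    }

    // Pauses only the assigned partitions that do not belong to ready tasks,
    // so ready partitions never need a later resume call.
    static Set<String> pauseOnlyRestoring(PartitionPauser consumer,
                                          Set<String> assignment,
                                          Set<String> readyPartitions) {
        Set<String> toPause = new HashSet<>(assignment);
        toPause.removeAll(readyPartitions);
        if (!toPause.isEmpty()) {
            consumer.pause(toPause);
        }
        return toPause;
    }
}
```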



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
