guozhangwang commented on a change in pull request #8988:
URL: https://github.com/apache/kafka/pull/8988#discussion_r506650384



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -622,16 +613,42 @@ void runOnce() {
             return;
         }
 
-        initializeAndRestorePhase();
+        // we need to first add any closed revoked/corrupted/recycled tasks 
and then add the initialized tasks to update the changelogs of revived/recycled 
tasks
+        restoreThread.addClosedTasks(taskManager.drainRemovedTasks());
+
+        // try to initialize created tasks that are either newly assigned, 
recycled, or revived from corrupted tasks
+        final List<Task> initializedTasks = 
taskManager.tryInitializeNewTasks();
+        if (!initializedTasks.isEmpty()) {
+            log.info("Initialized new tasks {} under state {}, will start 
restoring them",
+                    
initializedTasks.stream().map(Task::id).collect(Collectors.toList()), state);
+
+            restoreThread.addInitializedTasks(initializedTasks);
+        }
+
+        // try complete restoration if there are any restoring tasks
+        
taskManager.tryToCompleteRestoration(restoreThread.completedChangelogs());

Review comment:
       I intentionally do not make the restore thread as part of the task 
manager since moving forward when we have one consumer per client, instead per 
stream thread, I plan to only have a single restore thread instead of N 
threads, one per stream-thread, and hence I would like to "isolate" the thread 
from a specific task manager starting early. Ditto for the other comment.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to