guozhangwang commented on a change in pull request #8988:
URL: https://github.com/apache/kafka/pull/8988#discussion_r506650384

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -622,16 +613,42 @@ void runOnce() {
             return;
         }
 
-        initializeAndRestorePhase();
+        // we need to first add any closed revoked/corrupted/recycled tasks and then add the initialized tasks to update the changelogs of revived/recycled tasks
+        restoreThread.addClosedTasks(taskManager.drainRemovedTasks());
+
+        // try to initialize created tasks that are either newly assigned, recycled, or revived from corrupted tasks
+        final List<Task> initializedTasks = taskManager.tryInitializeNewTasks();
+        if (!initializedTasks.isEmpty()) {
+            log.info("Initialized new tasks {} under state {}, will start restoring them",
+                initializedTasks.stream().map(Task::id).collect(Collectors.toList()), state);
+
+            restoreThread.addInitializedTasks(initializedTasks);
+        }
+
+        // try complete restoration if there are any restoring tasks
+        taskManager.tryToCompleteRestoration(restoreThread.completedChangelogs());

Review comment:
I intentionally did not make the restore thread part of the task manager. Moving forward, when we have one consumer per client instead of one per stream thread, I plan to have only a single restore thread rather than N threads (one per stream thread). I would therefore like to "isolate" the restore thread from any specific task manager early on.

Ditto for the other comment.
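
To make the intended decoupling concrete, here is a minimal Java sketch, assuming a single restorer that any number of stream threads can hand work to. `SharedRestorer`, `SharedRestorerDemo`, and the use of plain strings as changelog names are hypothetical and are not the classes in this PR; only the method names `addInitializedTasks` and `completedChangelogs` mirror the calls in the diff above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a restore thread owned by the client rather than
// by any one task manager. None of these types are Kafka Streams APIs.
final class SharedRestorer implements Runnable {
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();
    private final Set<String> completed = ConcurrentHashMap.newKeySet();
    private volatile boolean running = true;

    // Called by any stream thread: hand over changelogs of initialized tasks.
    void addInitializedTasks(List<String> changelogs) {
        pending.addAll(changelogs);
    }

    // Called by any stream thread: snapshot of the changelogs restored so far.
    Set<String> completedChangelogs() {
        return new HashSet<>(completed);
    }

    void shutdown() {
        running = false;
    }

    @Override
    public void run() {
        try {
            while (running) {
                String changelog = pending.poll(10, TimeUnit.MILLISECONDS);
                if (changelog != null) {
                    // A real restorer would replay the changelog into a state store here.
                    completed.add(changelog);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

final class SharedRestorerDemo {
    // Simulates two stream threads sharing one restorer and returns what was restored.
    static Set<String> runDemo() {
        SharedRestorer restorer = new SharedRestorer();
        Thread restoreThread = new Thread(restorer, "shared-restore-thread");
        restoreThread.start();
        try {
            restorer.addInitializedTasks(Arrays.asList("0_0-changelog", "0_1-changelog")); // "stream thread 1"
            restorer.addInitializedTasks(Arrays.asList("1_0-changelog"));                  // "stream thread 2"
            // Wait until everything handed over has been restored before shutting down.
            while (restorer.completedChangelogs().size() < 3) {
                Thread.sleep(5);
            }
            restorer.shutdown();
            restoreThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return restorer.completedChangelogs();
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Because the restorer in this sketch holds no reference to a task manager, going from N restore threads to one would only change who constructs it, not how stream threads talk to it.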