cadonna commented on code in PR #12795:
URL: https://github.com/apache/kafka/pull/12795#discussion_r1017702122
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -115,6 +117,7 @@ public void run() {
     private void runOnce() throws InterruptedException {
         performActionsOnTasks();
+        initializeTasksIfNeeded();

Review Comment:
Makes sense!

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -277,6 +294,9 @@ private void addTask(final Task task) {
                 }
             }
         }
+        if (task.state() == Task.State.CREATED) {
+            tasksToInitialize.offer(task);
+        }

Review Comment:
With this order, you register the task's changelogs and state manager before you initialize the task. Did you verify whether that causes any side effects?

I think we also have a concurrency issue here. For a stateless task, we add the task to the queue of restored tasks but keep a reference to it in the state updater for initialization. The main thread could therefore read the stateless task from the queue of restored tasks while the state updater thread still needs to initialize it.

So I think in `addTask()` we could just add the task to `tasksToInitialize`. Initializing the task, and then either registering it with the changelog reader or adding it to the queue of restored tasks, would happen in a separate method similar to `initializeTasksIfNeeded()` (maybe named differently) that does not hold the lock on the input queue.

If we do this, we can maybe also get rid of the checks of whether the state updater is enabled, which avoid registering the changelogs with the changelog reader during initialization, as done here:
https://github.com/apache/kafka/blob/a1f3c6d16061566a4f53c72a95e2679b8ee229e0/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L215
and remove the explicit changelog registration in the state updater. Could you check this?

-- 
This is an automated message from the Apache Git Service.
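To make the proposed split concrete, here is a minimal sketch, assuming a heavily simplified model of the state updater rather than the real `DefaultStateUpdater`. `StateUpdaterSketch`, `SketchTask`, `inputLock`, `drainInputQueue()`, `registerOrPublish()` and `changelogReaderTasks` (a stand-in for the changelog reader) are hypothetical names; `addTask()`, `tasksToInitialize` and `initializeTasksIfNeeded()` only mirror the names used in the comment. The point it illustrates: `addTask()` merely enqueues the task while the input-queue lock is held, and initialization plus the hand-off to the changelog reader or the restored-tasks queue happen afterwards, without that lock, so a stateless task becomes visible to the main thread only after it has been initialized.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

public class StateUpdaterSketch {

    enum State { CREATED, RESTORING }

    // Hypothetical stand-in for a Kafka Streams task; not the real Task interface.
    static final class SketchTask {
        final String id;
        final boolean stateful;
        State state = State.CREATED;

        SketchTask(final String id, final boolean stateful) {
            this.id = id;
            this.stateful = stateful;
        }

        // No-op unless the task is still CREATED.
        void initializeIfNeeded() {
            if (state == State.CREATED) {
                state = State.RESTORING;
            }
        }
    }

    private final ReentrantLock inputLock = new ReentrantLock();
    private final Queue<SketchTask> inputQueue = new ArrayDeque<>();             // guarded by inputLock
    private final Queue<SketchTask> tasksToInitialize = new ArrayDeque<>();      // state updater thread only
    final List<SketchTask> changelogReaderTasks = new ArrayList<>();             // hypothetical changelog reader
    final BlockingQueue<SketchTask> restoredTasks = new LinkedBlockingQueue<>(); // drained by the main thread

    // Main thread: hand a task to the state updater.
    void add(final SketchTask task) {
        inputLock.lock();
        try {
            inputQueue.add(task);
        } finally {
            inputLock.unlock();
        }
    }

    // State updater thread: one loop iteration.
    void runOnce() {
        drainInputQueue();
        initializeTasksIfNeeded();
    }

    // Holds the lock only long enough to move tasks; no initialization or publication here.
    private void drainInputQueue() {
        inputLock.lock();
        try {
            SketchTask task;
            while ((task = inputQueue.poll()) != null) {
                addTask(task);
            }
        } finally {
            inputLock.unlock();
        }
    }

    // Simplification: every task is deferred; initializeIfNeeded() skips non-CREATED tasks.
    private void addTask(final SketchTask task) {
        tasksToInitialize.offer(task);
    }

    // Runs without the input-queue lock.
    private void initializeTasksIfNeeded() {
        SketchTask task;
        while ((task = tasksToInitialize.poll()) != null) {
            task.initializeIfNeeded();
            registerOrPublish(task);
        }
    }

    // Publishing is the last step, so the main thread never sees an uninitialized task.
    private void registerOrPublish(final SketchTask task) {
        if (task.stateful) {
            changelogReaderTasks.add(task);
        } else {
            restoredTasks.add(task);
        }
    }

    public static void main(final String[] args) {
        final StateUpdaterSketch updater = new StateUpdaterSketch();
        updater.add(new SketchTask("0_0", false));
        updater.add(new SketchTask("1_0", true));
        updater.runOnce();
        final SketchTask restored = updater.restoredTasks.poll();
        System.out.println(restored.id + " " + restored.state);
    }
}
```

Running `main()` prints `0_0 RESTORING`: the stateless task reaches the restored-tasks queue only after it has left the CREATED state. Handing it over through a `BlockingQueue` also means everything the state updater did to the task before `add()` happens-before the main thread's `poll()`.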