cadonna commented on code in PR #12795:
URL: https://github.com/apache/kafka/pull/12795#discussion_r1017702122


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -115,6 +117,7 @@ public void run() {
 
         private void runOnce() throws InterruptedException {
             performActionsOnTasks();
+            initializeTasksIfNeeded();

Review Comment:
   Makes sense!



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -277,6 +294,9 @@ private void addTask(final Task task) {
                     }
                 }
             }
+            if (task.state() == Task.State.CREATED) {
+                tasksToInitialize.offer(task);
+            }

Review Comment:
   With this order, you register the task's changelogs and state manager before you initialize the task. Did you verify whether that causes any side effects?
   
   I think we also have a concurrency issue here. In the case of a stateless task, we add the task to the queue of restored tasks but keep a reference to it in the state updater for initialization. The main thread could read the stateless task from the queue of restored tasks while the state updater thread still needs to initialize it.
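   To make the concern concrete, here is a minimal, deterministic sketch of that interleaving. Everything in it is hypothetical: `EarlyPublishRaceSketch`, `SimpleTask`, and its two-value `State` enum are simplifications, not Kafka Streams classes, and a `CountDownLatch` forces the bad ordering so it shows up on every run instead of only occasionally. Because the simulated state updater publishes the task before initializing it, the simulated main thread observes a task that is still `CREATED`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of the race; not actual Kafka Streams code.
public class EarlyPublishRaceSketch {

    // Simplified two-state lifecycle (an assumption; not Task.State).
    enum State { CREATED, INITIALIZED }

    // Hypothetical stand-in for a stateless task.
    static final class SimpleTask {
        volatile State state = State.CREATED;

        void initialize() {
            state = State.INITIALIZED;
        }
    }

    // Returns the state the "main thread" sees when it takes the task from the restored-tasks queue.
    static State stateSeenByMainThread() throws InterruptedException {
        final BlockingQueue<SimpleTask> restoredTasks = new LinkedBlockingQueue<>();
        final CountDownLatch mainThreadHasRead = new CountDownLatch(1);
        final SimpleTask task = new SimpleTask();

        // Simulated state updater thread using the problematic order.
        final Thread stateUpdater = new Thread(() -> {
            restoredTasks.add(task);       // publish first...
            try {
                mainThreadHasRead.await(); // ...wait until the main thread has looked at it...
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            task.initialize();             // ...and only then initialize
        });
        stateUpdater.start();

        final SimpleTask taken = restoredTasks.take(); // main thread picks up the "restored" task
        final State observed = taken.state;
        mainThreadHasRead.countDown();
        stateUpdater.join();
        return observed;
    }

    public static void main(final String[] args) throws InterruptedException {
        System.out.println(stateSeenByMainThread()); // prints CREATED
    }
}
```

   Without the latch the same ordering would only fail intermittently, which is what makes this kind of bug hard to catch in tests.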
   
   So I think in `addTask()` we should only add the task to `tasksToInitialize`. Then, in a separate method similar to `initializeTasksIfNeeded()` (maybe named differently) that does not hold the lock on the input queue, we initialize the task and either register it with the changelog reader or add it to the queue of restored tasks.
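   The proposal could look roughly like the following single-threaded sketch. All class, field, and method names are hypothetical or simplified models; even `addTask()`, `runOnce()`, `performActionsOnTasks()`, `tasksToInitialize`, and `initializeTasksIfNeeded()` only borrow names from the diff and are not the real `DefaultStateUpdater` members, and the changelog reader is reduced to a list. What it illustrates is the ordering: `addTask()` only records the task while the input-queue lock is held, and a later step that runs without that lock initializes each task before either registering it or publishing it to the restored-tasks queue, so the main thread can never take an uninitialized task.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of deferred initialization; not the actual DefaultStateUpdater.
public class DeferredInitializationSketch {

    // Simplified two-state lifecycle (an assumption; not Task.State).
    enum State { CREATED, INITIALIZED }

    static final class SimpleTask {
        final String id;
        final boolean stateful;
        State state = State.CREATED;

        SimpleTask(final String id, final boolean stateful) {
            this.id = id;
            this.stateful = stateful;
        }

        void initialize() {
            state = State.INITIALIZED;
        }
    }

    private final ReentrantLock inputLock = new ReentrantLock();
    private final Queue<SimpleTask> inputQueue = new ArrayDeque<>();             // guarded by inputLock
    private final Queue<SimpleTask> tasksToInitialize = new ArrayDeque<>();      // state updater thread only
    final List<SimpleTask> changelogReaderRegistrations = new ArrayList<>();     // stand-in for the changelog reader
    final BlockingQueue<SimpleTask> restoredTasks = new LinkedBlockingQueue<>(); // read by the main thread

    // Main thread: hand a task over to the state updater.
    void enqueue(final SimpleTask task) {
        inputLock.lock();
        try {
            inputQueue.add(task);
        } finally {
            inputLock.unlock();
        }
    }

    // State updater thread: drain the input queue under the lock; addTask() only records the task.
    private void performActionsOnTasks() {
        inputLock.lock();
        try {
            SimpleTask task;
            while ((task = inputQueue.poll()) != null) {
                addTask(task);
            }
        } finally {
            inputLock.unlock();
        }
    }

    private void addTask(final SimpleTask task) {
        tasksToInitialize.add(task);
    }

    // State updater thread, without the input lock: initialize first, then register or publish.
    private void initializeTasksIfNeeded() {
        SimpleTask task;
        while ((task = tasksToInitialize.poll()) != null) {
            task.initialize();
            if (task.stateful) {
                changelogReaderRegistrations.add(task);
            } else {
                restoredTasks.add(task); // only initialized tasks ever become visible here
            }
        }
    }

    void runOnce() {
        performActionsOnTasks();
        initializeTasksIfNeeded();
    }

    public static void main(final String[] args) {
        final DeferredInitializationSketch updater = new DeferredInitializationSketch();
        updater.enqueue(new SimpleTask("0_0", true));  // stateful
        updater.enqueue(new SimpleTask("0_1", false)); // stateless
        updater.runOnce();
        System.out.println(updater.changelogReaderRegistrations.size() + " " + updater.restoredTasks.size()); // 1 1
        System.out.println(updater.restoredTasks.peek().state); // INITIALIZED
    }
}
```

   A real implementation would also need to handle tasks whose initialization fails or has to be retried, which this sketch omits.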
   
   If we do this, we may also be able to get rid of the checks on whether the state updater is enabled, which prevent the changelogs from being registered with the changelog reader during initialization, as done here:
   
https://github.com/apache/kafka/blob/a1f3c6d16061566a4f53c72a95e2679b8ee229e0/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L215
   We could then also remove the explicit changelog registration in the state updater. Could you check this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
