lucasbru commented on code in PR #12795:
URL: https://github.com/apache/kafka/pull/12795#discussion_r1013142335


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -395,8 +395,10 @@ private void createNewTasks(final Map<TaskId, Set<TopicPartition>> activeTasksTo
             tasks.addActiveTasks(newActiveTasks);
             tasks.addStandbyTasks(newStandbyTask);
         } else {
-            tasks.addPendingTaskToInit(newActiveTasks);
-            tasks.addPendingTaskToInit(newStandbyTask);
+            final Map<TaskId, RuntimeException> taskInitExceptions = new LinkedHashMap<>();
+            Stream.concat(newActiveTasks.stream(), newStandbyTask.stream())
+                    .forEach(t -> addTaskToStateUpdater(t, taskInitExceptions));

Review Comment:
   If we initialize inside the state updater (lazily using 
`initializeIfNeeded`, before restoration), we should be able to avoid any extra 
handling for these tasks. In `handleAssignment`, we can directly add the 
uninitialized tasks to the state updater.
   
   The advantages would be:
   - No `pendingTaskToInit` is required, which simplifies the code somewhat.
   - We don't crash when two rebalances happen in one round of polling (the reason I opened this PR; see the description).
   - After a rebalance, all tasks that remained with this instance can immediately continue processing, without waiting for new tasks to initialize their RocksDB stores (this is more of a cherry on top).
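
   To make the idea above concrete, here is a minimal, single-threaded sketch of lazy initialization inside a state updater. It is a toy model, not the actual Kafka Streams code: `LazyInitSketch`, `SketchTask`, `SketchStateUpdater`, the `State` enum, and `runOnce` are hypothetical names invented for illustration; only `initializeIfNeeded` is taken from the discussion. The real state updater runs on its own thread, which this sketch leaves out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model only; none of these types are the real Kafka Streams classes. */
class LazyInitSketch {

    /** Hypothetical stand-in for a task that starts out uninitialized. */
    static final class SketchTask {
        enum State { CREATED, RESTORING, RUNNING }

        final String id;
        State state = State.CREATED;
        int initCalls = 0; // counts how often real initialization work ran

        SketchTask(final String id) {
            this.id = id;
        }

        /** Idempotent: only the first call on a CREATED task does any work. */
        void initializeIfNeeded() {
            if (state == State.CREATED) {
                initCalls++; // e.g. opening state stores would happen here
                state = State.RESTORING;
            }
        }

        /** Restoration is only legal once the task has been initialized. */
        void restore() {
            if (state != State.RESTORING) {
                throw new IllegalStateException("Task " + id + " is not initialized");
            }
            state = State.RUNNING;
        }
    }

    /** Hypothetical stand-in for the state updater. */
    static final class SketchStateUpdater {
        private final Queue<SketchTask> pending = new ArrayDeque<>();
        private final List<SketchTask> restored = new ArrayList<>();

        /** The assignment handler hands over tasks without initializing them. */
        void add(final SketchTask task) {
            pending.add(task);
        }

        /** One pass of the updater loop: initialize lazily, then restore. */
        void runOnce() {
            SketchTask task;
            while ((task = pending.poll()) != null) {
                task.initializeIfNeeded();
                task.restore();
                restored.add(task);
            }
        }

        List<SketchTask> restored() {
            return restored;
        }
    }

    public static void main(final String[] args) {
        final SketchStateUpdater updater = new SketchStateUpdater();
        final SketchTask task = new SketchTask("0_0");
        updater.add(task); // still CREATED: no separate pending-init list needed
        updater.runOnce();
        System.out.println(task.id + " is " + task.state);
    }
}
```

   In this sketch the assignment path never needs to know whether a task is initialized; that knowledge lives entirely in the task's own state, which is the simplification argued for above.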
   
   The last option I see for fixing the `IllegalStateException` (besides moving initialization to the poll phase or to the state updater) is to take care not to recreate tasks that are already in `pendingTaskToInit`, but that could get complicated. The input partitions can change, in which case we would need to move the already created but not yet initialized task to `pendingTaskToUpdateInputPartitions` while still remembering that the task isn't initialized yet. I'm not sure exactly how to handle these cases correctly, and it looks like this would just add more corner cases to think about.


