ableegoldman commented on a change in pull request #11433: URL: https://github.com/apache/kafka/pull/11433#discussion_r738872895
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

```diff
@@ -292,12 +327,20 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
         final Set<Task> tasksToRecycle = new TreeSet<>(byId);
         final Set<Task> tasksToCloseClean = new TreeSet<>(byId);
         final Set<Task> tasksToCloseDirty = new TreeSet<>(byId);
+        boolean commitAssignedActiveTasks = false;
+        final Set<Task> activeTasksNeedCommit = new HashSet<>();
         // first rectify all existing tasks
         for (final Task task : tasks.allTasks()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 tasks.updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
                 activeTasksToCreate.remove(task.id());
+                if (task.state() == State.RESTORING) {
```

Review comment:

Note that we're currently going through only the _existing_ tasks, but we want to commit only if there are _new_ tasks that will need restoring. Unfortunately, the task lifecycle makes this nontrivial: all tasks pass through the `RESTORING` phase before going into `RUNNING`, so it's hard to figure out whether we'll actually need to spend any time restoring new tasks.

As a first pass, so we can get some kind of fix into 3.1, we can just set `commitAssignedActiveTasks = true` if there are any newly added active tasks at all. I think that's fine for now. If you're interested in how we could check whether any of the new active tasks actually need restoring, check out the `StoreChangelogReader` class. It always does at least one initial pass to confirm that any new active tasks are caught up, so one idea would be to check whether there are any changelogs left to restore from after that first pass.
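A minimal sketch of the first-pass heuristic follows. It is written in plain Java rather than against the real `TaskManager`, and `AssignmentCommitSketch`, `ExistingTask`, `Decision`, `decide`, and the `commitNeeded` flag are all hypothetical stand-ins. The key point it shows is that "new active tasks" is whatever is left in `activeTasksToCreate` *after* the loop over existing tasks. Checking `task.state()` inside that loop can never see a new task.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the assignment step. None of these types are the
// real Kafka Streams TaskManager/Task classes; they only mirror the shape of the diff.
final class AssignmentCommitSketch {

    // Stand-in for an already-existing task. The commitNeeded flag is an assumption
    // representing "this active task has uncommitted progress".
    record ExistingTask(String id, boolean isActive, boolean commitNeeded) { }

    // Outcome of rectifying existing tasks against the new assignment.
    record Decision(boolean commitAssignedActiveTasks, Set<String> activeTasksNeedCommit) { }

    static Decision decide(final Map<String, Set<String>> activeTasks,
                           final List<ExistingTask> existingTasks) {
        // Begin with every assigned active task id; ids of existing tasks are removed
        // below, so whatever remains afterwards is a newly added active task.
        final Set<String> activeTasksToCreate = new HashSet<>(activeTasks.keySet());
        final Set<String> activeTasksNeedCommit = new HashSet<>();

        for (final ExistingTask task : existingTasks) {
            if (activeTasks.containsKey(task.id()) && task.isActive()) {
                activeTasksToCreate.remove(task.id());
                if (task.commitNeeded()) {
                    activeTasksNeedCommit.add(task.id());
                }
            }
        }

        // First-pass heuristic: commit whenever any new active task was assigned,
        // without trying to work out whether it will actually spend time restoring.
        final boolean commitAssignedActiveTasks = !activeTasksToCreate.isEmpty();
        return new Decision(commitAssignedActiveTasks, activeTasksNeedCommit);
    }
}
```

Consider an example where `0_0` already exists with uncommitted progress and `0_1` is newly assigned. Here `decide` reports `commitAssignedActiveTasks() == true` with `activeTasksNeedCommit() == {0_0}`. If only `0_0` is assigned, no commit is triggered. This over-commits whenever a new task turns out to be fully caught up, which is the trade-off accepted for a first pass.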
Actually, you may not even need to worry about this "first pass": you can just blindly do the commit if there are any active tasks that need to be committed and some tasks are still in `RESTORING` at the end of `TaskManager#tryToCompleteRestoration`. Does that make sense?
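The alternative suggested above can be sketched as follows: decide on the commit only after a restoration attempt, based on whether anything is still `RESTORING`. This is a minimal, hypothetical model, not the real API. `RestorationCommitSketch`, `TaskView`, the reduced three-value `State` enum, `commitNeeded`, and `tasksToCommitAfterRestorationAttempt` are illustrative assumptions. Where exactly such a check would hook into `tryToCompleteRestoration` is also assumed.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified model; not the real Kafka Streams Task or TaskManager API.
final class RestorationCommitSketch {

    // Reduced lifecycle: new tasks pass through RESTORING before RUNNING.
    enum State { CREATED, RESTORING, RUNNING }

    // Stand-in for a task as seen at the end of a restoration attempt.
    record TaskView(String id, boolean isActive, State state, boolean commitNeeded) { }

    // Intended (hypothetically) to run at the end of tryToCompleteRestoration: if any task
    // is still RESTORING, return the active tasks with uncommitted progress so they can be
    // committed; otherwise return an empty set and skip the commit.
    static Set<String> tasksToCommitAfterRestorationAttempt(final List<TaskView> tasks) {
        final boolean anyStillRestoring = tasks.stream()
            .anyMatch(task -> task.state() == State.RESTORING);
        if (!anyStillRestoring) {
            return Set.of();
        }
        return tasks.stream()
            .filter(task -> task.isActive() && task.commitNeeded())
            .map(TaskView::id)
            .collect(Collectors.toSet());
    }
}
```

This approach has an advantage over committing on every new assignment: if all new tasks finish restoring in the first attempt (for example, because their changelogs were already caught up), nothing is left in `RESTORING` and no extra commit happens.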