ableegoldman commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r738872895



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -292,12 +327,20 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
         final Set<Task> tasksToRecycle = new TreeSet<>(byId);
         final Set<Task> tasksToCloseClean = new TreeSet<>(byId);
         final Set<Task> tasksToCloseDirty = new TreeSet<>(byId);
+        boolean commitAssignedActiveTasks = false;
+        final Set<Task> activeTasksNeedCommit = new HashSet<>();
 
         // first rectify all existing tasks
         for (final Task task : tasks.allTasks()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 tasks.updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
                 activeTasksToCreate.remove(task.id());
+                if (task.state() == State.RESTORING) {

Review comment:
Note that we're currently going through only the _existing_ tasks, but we want to commit only if there are _new_ tasks that will need restoring.
   
Unfortunately, due to the task lifecycle (specifically, that all tasks pass through the `RESTORING` phase before going into `RUNNING`), it's nontrivial to figure out whether we'll actually need to spend any time restoring the new tasks. For now (so we can get some kind of fix into 3.1), we can just set `commitAssignedActiveTasks = true` if there are any newly added active tasks at all.
   
I think that's fine for a first pass, but if you're interested in how we could check whether any of the new active tasks actually need restoring, check out the `StoreChangelogReader` class. It always does at least one initial pass to check whether the new active tasks are all caught up, so one idea would be to check whether any changelogs still need to be restored from after that pass. Actually, you may not even need to worry about this "first pass": you can just blindly do the commit if there are any active tasks that need to be committed and some tasks are still in `RESTORING` at the end of `TaskManager#tryToCompleteRestoration`. Does that make sense?
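A rough sketch of that end-of-restoration check, under stated assumptions: `SimpleTask`, its `State` enum, and `tasksToCommitAfterRestorationAttempt` are hypothetical simplifications (not Kafka Streams' real `Task` interface), and records require Java 16+. It only decides *which* tasks to commit; performing the commit is left to the caller.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified model; not Kafka Streams' real Task interface.
public class RestorationCommitCheck {

    enum State { RESTORING, RUNNING }

    record SimpleTask(String id, State state, boolean commitNeeded) { }

    // Sketch of the check suggested for the end of
    // TaskManager#tryToCompleteRestoration: if any task is still RESTORING,
    // return the active tasks with uncommitted work so they can be committed
    // now instead of waiting for restoration to finish.
    static List<SimpleTask> tasksToCommitAfterRestorationAttempt(final List<SimpleTask> activeTasks) {
        final boolean anyStillRestoring = activeTasks.stream()
            .anyMatch(t -> t.state() == State.RESTORING);
        if (!anyStillRestoring) {
            // Nothing is blocked on restoration, so no extra commit is needed.
            return List.of();
        }
        return activeTasks.stream()
            .filter(SimpleTask::commitNeeded)
            .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        final List<SimpleTask> tasks = List.of(
            new SimpleTask("0_0", State.RUNNING, true),
            new SimpleTask("0_1", State.RESTORING, false));
        // Prints [0_0]: 0_1 is still restoring and 0_0 has work to commit.
        System.out.println(tasksToCommitAfterRestorationAttempt(tasks).stream()
            .map(SimpleTask::id)
            .collect(Collectors.toList()));
    }
}
```

The apparent appeal of checking at this point is that a task which is already caught up should have moved past `RESTORING` by then, so a task still in that state likely has real restoration work left.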




-- 
This is an automated message from the Apache Git Service.

