lucasbru commented on code in PR #13925:
URL: https://github.com/apache/kafka/pull/13925#discussion_r1245427580


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1138,28 +1138,33 @@ public void signalResume() {
     public Map<TaskId, Long> getTaskOffsetSums() {
         final Map<TaskId, Long> taskOffsetSums = new HashMap<>();
 
-        // Not all tasks will create directories, and there may be directories 
for tasks we don't currently own,
-        // so we consider all tasks that are either owned or on disk. This 
includes stateless tasks, which should
-        // just have an empty changelogOffsets map.
-        for (final TaskId id : union(HashSet::new, lockedTaskDirectories, 
tasks.allTaskIds())) {
-            final Task task = tasks.contains(id) ? tasks.task(id) : null;
-            // Closed and uninitialized tasks don't have any offsets so we 
should read directly from the checkpoint
-            if (task != null && task.state() != State.CREATED && task.state() 
!= State.CLOSED) {
+        final Map<TaskId, Task> tasks = allTasks();
+        final Set<TaskId> 
lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks =

Review Comment:
   Ah, I recommended this change thinking that `lockedTaskDirectories` always 
includes all `ClosedAndCreatedTasks` -- I think it does right? So it should be 
enough to assign this to `lockedTaskDirectories`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to