cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1246556477
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ########## @@ -1138,28 +1140,35 @@ public void signalResume() { public Map<TaskId, Long> getTaskOffsetSums() { final Map<TaskId, Long> taskOffsetSums = new HashMap<>(); - // Not all tasks will create directories, and there may be directories for tasks we don't currently own, - // so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should - // just have an empty changelogOffsets map. - for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.allTaskIds())) { - final Task task = tasks.contains(id) ? tasks.task(id) : null; - // Closed and uninitialized tasks don't have any offsets so we should read directly from the checkpoint - if (task != null && task.state() != State.CREATED && task.state() != State.CLOSED) { + final Map<TaskId, Task> tasks = allTasks(); + final Set<TaskId> createdAndClosedTasks = new HashSet<>(); + for (final Task task : tasks.values()) { + if (task.state() != State.CREATED && task.state() != State.CLOSED) { final Map<TopicPartition, Long> changelogOffsets = task.changelogOffsets(); if (changelogOffsets.isEmpty()) { - log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", id); + log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", + task.id()); } else { - taskOffsetSums.put(id, sumOfChangelogOffsets(id, changelogOffsets)); + taskOffsetSums.put(task.id(), sumOfChangelogOffsets(task.id(), changelogOffsets)); } } else { - final File checkpointFile = stateDirectory.checkpointFileFor(id); - try { - if (checkpointFile.exists()) { - taskOffsetSums.put(id, sumOfChangelogOffsets(id, new OffsetCheckpoint(checkpointFile).read())); - } - } catch (final IOException e) { - log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e); + createdAndClosedTasks.add(task.id()); + } + } + + // Not all tasks will create directories, and there may be directories for tasks we don't currently own, + // so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should + // just have an empty changelogOffsets map. + final Set<TaskId> lockedTaskDirectoriesOfNonOwnedTasks = new HashSet<>(lockedTaskDirectories); + lockedTaskDirectoriesOfNonOwnedTasks.removeAll(tasks.keySet()); + for (final TaskId id : union(HashSet::new, lockedTaskDirectoriesOfNonOwnedTasks, createdAndClosedTasks)) { + final File checkpointFile = stateDirectory.checkpointFileFor(id); + try { + if (checkpointFile.exists()) { + taskOffsetSums.put(id, sumOfChangelogOffsets(id, new OffsetCheckpoint(checkpointFile).read())); } + } catch (final IOException e) { + log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e); } } Review Comment: This is the fix from https://github.com/apache/kafka/pull/13925. This will disappear once the fix is merged and this PR rebased. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org