ableegoldman commented on a change in pull request #8900: URL: https://github.com/apache/kafka/pull/8900#discussion_r444525997
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -679,92 +675,166 @@ private void cleanupTask(final Task task) { void shutdown(final boolean clean) { final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); - final Set<Task> tasksToClose = new HashSet<>(); + final Set<Task> tasksToCloseDirty = new HashSet<>(); + tasksToCloseDirty.addAll(tryCloseCleanAllActiveTasks(clean, firstException)); + tasksToCloseDirty.addAll(tryCloseCleanAllStandbyTasks(clean, firstException)); + + for (final Task task : tasksToCloseDirty) { + closeTaskDirty(task); + } + + for (final Task task : activeTaskIterable()) { + executeAndMaybeSwallow( + clean, Review comment: I think this is more in line with the general code flow elsewhere. Note that if we started out clean but became dirty and had to close some tasks as such, we would have already caught an exception somewhere. So `AtomicReference#compareAndSet` would be a no-op, and it actually doesn't matter what we pass in here ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org