ableegoldman commented on a change in pull request #10407: URL: https://github.com/apache/kafka/pull/10407#discussion_r602814224
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ##########
@@ -155,40 +155,48 @@ void handleRebalanceComplete() {
      * @throws TaskMigratedException
      */
     void handleCorruption(final Set<TaskId> corruptedTasks) {
-        final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = new HashMap<>();
-        final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new HashMap<>();
+        final Set<Task> corruptedActiveTasks = new HashSet<>();
+        final Set<Task> corruptedStandbyTasks = new HashSet<>();
 
         for (final TaskId taskId : corruptedTasks) {
             final Task task = tasks.task(taskId);
             if (task.isActive()) {
-                corruptedActiveTasks.put(task, task.changelogPartitions());
+                corruptedActiveTasks.add(task);
             } else {
-                corruptedStandbyTasks.put(task, task.changelogPartitions());
+                corruptedStandbyTasks.add(task);
             }
         }
 
         // Make sure to clean up any corrupted standby tasks in their entirety before committing
         // since TaskMigrated can be thrown and the resulting handleLostAll will only clean up active tasks
-        closeAndRevive(corruptedStandbyTasks);
-
-        commit(tasks()
-                   .values()
-                   .stream()
-                   .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
-                   .filter(t -> !corruptedTasks.contains(t.id()))
-                   .collect(Collectors.toSet())
-        );
+        closeDirtyAndRevive(corruptedStandbyTasks, true);
+
+        // We need to commit before closing the corrupted active tasks since this will force the ongoing txn to abort
+        try {
+            commit(tasks()
+                       .values()
+                       .stream()
+                       .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
+                       .filter(t -> !corruptedTasks.contains(t.id()))
+                       .collect(Collectors.toSet())
+            );
+        } catch (final TaskCorruptedException e) {
+            log.info("Some additional tasks were found corrupted while trying to commit, these will be added to the " +
+                         "tasks to clean and revive: {}", e.corruptedTasks());
+            corruptedActiveTasks.addAll(tasks.tasks(e.corruptedTasks()));
+        }
 
-        closeAndRevive(corruptedActiveTasks);
+        closeDirtyAndRevive(corruptedActiveTasks, true);
     }
 
-    private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWithChangelogs) {
-        for (final Map.Entry<Task, Collection<TopicPartition>> entry : taskWithChangelogs.entrySet()) {
-            final Task task = entry.getKey();
+    private void closeDirtyAndRevive(final Collection<Task> dirtyTasks, final boolean markAsCorrupted) {
+        for (final Task task : dirtyTasks) {
+            final Collection<TopicPartition> corruptedPartitions = task.changelogPartitions();
 
             // mark corrupted partitions to not be checkpointed, and then close the task as dirty
-            final Collection<TopicPartition> corruptedPartitions = entry.getValue();
-            task.markChangelogAsCorrupted(corruptedPartitions);
+            if (markAsCorrupted) {

Review comment:
   This was your suggestion? Or maybe your question is directed at something else..?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org