ableegoldman commented on a change in pull request #10407: URL: https://github.com/apache/kafka/pull/10407#discussion_r602816050
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -155,40 +155,48 @@ void handleRebalanceComplete() {
      * @throws TaskMigratedException
      */
     void handleCorruption(final Set<TaskId> corruptedTasks) {
-        final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = new HashMap<>();
-        final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new HashMap<>();
+        final Set<Task> corruptedActiveTasks = new HashSet<>();
+        final Set<Task> corruptedStandbyTasks = new HashSet<>();
         for (final TaskId taskId : corruptedTasks) {
             final Task task = tasks.task(taskId);
             if (task.isActive()) {
-                corruptedActiveTasks.put(task, task.changelogPartitions());
+                corruptedActiveTasks.add(task);
             } else {
-                corruptedStandbyTasks.put(task, task.changelogPartitions());
+                corruptedStandbyTasks.add(task);
             }
         }
         // Make sure to clean up any corrupted standby tasks in their entirety before committing
         // since TaskMigrated can be thrown and the resulting handleLostAll will only clean up active tasks
-        closeAndRevive(corruptedStandbyTasks);
-
-        commit(tasks()
-            .values()
-            .stream()
-            .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
-            .filter(t -> !corruptedTasks.contains(t.id()))
-            .collect(Collectors.toSet())
-        );
+        closeDirtyAndRevive(corruptedStandbyTasks, true);
+
+        // We need to commit before closing the corrupted active tasks since this will force the ongoing txn to abort
+        try {
+            commit(tasks()
+                .values()
+                .stream()
+                .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
+                .filter(t -> !corruptedTasks.contains(t.id()))
+                .collect(Collectors.toSet())
+            );
+        } catch (final TaskCorruptedException e) {

Review comment:
The only semantic difference that option 2) would make is that `prepareCommit` would move inside the try block in which we catch TimeoutException. I assume that's OK, but I'm not sure whether it was left out for a reason...?
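To make the placement question concrete, here is a minimal, self-contained sketch of why moving a `prepareCommit`-like step into the try block changes behavior. `FakeTask`, `commitOffsets`, and the local `TimeoutException` are hypothetical stand-ins, not Kafka's classes, and the real `TaskManager` commit path is structured differently; the only point illustrated is that an exception thrown by the prepare step is caught only when that step sits inside the try.

```java
// Hypothetical sketch, NOT Kafka's TaskManager: shows how the placement of a
// prepareCommit-style call relative to a try block decides whether its
// TimeoutException is caught locally or propagates to the caller.
public class PrepareCommitPlacement {

    // Hypothetical stand-in for Kafka's (unchecked) TimeoutException.
    static class TimeoutException extends RuntimeException {
        TimeoutException(final String msg) {
            super(msg);
        }
    }

    // Hypothetical task whose prepareCommit may time out.
    static final class FakeTask {
        private final boolean prepareTimesOut;

        FakeTask(final boolean prepareTimesOut) {
            this.prepareTimesOut = prepareTimesOut;
        }

        void prepareCommit() {
            if (prepareTimesOut) {
                throw new TimeoutException("prepareCommit timed out");
            }
        }
    }

    // Hypothetical offset commit that may also time out.
    static void commitOffsets(final boolean timesOut) {
        if (timesOut) {
            throw new TimeoutException("commit timed out");
        }
    }

    // Variant A: prepareCommit runs before the try block, so a timeout there
    // propagates to the caller; only a timeout from the commit itself is caught.
    static String prepareOutsideTry(final FakeTask task, final boolean commitTimesOut) {
        task.prepareCommit();
        try {
            commitOffsets(commitTimesOut);
            return "committed";
        } catch (final TimeoutException e) {
            return "caught: " + e.getMessage();
        }
    }

    // Variant B: prepareCommit runs inside the try block, so a timeout from
    // either step is handled by the same catch clause.
    static String prepareInsideTry(final FakeTask task, final boolean commitTimesOut) {
        try {
            task.prepareCommit();
            commitOffsets(commitTimesOut);
            return "committed";
        } catch (final TimeoutException e) {
            return "caught: " + e.getMessage();
        }
    }

    public static void main(final String[] args) {
        final FakeTask slowTask = new FakeTask(true);
        System.out.println(prepareInsideTry(slowTask, false)); // caught: prepareCommit timed out
        try {
            prepareOutsideTry(slowTask, false);
        } catch (final TimeoutException e) {
            System.out.println("propagated: " + e.getMessage()); // propagated: prepareCommit timed out
        }
    }
}
```

In this sketch both variants handle a timeout from the commit step identically; they differ only when the prepare step itself times out, which mirrors the single difference the comment describes.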