ableegoldman commented on a change in pull request #10407: URL: https://github.com/apache/kafka/pull/10407#discussion_r602815829
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -155,40 +155,48 @@ void handleRebalanceComplete() { * @throws TaskMigratedException */ void handleCorruption(final Set<TaskId> corruptedTasks) { - final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = new HashMap<>(); - final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new HashMap<>(); + final Set<Task> corruptedActiveTasks = new HashSet<>(); + final Set<Task> corruptedStandbyTasks = new HashSet<>(); for (final TaskId taskId : corruptedTasks) { final Task task = tasks.task(taskId); if (task.isActive()) { - corruptedActiveTasks.put(task, task.changelogPartitions()); + corruptedActiveTasks.add(task); } else { - corruptedStandbyTasks.put(task, task.changelogPartitions()); + corruptedStandbyTasks.add(task); } } // Make sure to clean up any corrupted standby tasks in their entirety before committing // since TaskMigrated can be thrown and the resulting handleLostAll will only clean up active tasks - closeAndRevive(corruptedStandbyTasks); - - commit(tasks() - .values() - .stream() - .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING) - .filter(t -> !corruptedTasks.contains(t.id())) - .collect(Collectors.toSet()) - ); + closeDirtyAndRevive(corruptedStandbyTasks, true); + + // We need to commit before closing the corrupted active tasks since this will force the ongoing txn to abort + try { + commit(tasks() + .values() + .stream() + .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING) + .filter(t -> !corruptedTasks.contains(t.id())) + .collect(Collectors.toSet()) + ); + } catch (final TaskCorruptedException e) { Review comment: Yeah, look at what I have now which is what I was doing before I backtracked and thought we wanted to kill the thread if we hit the task.timeout on a TimeoutException in commit. TimeoutException handling here should be ok now -- regarding the rest, seems like we should 1) always clear the timeout inside `Task#revive` 2) refactor `commit` a bit and pull the `prepareCommit` + `commitOffsetsORTransaction` + `postCommit` stuff out into a separate method we can invoke from `handleCorrupted` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org