guozhangwang commented on a change in pull request #10407:
URL: https://github.com/apache/kafka/pull/10407#discussion_r603011034
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -155,40 +155,55 @@ void handleRebalanceComplete() {
      * @throws TaskMigratedException
      */
     void handleCorruption(final Set<TaskId> corruptedTasks) {
-        final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = new HashMap<>();
-        final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new HashMap<>();
+        final Set<Task> corruptedActiveTasks = new HashSet<>();
+        final Set<Task> corruptedStandbyTasks = new HashSet<>();
         for (final TaskId taskId : corruptedTasks) {
             final Task task = tasks.task(taskId);
             if (task.isActive()) {
-                corruptedActiveTasks.put(task, task.changelogPartitions());
+                corruptedActiveTasks.add(task);

Review comment:
       Thanks for the cleanup! I think in the past we may have marked only a subset of the changelog partitions as corrupted, but later we would always just mark all of them as corrupted. Following that thought, maybe `task.markChangelogAsCorrupted` does not need to take parameters either and could simply mark all changelog partitions as corrupted?
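To make the suggestion concrete, here is a minimal, hypothetical sketch of a no-arg `markChangelogAsCorrupted` variant that always marks every changelog partition of the task. The pared-down `Task` interface below is illustrative only (the real `org.apache.kafka.streams.processor.internals.Task` has many more methods), and the default method is an assumption, not existing API:

```java
import java.util.Collection;

import org.apache.kafka.common.TopicPartition;

// Illustrative, pared-down Task interface -- not the real internals.Task.
interface Task {
    Collection<TopicPartition> changelogPartitions();

    // Existing style of call: mark a given subset of changelog partitions as corrupted.
    void markChangelogAsCorrupted(Collection<TopicPartition> partitions);

    // Hypothetical convenience: since callers effectively always pass changelogPartitions(),
    // a no-arg variant could mark all changelog partitions as corrupted.
    default void markChangelogAsCorrupted() {
        markChangelogAsCorrupted(changelogPartitions());
    }
}
```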
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -155,40 +155,55 @@ void handleRebalanceComplete() {
      * @throws TaskMigratedException
      */
     void handleCorruption(final Set<TaskId> corruptedTasks) {
-        final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = new HashMap<>();
-        final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new HashMap<>();
+        final Set<Task> corruptedActiveTasks = new HashSet<>();
+        final Set<Task> corruptedStandbyTasks = new HashSet<>();
         for (final TaskId taskId : corruptedTasks) {
             final Task task = tasks.task(taskId);
             if (task.isActive()) {
-                corruptedActiveTasks.put(task, task.changelogPartitions());
+                corruptedActiveTasks.add(task);
             } else {
-                corruptedStandbyTasks.put(task, task.changelogPartitions());
+                corruptedStandbyTasks.add(task);
             }
         }

         // Make sure to clean up any corrupted standby tasks in their entirety before committing
         // since TaskMigrated can be thrown and the resulting handleLostAll will only clean up active tasks
-        closeAndRevive(corruptedStandbyTasks);
-
-        commit(tasks()
-                   .values()
-                   .stream()
-                   .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
-                   .filter(t -> !corruptedTasks.contains(t.id()))
-                   .collect(Collectors.toSet())
-        );
+        closeDirtyAndRevive(corruptedStandbyTasks, true);
+
+        // We need to commit before closing the corrupted active tasks since this will force the ongoing txn to abort
+        try {
+            commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasks()
+                .values()
+                .stream()
+                .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
+                .filter(t -> !corruptedTasks.contains(t.id()))
+                .collect(Collectors.toSet()),
+                new HashMap<>()
+            );
+        } catch (final TaskCorruptedException e) {
+            log.info("Some additional tasks were found corrupted while trying to commit, these will be added to the " +
+                "tasks to clean and revive: {}", e.corruptedTasks());
+            corruptedActiveTasks.addAll(tasks.tasks(e.corruptedTasks()));
+        } catch (final TimeoutException e) {
+            log.info("Hit TimeoutException when committing all non-corrupted tasks, these will be closed and revived");
+            final Collection<Task> uncorruptedTasks = new HashSet<>(tasks.activeTasks());
+            uncorruptedTasks.removeAll(corruptedActiveTasks);
+            // Those tasks which just timed out can just be closed dirty without marking changelogs as corrupted
+            closeDirtyAndRevive(uncorruptedTasks, false);

Review comment:
       If `closeDirtyAndRevive` throws here, then the next `closeDirtyAndRevive` would not be triggered. Is that okay, or do we now guarantee that `closeDirtyAndRevive` never throws?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
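To illustrate the ordering concern in the second review comment above: if the `closeDirtyAndRevive(uncorruptedTasks, false)` call throws, the later `closeDirtyAndRevive(corruptedActiveTasks, true)` is skipped. Below is a minimal sketch of one way to shield the second call, assuming the surrounding `TaskManager` context from the PR (`closeDirtyAndRevive`, `Task`); the helper name `reviveAfterTimeout` is hypothetical and this is not the actual implementation.

```java
// Hypothetical helper, assuming the TaskManager context from the PR.
private void reviveAfterTimeout(final Collection<Task> timedOutTasks,
                                final Set<Task> corruptedActiveTasks) {
    try {
        // Tasks that merely hit the TimeoutException: close dirty without marking changelogs as corrupted.
        closeDirtyAndRevive(timedOutTasks, false);
    } finally {
        // Ensure the corrupted active tasks are still closed and revived even if the call above throws.
        closeDirtyAndRevive(corruptedActiveTasks, true);
    }
}
```

Whether something like this is needed depends on the answer to the question above, i.e. on whether `closeDirtyAndRevive` can still throw.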