ableegoldman commented on a change in pull request #10407:
URL: https://github.com/apache/kafka/pull/10407#discussion_r602814224



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -155,40 +155,48 @@ void handleRebalanceComplete() {
      * @throws TaskMigratedException
      */
     void handleCorruption(final Set<TaskId> corruptedTasks) {
-        final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = 
new HashMap<>();
-        final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new 
HashMap<>();
+        final Set<Task> corruptedActiveTasks = new HashSet<>();
+        final Set<Task> corruptedStandbyTasks = new HashSet<>();
 
         for (final TaskId taskId : corruptedTasks) {
             final Task task = tasks.task(taskId);
             if (task.isActive()) {
-                corruptedActiveTasks.put(task, task.changelogPartitions());
+                corruptedActiveTasks.add(task);
             } else {
-                corruptedStandbyTasks.put(task, task.changelogPartitions());
+                corruptedStandbyTasks.add(task);
             }
         }
 
         // Make sure to clean up any corrupted standby tasks in their entirety 
before committing
         // since TaskMigrated can be thrown and the resulting handleLostAll 
will only clean up active tasks
-        closeAndRevive(corruptedStandbyTasks);
-
-        commit(tasks()
-                   .values()
-                   .stream()
-                   .filter(t -> t.state() == Task.State.RUNNING || t.state() 
== Task.State.RESTORING)
-                   .filter(t -> !corruptedTasks.contains(t.id()))
-                   .collect(Collectors.toSet())
-        );
+        closeDirtyAndRevive(corruptedStandbyTasks, true);
+
+        // We need to commit before closing the corrupted active tasks since 
this will force the ongoing txn to abort
+        try {
+            commit(tasks()
+                       .values()
+                       .stream()
+                       .filter(t -> t.state() == Task.State.RUNNING || 
t.state() == Task.State.RESTORING)
+                       .filter(t -> !corruptedTasks.contains(t.id()))
+                       .collect(Collectors.toSet())
+            );
+        } catch (final TaskCorruptedException e) {
+            log.info("Some additional tasks were found corrupted while trying 
to commit, these will be added to the " +
+                         "tasks to clean and revive: {}", e.corruptedTasks());
+            corruptedActiveTasks.addAll(tasks.tasks(e.corruptedTasks()));
+        }
 
-        closeAndRevive(corruptedActiveTasks);
+        closeDirtyAndRevive(corruptedActiveTasks, true);
     }
 
-    private void closeAndRevive(final Map<Task, Collection<TopicPartition>> 
taskWithChangelogs) {
-        for (final Map.Entry<Task, Collection<TopicPartition>> entry : 
taskWithChangelogs.entrySet()) {
-            final Task task = entry.getKey();
+    private void closeDirtyAndRevive(final Collection<Task> dirtyTasks, final 
boolean markAsCorrupted) {
+        for (final Task task : dirtyTasks) {
+            final Collection<TopicPartition> corruptedPartitions = 
task.changelogPartitions();
 
             // mark corrupted partitions to not be checkpointed, and then 
close the task as dirty
-            final Collection<TopicPartition> corruptedPartitions = 
entry.getValue();
-            task.markChangelogAsCorrupted(corruptedPartitions);
+            if (markAsCorrupted) {

Review comment:
       This was your suggestion? Or maybe your question is directed at 
something else..?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to