guozhangwang commented on a change in pull request #10407:
URL: https://github.com/apache/kafka/pull/10407#discussion_r603011034



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -155,40 +155,55 @@ void handleRebalanceComplete() {
      * @throws TaskMigratedException
      */
     void handleCorruption(final Set<TaskId> corruptedTasks) {
-        final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = 
new HashMap<>();
-        final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new 
HashMap<>();
+        final Set<Task> corruptedActiveTasks = new HashSet<>();
+        final Set<Task> corruptedStandbyTasks = new HashSet<>();
 
         for (final TaskId taskId : corruptedTasks) {
             final Task task = tasks.task(taskId);
             if (task.isActive()) {
-                corruptedActiveTasks.put(task, task.changelogPartitions());
+                corruptedActiveTasks.add(task);

Review comment:
       Thanks for the cleanup! I think in the past we may only mark some subset 
of changelog partitions as corrupted, but later we would always just mark all 
of them as corrupted. Just following that thought, maybe in 
`task.markChangelogAsCorrupted` we do not need to pass in parameters either but 
just mark all changelog partitions as corrupted?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -155,40 +155,55 @@ void handleRebalanceComplete() {
      * @throws TaskMigratedException
      */
     void handleCorruption(final Set<TaskId> corruptedTasks) {
-        final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = 
new HashMap<>();
-        final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new 
HashMap<>();
+        final Set<Task> corruptedActiveTasks = new HashSet<>();
+        final Set<Task> corruptedStandbyTasks = new HashSet<>();
 
         for (final TaskId taskId : corruptedTasks) {
             final Task task = tasks.task(taskId);
             if (task.isActive()) {
-                corruptedActiveTasks.put(task, task.changelogPartitions());
+                corruptedActiveTasks.add(task);
             } else {
-                corruptedStandbyTasks.put(task, task.changelogPartitions());
+                corruptedStandbyTasks.add(task);
             }
         }
 
         // Make sure to clean up any corrupted standby tasks in their entirety 
before committing
         // since TaskMigrated can be thrown and the resulting handleLostAll 
will only clean up active tasks
-        closeAndRevive(corruptedStandbyTasks);
-
-        commit(tasks()
-                   .values()
-                   .stream()
-                   .filter(t -> t.state() == Task.State.RUNNING || t.state() 
== Task.State.RESTORING)
-                   .filter(t -> !corruptedTasks.contains(t.id()))
-                   .collect(Collectors.toSet())
-        );
+        closeDirtyAndRevive(corruptedStandbyTasks, true);
+
+        // We need to commit before closing the corrupted active tasks since 
this will force the ongoing txn to abort
+        try {
+            commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasks()
+                       .values()
+                       .stream()
+                       .filter(t -> t.state() == Task.State.RUNNING || 
t.state() == Task.State.RESTORING)
+                       .filter(t -> !corruptedTasks.contains(t.id()))
+                       .collect(Collectors.toSet()),
+                                new HashMap<>()
+            );
+        } catch (final TaskCorruptedException e) {
+            log.info("Some additional tasks were found corrupted while trying 
to commit, these will be added to the " +
+                         "tasks to clean and revive: {}", e.corruptedTasks());
+            corruptedActiveTasks.addAll(tasks.tasks(e.corruptedTasks()));
+        } catch (final TimeoutException e) {
+            log.info("Hit TimeoutException when committing all non-corrupted 
tasks, these will be closed and revived");
+            final Collection<Task> uncorruptedTasks = new 
HashSet<>(tasks.activeTasks());
+            uncorruptedTasks.removeAll(corruptedActiveTasks);
+            // Those tasks which just timed out can just be closed dirty 
without marking changelogs as corrupted
+            closeDirtyAndRevive(uncorruptedTasks, false);

Review comment:
       If `closeDirtyAndRevive` throws here, then the next 
`closeDirtyAndRevive` would not be triggered. Is that okay, or do we guarantee 
that `closeDirtyAndRevive` would not throw at all now?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to