ableegoldman commented on a change in pull request #10407:
URL: https://github.com/apache/kafka/pull/10407#discussion_r602815829



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -155,40 +155,48 @@ void handleRebalanceComplete() {
      * @throws TaskMigratedException
      */
     void handleCorruption(final Set<TaskId> corruptedTasks) {
-        final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = 
new HashMap<>();
-        final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new 
HashMap<>();
+        final Set<Task> corruptedActiveTasks = new HashSet<>();
+        final Set<Task> corruptedStandbyTasks = new HashSet<>();
 
         for (final TaskId taskId : corruptedTasks) {
             final Task task = tasks.task(taskId);
             if (task.isActive()) {
-                corruptedActiveTasks.put(task, task.changelogPartitions());
+                corruptedActiveTasks.add(task);
             } else {
-                corruptedStandbyTasks.put(task, task.changelogPartitions());
+                corruptedStandbyTasks.add(task);
             }
         }
 
         // Make sure to clean up any corrupted standby tasks in their entirety 
before committing
         // since TaskMigrated can be thrown and the resulting handleLostAll 
will only clean up active tasks
-        closeAndRevive(corruptedStandbyTasks);
-
-        commit(tasks()
-                   .values()
-                   .stream()
-                   .filter(t -> t.state() == Task.State.RUNNING || t.state() 
== Task.State.RESTORING)
-                   .filter(t -> !corruptedTasks.contains(t.id()))
-                   .collect(Collectors.toSet())
-        );
+        closeDirtyAndRevive(corruptedStandbyTasks, true);
+
+        // We need to commit before closing the corrupted active tasks since 
this will force the ongoing txn to abort
+        try {
+            commit(tasks()
+                       .values()
+                       .stream()
+                       .filter(t -> t.state() == Task.State.RUNNING || 
t.state() == Task.State.RESTORING)
+                       .filter(t -> !corruptedTasks.contains(t.id()))
+                       .collect(Collectors.toSet())
+            );
+        } catch (final TaskCorruptedException e) {

Review comment:
       Yeah, look at what I have now which is what I was doing before I 
backtracked and thought we wanted to kill the thread if we hit the task.timeout 
on a TimeoutException in commit. TimeoutException handling here should be ok 
now -- regarding the rest, seems like we should
   1) always clear the timeout inside `Task#revive`
   2) refactor `commit` a bit and pull the `prepareCommit` + 
`commitOffsetsORTransaction` + `postCommit` stuff out into a separate method we 
can invoke from `handleCorrupted`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to