mjsax commented on a change in pull request #10407:
URL: https://github.com/apache/kafka/pull/10407#discussion_r602797242



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -509,38 +517,64 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
             prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
         }
 
-        // even if commit failed, we should still continue and complete suspending those tasks,
-        // so we would capture any exception and throw
+        // even if commit failed, we should still continue and complete suspending those tasks, so we would capture
+        // any exception and rethrow it at the end. some exceptions may be handled immediately and then swallowed,
+        // as such we just need to skip those dirty tasks in the checkpoint
+        final Set<TaskId> dirtyTaskIds = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                     e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException, just handle the cleanup for those corrupted tasks right here
+            dirtyTaskIds.addAll(e.corruptedTasks());
+            closeDirtyAndRevive(tasks.tasks(dirtyTaskIds), true);
+        } catch (final TimeoutException e) {
+            log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived");
+
+            // If we hit a TimeoutException we can just close dirty and revive without wiping the state

Review comment:
       I guess the underlying reason is that a TimeoutException can only happen for ALOS, while for EOS any timeout would be converted into TaskCorrupted? Might be worth clarifying in the comment.
   
   (Maybe also add a comment to the `TaskCorrupted` catch block above: `// EOS case`.)
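   Something along these lines (just a sketch of the suggested comments; the catch blocks themselves are the ones from the diff above):

```java
} catch (final TaskCorruptedException e) {
    // EOS case: under EOS a commit timeout is converted into a TaskCorruptedException,
    // so the corrupted tasks have to be wiped and revived
    // ... handling as in the diff above ...
} catch (final TimeoutException e) {
    // ALOS case: a raw TimeoutException can only surface for a non-EOS commit,
    // so we can close dirty and revive without wiping the state
    // ... handling as in the diff above ...
}
```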

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1027,6 +1064,10 @@ int maybeCommitActiveTasksPerUserRequested() {
         }
     }
 
+    /**
+     * @throws TimeoutException        if task.timeout.ms has been exceeded (non-EOS)

Review comment:
       I think we throw `Timeout` if `commitSync` hits a timeout (ie, we just re-throw it), independently of `task.timeout.ms`.
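   Ie, maybe reword it along these lines (sketch only):

```java
/**
 * @throws TimeoutException if the commit itself times out (eg, the consumer's
 *                          `commitSync`); the exception is simply re-thrown and
 *                          is independent of the `task.timeout.ms` config (non-EOS)
 */
```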

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -155,40 +155,48 @@ void handleRebalanceComplete() {
      * @throws TaskMigratedException
      */
     void handleCorruption(final Set<TaskId> corruptedTasks) {
-        final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = new HashMap<>();
-        final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new HashMap<>();
+        final Set<Task> corruptedActiveTasks = new HashSet<>();
+        final Set<Task> corruptedStandbyTasks = new HashSet<>();
 
         for (final TaskId taskId : corruptedTasks) {
             final Task task = tasks.task(taskId);
             if (task.isActive()) {
-                corruptedActiveTasks.put(task, task.changelogPartitions());
+                corruptedActiveTasks.add(task);
             } else {
-                corruptedStandbyTasks.put(task, task.changelogPartitions());
+                corruptedStandbyTasks.add(task);
             }
         }
 
         // Make sure to clean up any corrupted standby tasks in their entirety before committing
         // since TaskMigrated can be thrown and the resulting handleLostAll will only clean up active tasks
-        closeAndRevive(corruptedStandbyTasks);
-
-        commit(tasks()
-                   .values()
-                   .stream()
-                   .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
-                   .filter(t -> !corruptedTasks.contains(t.id()))
-                   .collect(Collectors.toSet())
-        );
+        closeDirtyAndRevive(corruptedStandbyTasks, true);
+
+        // We need to commit before closing the corrupted active tasks since this will force the ongoing txn to abort
+        try {
+            commit(tasks()
+                       .values()
+                       .stream()
+                       .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
+                       .filter(t -> !corruptedTasks.contains(t.id()))
+                       .collect(Collectors.toSet())
+            );
+        } catch (final TaskCorruptedException e) {
+            log.info("Some additional tasks were found corrupted while trying to commit, these will be added to the " +
+                         "tasks to clean and revive: {}", e.corruptedTasks());
+            corruptedActiveTasks.addAll(tasks.tasks(e.corruptedTasks()));
+        }
 
-        closeAndRevive(corruptedActiveTasks);
+        closeDirtyAndRevive(corruptedActiveTasks, true);
     }
 
-    private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWithChangelogs) {
-        for (final Map.Entry<Task, Collection<TopicPartition>> entry : taskWithChangelogs.entrySet()) {
-            final Task task = entry.getKey();
+    private void closeDirtyAndRevive(final Collection<Task> dirtyTasks, final boolean markAsCorrupted) {
+        for (final Task task : dirtyTasks) {
+            final Collection<TopicPartition> corruptedPartitions = task.changelogPartitions();
 
             // mark corrupted partitions to not be checkpointed, and then close the task as dirty
-            final Collection<TopicPartition> corruptedPartitions = entry.getValue();
-            task.markChangelogAsCorrupted(corruptedPartitions);
+            if (markAsCorrupted) {

Review comment:
       Not sure I understand why we pulled this into this method? Not saying it's wrong, but I don't really understand it right now. Can you elaborate?
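   For reference, the two call sites in this PR differ only in this flag, which I assume is the motivation:

```java
// handleCorruption(): changelogs are marked corrupted so the local state gets wiped
closeDirtyAndRevive(corruptedActiveTasks, true);

// handleRevocation(), TimeoutException path: close dirty and revive, keeping the state
closeDirtyAndRevive(tasks.activeTasks(), false);
```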

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -155,40 +155,48 @@ void handleRebalanceComplete() {
      * @throws TaskMigratedException
      */
     void handleCorruption(final Set<TaskId> corruptedTasks) {
-        final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = new HashMap<>();
-        final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new HashMap<>();
+        final Set<Task> corruptedActiveTasks = new HashSet<>();
+        final Set<Task> corruptedStandbyTasks = new HashSet<>();
 
         for (final TaskId taskId : corruptedTasks) {
             final Task task = tasks.task(taskId);
             if (task.isActive()) {
-                corruptedActiveTasks.put(task, task.changelogPartitions());
+                corruptedActiveTasks.add(task);
             } else {
-                corruptedStandbyTasks.put(task, task.changelogPartitions());
+                corruptedStandbyTasks.add(task);
             }
         }
 
         // Make sure to clean up any corrupted standby tasks in their entirety before committing
         // since TaskMigrated can be thrown and the resulting handleLostAll will only clean up active tasks
-        closeAndRevive(corruptedStandbyTasks);
-
-        commit(tasks()
-                   .values()
-                   .stream()
-                   .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
-                   .filter(t -> !corruptedTasks.contains(t.id()))
-                   .collect(Collectors.toSet())
-        );
+        closeDirtyAndRevive(corruptedStandbyTasks, true);
+
+        // We need to commit before closing the corrupted active tasks since this will force the ongoing txn to abort
+        try {
+            commit(tasks()
+                       .values()
+                       .stream()
+                       .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
+                       .filter(t -> !corruptedTasks.contains(t.id()))
+                       .collect(Collectors.toSet())
+            );
+        } catch (final TaskCorruptedException e) {

Review comment:
       I think we could also catch `TimeoutException` here and just mark the tasks as corrupted if we cannot commit them? (We would need to disable the `TimeoutException` handling within `commit()` for this case...)
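   Roughly like this (just a sketch: `uncommittedTasks` is a hypothetical local for the filtered set that is currently built inline, and `commit()` would need a flag to skip its own timeout handling):

```java
try {
    commit(uncommittedTasks);  // hypothetical variable; see note above
} catch (final TaskCorruptedException e) {
    corruptedActiveTasks.addAll(tasks.tasks(e.corruptedTasks()));
} catch (final TimeoutException e) {
    // suggested: if we cannot commit these tasks in time, treat them as corrupted
    // as well, so they get cleaned up and revived together with the others
    log.info("Timed out while committing; marking uncommitted tasks as corrupted: {}", uncommittedTasks);
    corruptedActiveTasks.addAll(uncommittedTasks);
}
```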

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -509,38 +517,64 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
             prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
         }
 
-        // even if commit failed, we should still continue and complete suspending those tasks,
-        // so we would capture any exception and throw
+        // even if commit failed, we should still continue and complete suspending those tasks, so we would capture
+        // any exception and rethrow it at the end. some exceptions may be handled immediately and then swallowed,
+        // as such we just need to skip those dirty tasks in the checkpoint
+        final Set<TaskId> dirtyTaskIds = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                     e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException, just handle the cleanup for those corrupted tasks right here
+            dirtyTaskIds.addAll(e.corruptedTasks());
+            closeDirtyAndRevive(tasks.tasks(dirtyTaskIds), true);
+        } catch (final TimeoutException e) {
+            log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived");
+
+            // If we hit a TimeoutException we can just close dirty and revive without wiping the state
+            closeDirtyAndRevive(tasks.activeTasks(), false);
+            dirtyTaskIds.addAll(tasks.activeTaskIds());
+
+            try {
+                tasks.activeTasks().forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), e));
+            } catch (final TimeoutException fatalTimeoutException) {
+                firstException.compareAndSet(null, fatalTimeoutException);
+            }
         } catch (final RuntimeException e) {
             log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
             firstException.compareAndSet(null, e);
         }
 
-        // only try to complete post-commit if committing succeeded;
-        // we enforce checkpointing upon suspending a task: if it is resumed later we just
-        // proceed normally, if it is going to be closed we would checkpoint by then
-        if (firstException.get() == null) {
+        // only try to complete post-commit if committing succeeded, or if we hit a TaskCorruptedException then we

Review comment:
       If we address the comment from above with regard to `TimeoutException`, this comment would need an update.
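   Eg, something like (sketch):

```java
// only try to complete post-commit if committing succeeded, or if some tasks were
// closed dirty (TaskCorrupted or Timeout): we can still checkpoint the clean tasks
```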

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -509,38 +517,64 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
             prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
         }
 
-        // even if commit failed, we should still continue and complete suspending those tasks,
-        // so we would capture any exception and throw
+        // even if commit failed, we should still continue and complete suspending those tasks, so we would capture
+        // any exception and rethrow it at the end. some exceptions may be handled immediately and then swallowed,
+        // as such we just need to skip those dirty tasks in the checkpoint
+        final Set<TaskId> dirtyTaskIds = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                     e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException, just handle the cleanup for those corrupted tasks right here
+            dirtyTaskIds.addAll(e.corruptedTasks());
+            closeDirtyAndRevive(tasks.tasks(dirtyTaskIds), true);
+        } catch (final TimeoutException e) {
+            log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived");
+
+            // If we hit a TimeoutException we can just close dirty and revive without wiping the state
+            closeDirtyAndRevive(tasks.activeTasks(), false);
+            dirtyTaskIds.addAll(tasks.activeTaskIds());
+
+            try {
+                tasks.activeTasks().forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), e));

Review comment:
       As we are handling a revocation and thus don't own these tasks any longer, it might be ok to skip this step? What do we gain if we kill the thread?
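   Ie, the catch block could maybe shrink to something like this (sketch):

```java
} catch (final TimeoutException e) {
    log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived");

    closeDirtyAndRevive(tasks.activeTasks(), false);
    dirtyTaskIds.addAll(tasks.activeTaskIds());
    // no maybeInitTaskTimeoutOrThrow() here: these tasks are being revoked anyway,
    // so escalating to a fatal timeout would only kill the thread over tasks we no longer own
}
```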

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -509,38 +517,64 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
             prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
         }
 
-        // even if commit failed, we should still continue and complete suspending those tasks,
-        // so we would capture any exception and throw
+        // even if commit failed, we should still continue and complete suspending those tasks, so we would capture
+        // any exception and rethrow it at the end. some exceptions may be handled immediately and then swallowed,
+        // as such we just need to skip those dirty tasks in the checkpoint
+        final Set<TaskId> dirtyTaskIds = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                     e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException, just handle the cleanup for those corrupted tasks right here
+            dirtyTaskIds.addAll(e.corruptedTasks());
+            closeDirtyAndRevive(tasks.tasks(dirtyTaskIds), true);
+        } catch (final TimeoutException e) {
+            log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived");
+
+            // If we hit a TimeoutException we can just close dirty and revive without wiping the state
+            closeDirtyAndRevive(tasks.activeTasks(), false);
+            dirtyTaskIds.addAll(tasks.activeTaskIds());
+
+            try {
+                tasks.activeTasks().forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), e));
+            } catch (final TimeoutException fatalTimeoutException) {
+                firstException.compareAndSet(null, fatalTimeoutException);
+            }
         } catch (final RuntimeException e) {
             log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
             firstException.compareAndSet(null, e);
         }
 
-        // only try to complete post-commit if committing succeeded;
-        // we enforce checkpointing upon suspending a task: if it is resumed later we just
-        // proceed normally, if it is going to be closed we would checkpoint by then
-        if (firstException.get() == null) {
+        // only try to complete post-commit if committing succeeded, or if we hit a TaskCorruptedException then we
+        // can still checkpoint the uncorrupted tasks (if any)
+        // we enforce checkpointing upon suspending a task: if it is resumed later we just proceed normally, if it is
+        // going to be closed we would checkpoint by then
+        if (firstException.get() == null || !dirtyTaskIds.isEmpty()) {

Review comment:
       As we only call `postCommit` for non-dirty tasks anyway, do we need to check `firstException.get() == null`? Since we have the inner `if (!dirtyTaskIds.contains(task.id()))` check anyway, it seems we can remove this condition completely and just execute the `for` loop always?
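   Ie, something like this (sketch; the loop shape is hypothetical since its body is not visible in this hunk, but `postCommit` and the inner check are from the existing code):

```java
// always run the loop; the per-task check already skips the dirty tasks
for (final Task task : revokedActiveTasks) {
    if (!dirtyTaskIds.contains(task.id())) {
        task.postCommit(true);  // enforce checkpoint upon suspension
    }
}
```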




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

