ableegoldman commented on a change in pull request #10407:
URL: https://github.com/apache/kafka/pull/10407#discussion_r602814084



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -509,38 +519,60 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
             prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
         }
 
-        // even if commit failed, we should still continue and complete suspending those tasks,
-        // so we would capture any exception and throw
+        // even if commit failed, we should still continue and complete suspending those tasks, so we would capture
+        // any exception and rethrow it at the end
+        final Set<TaskId> corruptedTasks = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                     e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException, we should just handle the cleanup for those corrupted tasks right here
+            corruptedTasks.addAll(e.corruptedTasks());
+            final Map<Task, Collection<TopicPartition>> corruptedTasksWithChangelogs = new HashMap<>();
+            for (final TaskId taskId : corruptedTasks) {
+                final Task task = tasks.task(taskId);
+                task.markChangelogAsCorrupted(task.changelogPartitions());
+                corruptedTasksWithChangelogs.put(task, task.changelogPartitions());
+            }
+            closeAndRevive(corruptedTasksWithChangelogs);
         } catch (final RuntimeException e) {
             log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
+
+            // TODO: KIP-572 need to handle TimeoutException, may be rethrown from committing offsets under ALOS
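
The control flow in this hunk (commit, handle corruption on the spot, remember any other failure, finish suspending, rethrow at the end) can be sketched in isolation. This is a simplified, hypothetical model rather than the actual `TaskManager` code: `CorruptedTasksException` stands in for `TaskCorruptedException`, task ids are plain strings, the `revived` list stands in for `closeAndRevive`, and `Committer` is an illustrative interface for the commit step.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for TaskCorruptedException: carries the ids of the corrupted tasks.
class CorruptedTasksException extends RuntimeException {
    final Set<String> corruptedTasks;

    CorruptedTasksException(final Set<String> corruptedTasks) {
        super("Tasks corrupted: " + corruptedTasks);
        this.corruptedTasks = corruptedTasks;
    }
}

class RevocationSketch {
    // Hypothetical commit step; in the real code this is commitOffsetsOrTransaction.
    interface Committer {
        void commit();
    }

    final List<String> revived = new ArrayList<>();   // tasks handed to the stand-in for closeAndRevive
    final List<String> suspended = new ArrayList<>();  // tasks whose suspension completed

    void handleRevocation(final Collection<String> revokedTasks, final Committer committer) {
        RuntimeException firstException = null;
        try {
            committer.commit();
        } catch (final CorruptedTasksException e) {
            // Corruption is handled right here: only the affected tasks are cleaned up and revived.
            revived.addAll(e.corruptedTasks);
        } catch (final RuntimeException e) {
            // Any other failure is remembered instead of being thrown immediately.
            firstException = e;
        }
        // Suspension always runs to completion, whether or not the commit succeeded.
        suspended.addAll(revokedTasks);
        if (firstException != null) {
            throw firstException;
        }
    }
}
```

Catching the narrower exception type first means a corruption does not abort the revocation at all, while any other `RuntimeException` still surfaces to the caller, just only after every revoked task has been suspended.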

Review comment:
      > To me, task.timeout.ms applies mainly to the regular processing case. For task revocation/corruption it seems ok to ignore the timeout and just reset it
   
   Ok... I honestly didn't follow KIP-572 too closely, but that's what I was thinking originally, and then I got paranoid. I kept those changes in their own commit so I could just revert it if that's how you responded. Sounds like that's the plan -- please give it one (hopefully final) pass once I do, and we can finally wash our hands of this.
   
   > Or should we only reset the timeout if the cleanup is successful?
   
   The cleanup (you mean `closeAndRevive`, right?) shouldn't ever really fail, but yeah, I guess we can just reset the timeout after that returns.
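
The timeout policy agreed on here (the deadline governs regular processing, while revocation/corruption ignores it and resets it once cleanup returns) can be modeled with a small per-task tracker. This is a hypothetical sketch, not the Kafka Streams API: `TimeoutTrackingTask`, `onProcessingTimeout`, `resetTimeout`, and `cleanUpAndResetTimeout` are illustrative names. The sketch assumes, as a simplification of `task.timeout.ms`, that the first timeout starts the clock and that a later timeout at or past the deadline is fatal.

```java
// Hypothetical per-task timeout model in the spirit of task.timeout.ms (KIP-572).
class TimeoutTrackingTask {
    private final long taskTimeoutMs;
    private long deadlineMs = -1L; // -1 means no timeout is currently pending

    TimeoutTrackingTask(final long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    boolean timeoutPending() {
        return deadlineMs >= 0;
    }

    // Regular processing: the first timeout starts the clock; a later one at or past the deadline is fatal.
    void onProcessingTimeout(final long nowMs, final RuntimeException cause) {
        if (deadlineMs < 0) {
            deadlineMs = nowMs + taskTimeoutMs;
        } else if (nowMs >= deadlineMs) {
            throw new IllegalStateException("task timeout of " + taskTimeoutMs + " ms exceeded", cause);
        }
    }

    void resetTimeout() {
        deadlineMs = -1L;
    }

    // Revocation/corruption: ignore the deadline, run the cleanup, and reset only after it returns.
    // If the cleanup throws, the reset is skipped and the exception propagates to the caller.
    void cleanUpAndResetTimeout(final Runnable closeAndRevive) {
        closeAndRevive.run();
        resetTimeout();
    }
}
```

Placing `resetTimeout()` after the cleanup call, with no `finally`, gives the "reset only if cleanup succeeds" behavior for free: a failed cleanup leaves the pending deadline in place.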



