mjsax commented on a change in pull request #10407: URL: https://github.com/apache/kafka/pull/10407#discussion_r602604314
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -509,41 +519,62 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) { prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask); } - // even if commit failed, we should still continue and complete suspending those tasks, - // so we would capture any exception and throw + // even if commit failed, we should still continue and complete suspending those tasks, so we would capture + // any exception and rethrow it at the end + final Set<TaskId> corruptedTasks = new HashSet<>(); try { commitOffsetsOrTransaction(consumedOffsetsPerTask); } catch (final RuntimeException e) { log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e); - firstException.compareAndSet(null, e); - } - // only try to complete post-commit if committing succeeded; - // we enforce checkpointing upon suspending a task: if it is resumed later we just - // proceed normally, if it is going to be closed we would checkpoint by then - if (firstException.get() == null) { - for (final Task task : revokedActiveTasks) { - try { - task.postCommit(true); - } catch (final RuntimeException e) { - log.error("Exception caught while post-committing task " + task.id(), e); - firstException.compareAndSet(null, e); + // If we hit a TaskCorruptedException, we should just handle the cleanup for those corrupted tasks right here + if (e instanceof TaskCorruptedException) { + corruptedTasks.addAll(((TaskCorruptedException) e).corruptedTasks()); + final Map<Task, Collection<TopicPartition>> corruptedTasksWithChangelogs = new HashMap<>(); + for (final TaskId taskId : corruptedTasks) { + final Task task = tasks.task(taskId); + task.markChangelogAsCorrupted(task.changelogPartitions()); + corruptedTasksWithChangelogs.put(task, task.changelogPartitions()); } + closeAndRevive(corruptedTasksWithChangelogs); + } else { + // TODO: 
KIP-572 need to handle TimeoutException, may be rethrown from committing offsets under ALOS Review comment: I don't think we need to handle `TimeoutException` here? If a client throws a `TimeoutException`, it should be handled within `commitOffsetsOrTransaction` -- and `commitOffsetsOrTransaction` should only re-throw a timeout after `task.timeout.ms` has expired, in which case we should indeed let the thread die. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org