mjsax commented on a change in pull request #10407: URL: https://github.com/apache/kafka/pull/10407#discussion_r602797242
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -509,38 +517,64 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
             prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
         }
-        // even if commit failed, we should still continue and complete suspending those tasks,
-        // so we would capture any exception and throw
+        // even if commit failed, we should still continue and complete suspending those tasks, so we would capture
+        // any exception and rethrow it at the end. some exceptions may be handled immediately and then swallowed,
+        // as such we just need to skip those dirty tasks in the checkpoint
+        final Set<TaskId> dirtyTaskIds = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                     e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException, just handle the cleanup for those corrupted tasks right here
+            dirtyTaskIds.addAll(e.corruptedTasks());
+            closeDirtyAndRevive(tasks.tasks(dirtyTaskIds), true);
+        } catch (final TimeoutException e) {
+            log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived");
+
+            // If we hit a TimeoutException we can just close dirty and revive without wiping the state

Review comment:
I guess the underlying reason is that a `TimeoutException` can only happen for ALOS, while for EOS any timeout would be converted into `TaskCorrupted`? Might be worth clarifying in the comment.
(Maybe also add a comment to the `TaskCorrupted` catch block above: `// EOS case`.)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1027,6 +1064,10 @@ int maybeCommitActiveTasksPerUserRequested() {
         }
     }
+    /**
+     * @throws TimeoutException if task.timeout.ms has been exceeded (non-EOS)

Review comment:
I think we throw `TimeoutException` if `commitSync` hits a timeout (i.e., just re-throw), independently of `task.timeout.ms`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -155,40 +155,48 @@ void handleRebalanceComplete() {
      * @throws TaskMigratedException
      */
     void handleCorruption(final Set<TaskId> corruptedTasks) {
-        final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = new HashMap<>();
-        final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new HashMap<>();
+        final Set<Task> corruptedActiveTasks = new HashSet<>();
+        final Set<Task> corruptedStandbyTasks = new HashSet<>();
         for (final TaskId taskId : corruptedTasks) {
             final Task task = tasks.task(taskId);
             if (task.isActive()) {
-                corruptedActiveTasks.put(task, task.changelogPartitions());
+                corruptedActiveTasks.add(task);
             } else {
-                corruptedStandbyTasks.put(task, task.changelogPartitions());
+                corruptedStandbyTasks.add(task);
             }
         }
         // Make sure to clean up any corrupted standby tasks in their entirety before committing
         // since TaskMigrated can be thrown and the resulting handleLostAll will only clean up active tasks
-        closeAndRevive(corruptedStandbyTasks);
-
-        commit(tasks()
-            .values()
-            .stream()
-            .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
-            .filter(t -> !corruptedTasks.contains(t.id()))
-            .collect(Collectors.toSet())
-        );
+        closeDirtyAndRevive(corruptedStandbyTasks, true);
+
+        // We need to commit before closing the corrupted active tasks since this will force the ongoing txn to abort
+        try {
+            commit(tasks()
+                       .values()
+                       .stream()
+                       .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
+                       .filter(t -> !corruptedTasks.contains(t.id()))
+                       .collect(Collectors.toSet())
+            );
+        } catch (final TaskCorruptedException e) {
+            log.info("Some additional tasks were found corrupted while trying to commit, these will be added to the " + "tasks to clean and revive: {}", e.corruptedTasks());
+            corruptedActiveTasks.addAll(tasks.tasks(e.corruptedTasks()));
+        }
-        closeAndRevive(corruptedActiveTasks);
+        closeDirtyAndRevive(corruptedActiveTasks, true);
     }
-    private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWithChangelogs) {
-        for (final Map.Entry<Task, Collection<TopicPartition>> entry : taskWithChangelogs.entrySet()) {
-            final Task task = entry.getKey();
+    private void closeDirtyAndRevive(final Collection<Task> dirtyTasks, final boolean markAsCorrupted) {
+        for (final Task task : dirtyTasks) {
+            final Collection<TopicPartition> corruptedPartitions = task.changelogPartitions();
             // mark corrupted partitions to not be checkpointed, and then close the task as dirty
-            final Collection<TopicPartition> corruptedPartitions = entry.getValue();
-            task.markChangelogAsCorrupted(corruptedPartitions);
+            if (markAsCorrupted) {

Review comment:
Not sure if I understand why we pulled this into this method? Not saying it's wrong, but I don't really understand right now. Can you elaborate?
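
One possible reading of the `markAsCorrupted` flag, sketched with hypothetical stand-in types (`SketchTask` is not the real `Task` interface, and the recorded call names are only labels for illustration): a single helper can serve both the "wipe the state" path and the "keep the state" path, differing only in whether the changelog is marked as corrupted before the dirty close.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

final class CloseDirtyAndReviveSketch {

    // Hypothetical stand-in for a Streams task; it only records what was called on it.
    static final class SketchTask {
        final String id;
        final List<String> calls = new ArrayList<>();

        SketchTask(final String id) {
            this.id = id;
        }
    }

    // Assumed shape of the helper from the diff: the flag decides whether state is wiped.
    static void closeDirtyAndRevive(final Collection<SketchTask> dirtyTasks, final boolean markAsCorrupted) {
        for (final SketchTask task : dirtyTasks) {
            if (markAsCorrupted) {
                // corrupted changelogs must not be checkpointed, so the state is rebuilt
                task.calls.add("markChangelogAsCorrupted");
            }
            task.calls.add("closeDirty");
            task.calls.add("revive");
        }
    }

    public static void main(final String[] args) {
        final SketchTask corrupted = new SketchTask("0_0");
        final SketchTask timedOut = new SketchTask("0_1");
        closeDirtyAndRevive(List.of(corrupted), true);   // TaskCorruptedException path
        closeDirtyAndRevive(List.of(timedOut), false);   // TimeoutException path
        System.out.println(corrupted.id + " -> " + corrupted.calls);
        System.out.println(timedOut.id + " -> " + timedOut.calls);
    }
}
```

Under this reading, the flag avoids duplicating the close-and-revive loop for the timeout case, where the state is presumably still usable.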
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -155,40 +155,48 @@ void handleRebalanceComplete() {
      * @throws TaskMigratedException
      */
     void handleCorruption(final Set<TaskId> corruptedTasks) {
-        final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = new HashMap<>();
-        final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new HashMap<>();
+        final Set<Task> corruptedActiveTasks = new HashSet<>();
+        final Set<Task> corruptedStandbyTasks = new HashSet<>();
         for (final TaskId taskId : corruptedTasks) {
             final Task task = tasks.task(taskId);
             if (task.isActive()) {
-                corruptedActiveTasks.put(task, task.changelogPartitions());
+                corruptedActiveTasks.add(task);
             } else {
-                corruptedStandbyTasks.put(task, task.changelogPartitions());
+                corruptedStandbyTasks.add(task);
             }
         }
         // Make sure to clean up any corrupted standby tasks in their entirety before committing
         // since TaskMigrated can be thrown and the resulting handleLostAll will only clean up active tasks
-        closeAndRevive(corruptedStandbyTasks);
-
-        commit(tasks()
-            .values()
-            .stream()
-            .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
-            .filter(t -> !corruptedTasks.contains(t.id()))
-            .collect(Collectors.toSet())
-        );
+        closeDirtyAndRevive(corruptedStandbyTasks, true);
+
+        // We need to commit before closing the corrupted active tasks since this will force the ongoing txn to abort
+        try {
+            commit(tasks()
+                       .values()
+                       .stream()
+                       .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
+                       .filter(t -> !corruptedTasks.contains(t.id()))
+                       .collect(Collectors.toSet())
+            );
+        } catch (final TaskCorruptedException e) {

Review comment:
I think we could also catch `TimeoutException` here and just mark tasks as corrupted if we cannot commit them? (We would need to disable the `TimeoutException` handling within `commit()` for this case...)
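
The suggestion above could look roughly like the following. This is a sketch of the reviewer's idea, not of what the PR does: `SketchTimeoutException`, `tasksToCloseDirty`, and the use of plain task-id strings are all hypothetical, and the commit step is passed in as a callback so that a timeout can be simulated.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

final class HandleCorruptionSketch {

    // Hypothetical unchecked stand-in for a commit timeout.
    static final class SketchTimeoutException extends RuntimeException {
        SketchTimeoutException(final String message) {
            super(message);
        }
    }

    // Returns the ids of all tasks that should be closed dirty and revived.
    static Set<String> tasksToCloseDirty(final Set<String> corruptedTaskIds,
                                         final Set<String> activeTaskIds,
                                         final Consumer<Set<String>> commit) {
        // Only the tasks that are not already known to be corrupted get committed.
        final Set<String> toCommit = new HashSet<>(activeTaskIds);
        toCommit.removeAll(corruptedTaskIds);

        final Set<String> dirty = new HashSet<>(corruptedTaskIds);
        try {
            commit.accept(toCommit);
        } catch (final SketchTimeoutException e) {
            // Assumption: a task whose commit timed out is treated like a corrupted one.
            dirty.addAll(toCommit);
        }
        return dirty;
    }

    public static void main(final String[] args) {
        final Set<String> dirty = tasksToCloseDirty(
            Set.of("0_0"),
            Set.of("0_0", "0_1"),
            ids -> {
                throw new SketchTimeoutException("simulated commit timeout");
            }
        );
        System.out.println("tasks to close dirty: " + dirty);
    }
}
```

As the comment notes, this would only work if the timeout actually reaches the caller, which is why any timeout handling inside `commit()` would have to be bypassed for this path.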
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -509,38 +517,64 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
             prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
         }
-        // even if commit failed, we should still continue and complete suspending those tasks,
-        // so we would capture any exception and throw
+        // even if commit failed, we should still continue and complete suspending those tasks, so we would capture
+        // any exception and rethrow it at the end. some exceptions may be handled immediately and then swallowed,
+        // as such we just need to skip those dirty tasks in the checkpoint
+        final Set<TaskId> dirtyTaskIds = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                     e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException, just handle the cleanup for those corrupted tasks right here
+            dirtyTaskIds.addAll(e.corruptedTasks());
+            closeDirtyAndRevive(tasks.tasks(dirtyTaskIds), true);
+        } catch (final TimeoutException e) {
+            log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived");
+
+            // If we hit a TimeoutException we can just close dirty and revive without wiping the state
+            closeDirtyAndRevive(tasks.activeTasks(), false);
+            dirtyTaskIds.addAll(tasks.activeTaskIds());
+
+            try {
+                tasks.activeTasks().forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), e));
+            } catch (final TimeoutException fatalTimeoutException) {
+                firstException.compareAndSet(null, fatalTimeoutException);
+            }
         } catch (final RuntimeException e) {
             log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
             firstException.compareAndSet(null, e);
         }
-        // only try to complete post-commit if committing succeeded;
-        // we enforce checkpointing upon suspending a task: if it is resumed later we just
-        // proceed normally, if it is going to be closed we would checkpoint by then
-        if (firstException.get() == null) {
+        // only try to complete post-commit if committing succeeded, or if we hit a TaskCorruptedException then we

Review comment:
If we address the comment from above with regard to `TimeoutException`, this comment would need an update.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -509,38 +517,64 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
             prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
         }
-        // even if commit failed, we should still continue and complete suspending those tasks,
-        // so we would capture any exception and throw
+        // even if commit failed, we should still continue and complete suspending those tasks, so we would capture
+        // any exception and rethrow it at the end. some exceptions may be handled immediately and then swallowed,
+        // as such we just need to skip those dirty tasks in the checkpoint
+        final Set<TaskId> dirtyTaskIds = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                     e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException, just handle the cleanup for those corrupted tasks right here
+            dirtyTaskIds.addAll(e.corruptedTasks());
+            closeDirtyAndRevive(tasks.tasks(dirtyTaskIds), true);
+        } catch (final TimeoutException e) {
+            log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived");
+
+            // If we hit a TimeoutException we can just close dirty and revive without wiping the state
+            closeDirtyAndRevive(tasks.activeTasks(), false);
+            dirtyTaskIds.addAll(tasks.activeTaskIds());
+
+            try {
+                tasks.activeTasks().forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), e));

Review comment:
As we handle revocation and thus don't own this task any longer, it might be ok to skip this step? What do we gain if we kill the thread?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -509,38 +517,64 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
             prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
         }
-        // even if commit failed, we should still continue and complete suspending those tasks,
-        // so we would capture any exception and throw
+        // even if commit failed, we should still continue and complete suspending those tasks, so we would capture
+        // any exception and rethrow it at the end. some exceptions may be handled immediately and then swallowed,
+        // as such we just need to skip those dirty tasks in the checkpoint
+        final Set<TaskId> dirtyTaskIds = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                     e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException, just handle the cleanup for those corrupted tasks right here
+            dirtyTaskIds.addAll(e.corruptedTasks());
+            closeDirtyAndRevive(tasks.tasks(dirtyTaskIds), true);
+        } catch (final TimeoutException e) {
+            log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived");
+
+            // If we hit a TimeoutException we can just close dirty and revive without wiping the state
+            closeDirtyAndRevive(tasks.activeTasks(), false);
+            dirtyTaskIds.addAll(tasks.activeTaskIds());
+
+            try {
+                tasks.activeTasks().forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), e));
+            } catch (final TimeoutException fatalTimeoutException) {
+                firstException.compareAndSet(null, fatalTimeoutException);
+            }
         } catch (final RuntimeException e) {
             log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
             firstException.compareAndSet(null, e);
         }
-        // only try to complete post-commit if committing succeeded;
-        // we enforce checkpointing upon suspending a task: if it is resumed later we just
-        // proceed normally, if it is going to be closed we would checkpoint by then
-        if (firstException.get() == null) {
+        // only try to complete post-commit if committing succeeded, or if we hit a TaskCorruptedException then we
+        // can still checkpoint the uncorrupted tasks (if any)
+        // we enforce checkpointing upon suspending a task: if it is resumed later we just proceed normally, if it is
+        // going to be closed we would checkpoint by then
+        if (firstException.get() == null || !dirtyTaskIds.isEmpty()) {

Review comment:
As we only call `postCommit` for non-dirty tasks anyway, do we need to check `firstException.get() == null`? Seems we can remove it -- as we have the inner `if (!dirtyTaskIds.contains(task.id())) {` check anyway, it seems we can remove this condition completely and just always execute the `for` loop?
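
A minimal sketch of the simplification asked about in the last review comment, using hypothetical names (`tasksToPostCommit`, plain task-id strings) rather than the real `TaskManager` types: the loop always runs and relies solely on the per-task dirty check to decide which tasks get their post-commit step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class PostCommitLoopSketch {

    // Hypothetical helper: returns the ids of the tasks that would have postCommit called.
    static List<String> tasksToPostCommit(final List<String> committedTaskIds, final Set<String> dirtyTaskIds) {
        final List<String> result = new ArrayList<>();
        for (final String taskId : committedTaskIds) {
            // the inner check from the review comment: dirty tasks are skipped
            if (!dirtyTaskIds.contains(taskId)) {
                result.add(taskId);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final List<String> committed = List.of("0_0", "0_1", "0_2");
        System.out.println(tasksToPostCommit(committed, Set.of()));
        System.out.println(tasksToPostCommit(committed, Set.of("0_1")));
    }
}
```

This only matches the guarded version if every task whose commit failed ends up in `dirtyTaskIds`. In the hunk quoted above, the generic `RuntimeException` branch sets `firstException` without adding anything to `dirtyTaskIds`, so that case may need separate handling before the outer condition can be dropped.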