eduwercamacaro commented on code in PR #17761:
URL: https://github.com/apache/kafka/pull/17761#discussion_r1939718078
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -731,15 +732,20 @@ private void addToTasksToClose(final Map<TaskId,
CompletableFuture<StateUpdater.
final Set<Task> tasksToCloseCleanFromStateUpdater,
final Set<Task> tasksToCloseDirtyFromStateUpdater) {
futures.entrySet().stream()
- .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
- .filter(Objects::nonNull)
- .forEach(removedTaskResult -> {
- if (removedTaskResult.exception().isPresent()) {
- tasksToCloseDirtyFromStateUpdater.add(removedTaskResult.task());
- } else {
- tasksToCloseCleanFromStateUpdater.add(removedTaskResult.task());
- }
- });
+ .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
+ .filter(Objects::nonNull)
+ .forEach(removedTaskResult -> {
+ if (removedTaskResult.exception().isPresent()) {
+ final RuntimeException runtimeException = removedTaskResult.exception().get();
+ if (runtimeException instanceof TaskCorruptedException) {
Review Comment:
@cadonna Thanks for the input. I believe we don't need to implement this
exception handling in the `TaskManager.handleExceptionsFromStateUpdater` method,
because it is already handled at the `StreamThread` level. Exceptions from the
StateUpdater are propagated to the StreamThread's exception handling, which
then decides whether to close the task clean or dirty. That is my understanding
from the code, but I may be wrong.
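
To make the clean-versus-dirty decision concrete, here is a minimal standalone
sketch of the split that the removed lines in the hunk perform: a result that
carries an exception is routed to the dirty set, everything else to the clean
set. `CloseSplitSketch`, `RemovedTask`, and the string task ids are hypothetical
stand-ins for illustration only, not Kafka Streams classes, and the sketch says
nothing about how `StreamThread` itself handles the exception.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

class CloseSplitSketch {

    // Hypothetical stand-in for a removed-task result: a task id plus an
    // optional exception recorded while the task was being removed.
    static final class RemovedTask {
        final String taskId;
        final Optional<RuntimeException> exception;

        RemovedTask(final String taskId, final Optional<RuntimeException> exception) {
            this.taskId = taskId;
            this.exception = exception;
        }
    }

    // Routes each result to the dirty set when an exception is present,
    // and to the clean set otherwise.
    static void split(final List<RemovedTask> results,
                      final Set<String> closeClean,
                      final Set<String> closeDirty) {
        for (final RemovedTask result : results) {
            if (result.exception.isPresent()) {
                closeDirty.add(result.taskId);
            } else {
                closeClean.add(result.taskId);
            }
        }
    }

    public static void main(final String[] args) {
        final Set<String> closeClean = new TreeSet<>();
        final Set<String> closeDirty = new TreeSet<>();
        split(Arrays.asList(
                new RemovedTask("0_0", Optional.empty()),
                new RemovedTask("0_1", Optional.of(new IllegalStateException("simulated failure")))),
            closeClean, closeDirty);
        System.out.println("clean: " + closeClean);
        System.out.println("dirty: " + closeDirty);
    }
}
```

If the per-exception-type check is already made further up the call chain, a
split of this shape would not need to inspect the exception type again.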