cadonna commented on code in PR #17761:
URL: https://github.com/apache/kafka/pull/17761#discussion_r1943069285


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -731,15 +732,20 @@ private void addToTasksToClose(final Map<TaskId, 
CompletableFuture<StateUpdater.
                                    final Set<Task> 
tasksToCloseCleanFromStateUpdater,
                                    final Set<Task> 
tasksToCloseDirtyFromStateUpdater) {
         futures.entrySet().stream()
-            .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
-            .filter(Objects::nonNull)
-            .forEach(removedTaskResult -> {
-                if (removedTaskResult.exception().isPresent()) {
-                    
tasksToCloseDirtyFromStateUpdater.add(removedTaskResult.task());
-                } else {
-                    
tasksToCloseCleanFromStateUpdater.add(removedTaskResult.task());
-                }
-            });
+                .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
+                .filter(Objects::nonNull)
+                .forEach(removedTaskResult -> {
+                    if (removedTaskResult.exception().isPresent()) {
+                        final RuntimeException runtimeException = 
removedTaskResult.exception().get();
+                        if (runtimeException instanceof 
TaskCorruptedException) {

Review Comment:
   Sorry, but I do not get it. The exceptions that `removedTaskResult` gives 
you are the same exceptions that you would get from 
`handleExceptionsFromStateUpdater()` if the failed tasks were not removed from 
the state updater. `handleExceptionsFromStateUpdater()` throws exceptions up 
and if they cannot be handled by the stream thread (i.e., 
`TaskCorruptedException`, `TaskMigratedException`, and 
`UnsupportedVersionException`, btw the last two are never thrown by the state 
updater) they are given to the Streams uncaught exception handler which would 
shut down the client, shut down the app, or replace the thread. In all three 
cases it would dirty close the task. Why would we not also close the task dirty 
when we remove a failed task from the state updater during a rebalance?
   Could you provide me with a concrete example that you encountered? Maybe I 
am just missing something.         



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to