ableegoldman commented on code in PR #17761:
URL: https://github.com/apache/kafka/pull/17761#discussion_r1926229352


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -731,15 +732,20 @@ private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.
                                    final Set<Task> tasksToCloseCleanFromStateUpdater,
                                    final Set<Task> tasksToCloseDirtyFromStateUpdater) {
         futures.entrySet().stream()
-            .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
-            .filter(Objects::nonNull)
-            .forEach(removedTaskResult -> {
-                if (removedTaskResult.exception().isPresent()) {
-                    tasksToCloseDirtyFromStateUpdater.add(removedTaskResult.task());
-                } else {
-                    tasksToCloseCleanFromStateUpdater.add(removedTaskResult.task());
-                }
-            });
+                .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
+                .filter(Objects::nonNull)
+                .forEach(removedTaskResult -> {
+                    if (removedTaskResult.exception().isPresent()) {
+                        final RuntimeException runtimeException = removedTaskResult.exception().get();
+                        if (runtimeException instanceof TaskCorruptedException) {

Review Comment:
   I agree with Matthias that we should be a bit more conservative here, though I don't think we need to whitelist every single exception/root cause. We can be fairly generic about what gets a clean close.
   
   For example, let's say we clean-close on any StreamsException or InterruptedException. Let's also explicitly dirty-close on TaskCorruptedException, so that if we change things in the future it stays clear that this exception requires a dirty close (we can log something for this case too).
   
   Practically speaking, this is pretty close to clean-closing by default, since almost everything gets converted into a StreamsException.
   
   The one case I'm still undecided on is IllegalStateException. I'm a little paranoid that there could be some weird new (or existing) issue where data is actually corrupted on disk, or something else is so wrong that we have to throw an IllegalStateException. I'm leaning towards keeping IllegalStateException as a dirty close, but could be convinced otherwise.
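   
   The rule proposed above could be sketched roughly as follows. This is only an illustration, not the code in the PR: `CloseModeSketch`, `CloseMode`, and `closeModeFor` are hypothetical names, and the nested exception classes are stand-ins so the snippet compiles without Kafka on the classpath. Two things are assumptions rather than part of the proposal: unlisted exceptions fall back to a dirty close, and InterruptedException (a checked exception) is looked for as the cause of the RuntimeException.

```java
import java.util.Optional;

// Sketch only: hypothetical names, not the TaskManager implementation.
final class CloseModeSketch {

    enum CloseMode { CLEAN, DIRTY }

    // Stand-in for org.apache.kafka.streams.errors.StreamsException.
    static class StreamsException extends RuntimeException {
        StreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Stand-in for TaskCorruptedException, which is a StreamsException subtype in Kafka.
    static class TaskCorruptedException extends StreamsException {
        TaskCorruptedException(final String message) {
            super(message, null);
        }
    }

    static CloseMode closeModeFor(final Optional<RuntimeException> exception) {
        if (!exception.isPresent()) {
            return CloseMode.CLEAN; // task was removed without an error
        }
        final RuntimeException e = exception.get();
        // Checked first: as a StreamsException subtype it would otherwise
        // be caught by the generic clean-close branch below.
        if (e instanceof TaskCorruptedException) {
            return CloseMode.DIRTY;
        }
        // The case still under discussion; kept dirty here.
        if (e instanceof IllegalStateException) {
            return CloseMode.DIRTY;
        }
        if (e instanceof StreamsException) {
            return CloseMode.CLEAN;
        }
        // Assumption: an interrupt surfaces as the cause of a wrapper exception.
        if (e.getCause() instanceof InterruptedException) {
            return CloseMode.CLEAN;
        }
        // Assumption: anything not listed above is closed dirty.
        return CloseMode.DIRTY;
    }
}
```

   The ordering matters: because TaskCorruptedException is itself a StreamsException, the explicit dirty-close check has to come before the generic StreamsException branch.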
   
   Thoughts?


