ableegoldman commented on code in PR #17761:
URL: https://github.com/apache/kafka/pull/17761#discussion_r1926229352
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -731,15 +732,20 @@ private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.
                                    final Set<Task> tasksToCloseCleanFromStateUpdater,
                                    final Set<Task> tasksToCloseDirtyFromStateUpdater) {
         futures.entrySet().stream()
-            .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
-            .filter(Objects::nonNull)
-            .forEach(removedTaskResult -> {
-                if (removedTaskResult.exception().isPresent()) {
-                    tasksToCloseDirtyFromStateUpdater.add(removedTaskResult.task());
-                } else {
-                    tasksToCloseCleanFromStateUpdater.add(removedTaskResult.task());
-                }
-            });
+                .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
+                .filter(Objects::nonNull)
+                .forEach(removedTaskResult -> {
+                    if (removedTaskResult.exception().isPresent()) {
+                        final RuntimeException runtimeException = removedTaskResult.exception().get();
+                        if (runtimeException instanceof TaskCorruptedException) {
Review Comment:
I agree with Matthias that we should be a bit more conservative here, though
I don't think we need to whitelist every single exception/root cause; we can
be fairly generic about what gets a clean close.

For example, let's say we clean-close on any StreamsException or
InterruptedException. Let's also explicitly dirty-close on
TaskCorruptedException, so that if we change things in the future, it's clear
that this exception has to result in a dirty close (we can log something for
this case too).

Practically speaking, this is pretty close to just clean-closing by default,
since almost everything gets converted into a StreamsException.

The one case I'm still waffling on is IllegalStateException. I'm just paranoid
that there could be some weird new (or existing) issue where data is actually
corrupted on disk, or something is otherwise so wrong that we have to throw an
IllegalStateException. I'm leaning towards keeping IllegalStateException as a
dirty close, but could be convinced otherwise.

Thoughts?
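
To make the proposal concrete, here is a rough, standalone sketch of the
classification described above. It is not the PR's code: `CloseModeSketch`,
`CloseMode`, and `classify` are hypothetical names, and the two nested
exception classes are stand-ins for the Kafka Streams ones so the snippet
compiles without Kafka on the classpath. Dirty-closing anything not explicitly
listed is my assumption, in the spirit of staying conservative.

```java
import java.util.List;

public class CloseModeSketch {

    // Hypothetical stand-ins for the Kafka Streams exception types. In Kafka
    // Streams, TaskCorruptedException is a subclass of StreamsException, which
    // is why the order of the checks in classify() matters.
    static class StreamsException extends RuntimeException {
        StreamsException(final String message) {
            super(message);
        }
    }

    static class TaskCorruptedException extends StreamsException {
        TaskCorruptedException(final String message) {
            super(message);
        }
    }

    enum CloseMode { CLEAN, DIRTY }

    static CloseMode classify(final Throwable error) {
        // Check the subclass first: a corrupted task must never be closed
        // clean, even though it is also a StreamsException.
        if (error instanceof TaskCorruptedException) {
            return CloseMode.DIRTY;
        }
        // The case still under discussion; kept dirty here out of caution.
        if (error instanceof IllegalStateException) {
            return CloseMode.DIRTY;
        }
        // The generic clean-close bucket from the proposal.
        if (error instanceof StreamsException || error instanceof InterruptedException) {
            return CloseMode.CLEAN;
        }
        // Assumption: anything not explicitly listed is closed dirty.
        return CloseMode.DIRTY;
    }

    public static void main(final String[] args) {
        final List<Throwable> samples = List.of(
            new StreamsException("wrapped failure"),
            new TaskCorruptedException("corrupted state"),
            new IllegalStateException("unexpected state"),
            new InterruptedException("interrupted")
        );
        for (final Throwable sample : samples) {
            System.out.println(sample.getClass().getSimpleName() + " -> " + classify(sample));
        }
    }
}
```

The TaskCorruptedException check has to come before the StreamsException
check, otherwise the subclass would fall into the clean-close bucket.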