cadonna commented on code in PR #17761:
URL: https://github.com/apache/kafka/pull/17761#discussion_r1943069285
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -731,15 +732,20 @@ private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.
                                    final Set<Task> tasksToCloseCleanFromStateUpdater,
                                    final Set<Task> tasksToCloseDirtyFromStateUpdater) {
         futures.entrySet().stream()
-            .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
-            .filter(Objects::nonNull)
-            .forEach(removedTaskResult -> {
-                if (removedTaskResult.exception().isPresent()) {
-                    tasksToCloseDirtyFromStateUpdater.add(removedTaskResult.task());
-                } else {
-                    tasksToCloseCleanFromStateUpdater.add(removedTaskResult.task());
-                }
-            });
+                .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
+                .filter(Objects::nonNull)
+                .forEach(removedTaskResult -> {
+                    if (removedTaskResult.exception().isPresent()) {
+                        final RuntimeException runtimeException = removedTaskResult.exception().get();
+                        if (runtimeException instanceof TaskCorruptedException) {
Review Comment:
Sorry, but I do not get it. The exceptions that `removedTaskResult` gives
you are the same exceptions that `handleExceptionsFromStateUpdater()` would
give you if the failed tasks had not been removed from the state updater.
`handleExceptionsFromStateUpdater()` rethrows those exceptions. The stream
thread itself handles only `TaskCorruptedException`, `TaskMigratedException`,
and `UnsupportedVersionException` (btw, the last two are never thrown by the
state updater). Any other exception is passed to the Streams uncaught
exception handler, which shuts down the client, shuts down the app, or
replaces the thread. In all three cases the task is closed dirty. Why would we
not also close the task dirty when we remove a failed task from the state
updater during a rebalance?

Could you provide a concrete example that you encountered? Maybe I am just
missing something.
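
To make the question concrete, here is a minimal sketch of the behavior argued for above: any removed task whose result carries an exception goes to the dirty set, regardless of the exception type. `RemovedResult`, `partition`, and the string task IDs are hypothetical stand-ins for illustration only; they are not the real `StateUpdater` or `TaskManager` types.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class RemovedTaskSketch {

    // Hypothetical stand-in for the result of removing a task from the state updater.
    record RemovedResult(String taskId, Optional<RuntimeException> exception) { }

    // Any failed task is closed dirty; only tasks without an exception are closed clean.
    static void partition(final List<RemovedResult> results,
                          final Set<String> closeClean,
                          final Set<String> closeDirty) {
        for (final RemovedResult result : results) {
            if (result.exception().isPresent()) {
                closeDirty.add(result.taskId());
            } else {
                closeClean.add(result.taskId());
            }
        }
    }

    public static void main(final String[] args) {
        final Set<String> closeClean = new HashSet<>();
        final Set<String> closeDirty = new HashSet<>();
        partition(
            List.of(
                new RemovedResult("0_0", Optional.empty()),
                new RemovedResult("0_1", Optional.of(new IllegalStateException("restore failed")))
            ),
            closeClean,
            closeDirty
        );
        System.out.println("clean=" + closeClean + " dirty=" + closeDirty);
    }
}
```

In this sketch the exception type never influences the decision, which matches the pre-change code in the hunk above.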