ableegoldman commented on code in PR #17761:
URL: https://github.com/apache/kafka/pull/17761#discussion_r1927963266
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -731,15 +732,20 @@ private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.
                                    final Set<Task> tasksToCloseCleanFromStateUpdater,
                                    final Set<Task> tasksToCloseDirtyFromStateUpdater) {
         futures.entrySet().stream()
-            .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
-            .filter(Objects::nonNull)
-            .forEach(removedTaskResult -> {
-                if (removedTaskResult.exception().isPresent()) {
-                    tasksToCloseDirtyFromStateUpdater.add(removedTaskResult.task());
-                } else {
-                    tasksToCloseCleanFromStateUpdater.add(removedTaskResult.task());
-                }
-            });
+                .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
+                .filter(Objects::nonNull)
+                .forEach(removedTaskResult -> {
+                    if (removedTaskResult.exception().isPresent()) {
+                        final RuntimeException runtimeException = removedTaskResult.exception().get();
+                        if (runtimeException instanceof TaskCorruptedException) {
Review Comment:
Agree 100% -- I know "classifying & cleaning up exceptions" has been on our
TODO list forever because it's a huge goal, but I think it doesn't hurt to
start now, even if it's not perfect and we don't go through and classify/fix
everything that exists today. Especially in light of all the huge changes
coming (e.g. KIP-1071), it would be good to just introduce a
close-clean/close-dirty exception type and start using it going forward. I'm
also interested to hear from Bruno on this.
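
To make the suggestion a bit more concrete, here is a rough sketch of what a
close-clean/close-dirty exception type could look like. Everything in it
(`CloseMode`, `TaskCloseException`, `CloseClassifier`) is a hypothetical name
made up for illustration, not an existing Kafka Streams API. It also assumes
that unclassified exceptions keep closing dirty, as the removed lines in this
hunk did.

```java
import java.util.Optional;

// Hypothetical: how a task should be closed after a failure.
enum CloseMode { CLEAN, DIRTY }

// Hypothetical exception type that carries its own close semantics,
// so callers no longer need an instanceof check per exception class.
class TaskCloseException extends RuntimeException {
    private final CloseMode closeMode;

    TaskCloseException(final String message, final CloseMode closeMode) {
        super(message);
        this.closeMode = closeMode;
    }

    CloseMode closeMode() {
        return closeMode;
    }
}

// Hypothetical helper that decides which set a removed task belongs in.
final class CloseClassifier {
    private CloseClassifier() { }

    static CloseMode classify(final Optional<? extends RuntimeException> exception) {
        if (!exception.isPresent()) {
            // No failure reported: close clean.
            return CloseMode.CLEAN;
        }
        final RuntimeException e = exception.get();
        if (e instanceof TaskCloseException) {
            // Classified exception: trust what it declares.
            return ((TaskCloseException) e).closeMode();
        }
        // Assumption: anything not yet classified closes dirty.
        return CloseMode.DIRTY;
    }
}
```

With something along these lines, the `forEach` above could add the task to
the clean or dirty set based on `classify(removedTaskResult.exception())`, and
existing exceptions could be migrated to the new type one at a time.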