lucasbru commented on code in PR #15882: URL: https://github.com/apache/kafka/pull/15882#discussion_r1593738288
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ########## @@ -544,69 +585,78 @@ private void handleReassignedActiveTask(final Task task, } private void handleTasksInStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate, - final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate) { + final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate, + final Map<Task, Set<TopicPartition>> tasksToRecycle, + final Set<Task> tasksToCloseCleanFromStateUpdater, + final Set<Task> tasksToCloseDirtyFromStateUpdater, + final Map<TaskId, RuntimeException> failedTasks) { + final Map<TaskId, Set<TopicPartition>> newInputPartitions = new HashMap<>(); + final Map<TaskId, Set<TopicPartition>> standbyInputPartitions = new HashMap<>(); + final Map<TaskId, Set<TopicPartition>> activeInputPartitions = new HashMap<>(); + final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futuresForUpdatingInputPartitions = new LinkedHashMap<>(); + final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futuresForActiveTasksToRecycle = new LinkedHashMap<>(); + final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futuresForStandbyTasksToRecycle = new LinkedHashMap<>(); + final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futuresForTasksToClose = new LinkedHashMap<>(); for (final Task task : stateUpdater.getTasks()) { final TaskId taskId = task.id(); if (activeTasksToCreate.containsKey(taskId)) { - final Set<TopicPartition> inputPartitions = activeTasksToCreate.get(taskId); - if (task.isActive() && !task.inputPartitions().equals(inputPartitions)) { - if (tasks.removePendingTaskToCloseClean(taskId)) { - tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(taskId, inputPartitions); - } else { - tasks.addPendingTaskToUpdateInputPartitions(taskId, inputPartitions); - stateUpdater.remove(taskId); - } - } else if (task.isActive()) { - if 
(tasks.removePendingActiveTaskToSuspend(taskId)) { - log.info( - "We were planning on suspending a task {} because it was revoked " + - "The task got reassigned to this thread, so we cancel suspending " + - "of the task, but add it back to the state updater, since we do not know " + - "if it is fully restored yet.", - taskId); - tasks.addPendingTaskToAddBack(taskId); - } - if (tasks.removePendingTaskToCloseClean(taskId)) { - log.info( - "We were planning on closing task {} because we lost one of its partitions." + - "The task got reassigned to this thread, so cancel closing of the task, but add it back to the " + - "state updater, since we may have to catch up on the changelog.", - taskId); - tasks.addPendingTaskToAddBack(taskId); + if (task.isActive()) { + if (!task.inputPartitions().equals(activeTasksToCreate.get(taskId))) { + final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(taskId); + futuresForUpdatingInputPartitions.put(taskId, future); + newInputPartitions.put(taskId, activeTasksToCreate.get(taskId)); } } else { - removeTaskToRecycleFromStateUpdater(taskId, inputPartitions); + final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(taskId); + futuresForStandbyTasksToRecycle.put(taskId, future); + activeInputPartitions.put(taskId, activeTasksToCreate.get(taskId)); } activeTasksToCreate.remove(taskId); } else if (standbyTasksToCreate.containsKey(taskId)) { if (task.isActive()) { - removeTaskToRecycleFromStateUpdater(taskId, standbyTasksToCreate.get(taskId)); - } else { - if (tasks.removePendingTaskToRecycle(taskId) != null) { - log.info( - "We were planning on recycling standby task {} to an active task." 
+ - "The task got reassigned to this thread as a standby task, so cancel recycling of the task, " + - "but add it back to the state updater, since we may have to catch up on the changelog.", - taskId); - tasks.addPendingTaskToAddBack(taskId); - } + final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(taskId); + futuresForActiveTasksToRecycle.put(taskId, future); + standbyInputPartitions.put(taskId, standbyTasksToCreate.get(taskId)); } standbyTasksToCreate.remove(taskId); } else { - removeUnusedTaskFromStateUpdater(taskId); - } - } + final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(taskId); + futuresForTasksToClose.put(taskId, future); + } + } + updateInputPartitions(futuresForUpdatingInputPartitions, newInputPartitions, failedTasks); + addToActiveTasksToRecycle(futuresForActiveTasksToRecycle, standbyInputPartitions, tasksToRecycle, failedTasks); + addToStandbyTasksToRecycle(futuresForStandbyTasksToRecycle, activeInputPartitions, tasksToRecycle, failedTasks); + addToTasksToClose(futuresForTasksToClose, tasksToCloseCleanFromStateUpdater, tasksToCloseDirtyFromStateUpdater); + } + + private void updateInputPartitions(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures, + final Map<TaskId, Set<TopicPartition>> newInputPartitions, + final Map<TaskId, RuntimeException> failedTasks) { + iterateAndActOnRemovedTask( Review Comment: Is this whole `iterateAndAct` an artifact of the history of the code? It's quite complicated, and I'm not sure if it needs to be this way. The "act" part is just regular code that doesn't have to be passed around as lambdas at all. If you'd move the handling of `InterruptException` and `ExecutionException` into `waitForFuture` (which would make sense IMO), couldn't you just write ``` futures.entrySet().stream().map(entry -> waitForFuture(entry.getKey(), entry.getValue())).forEach(removedTaskResult -> ...) 
``` and avoid this custom-built iterate machinery? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ########## @@ -365,17 +365,47 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks, // 1. for tasks that are already owned, just update input partitions / resume and skip re-creating them // 2. for tasks that have changed active/standby status, just recycle and skip re-creating them // 3. otherwise, close them since they are no longer owned + final Map<TaskId, RuntimeException> failedTasks = new LinkedHashMap<>(); if (stateUpdater == null) { handleTasksWithoutStateUpdater(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean); } else { - handleTasksWithStateUpdater(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean); + final Map<Task, Set<TopicPartition>> tasksToRecycleFromStateUpdater = new HashMap<>(); Review Comment: I'd put this block inside `handleTasksWithStateUpdater` to separate it from the non-state-updater code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org