guozhangwang commented on code in PR #12554: URL: https://github.com/apache/kafka/pull/12554#discussion_r954204311
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -565,6 +583,303 @@ public void shouldHandleRemovedTasksFromStateUpdater() {
     }
     @Test
+    public void shouldTransitRestoredTaskToRunning() {

Review Comment:
Nice mocks, love them!

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -673,44 +772,11 @@ private void handleRemovedTasksFromStateUpdater() {
         final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
         for (final Task task : stateUpdater.drainRemovedTasks()) {
-            final TaskId taskId = task.id();
             Set<TopicPartition> inputPartitions;
             if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
-                try {
-                    final Task newTask = task.isActive() ?
-                        convertActiveToStandby((StreamTask) task, inputPartitions) :
-                        convertStandbyToActive((StandbyTask) task, inputPartitions);
-                    newTask.initializeIfNeeded();
-                    stateUpdater.add(newTask);
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format("Failed to recycle task %s cleanly. " +
-                        "Attempting to handle remaining tasks before re-throwing:", taskId);
-                    log.error(uncleanMessage, e);
-
-                    if (task.state() != State.CLOSED) {
-                        tasksToCloseDirty.add(task);
-                    }
-
-                    taskExceptions.putIfAbsent(taskId, e);
-                }
+                recycleTask(task, inputPartitions, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseClean(task.id())) {
-                try {
-                    task.suspend();
-                    task.closeClean();
-                    if (task.isActive()) {
-                        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format("Failed to close task %s cleanly. " +
-                        "Attempting to handle remaining tasks before re-throwing:", task.id());
-                    log.error(uncleanMessage, e);
-
-                    if (task.state() != State.CLOSED) {
-                        tasksToCloseDirty.add(task);
-                    }
-
-                    taskExceptions.putIfAbsent(task.id(), e);
-                }
+                closeTaskClean(task, tasksToCloseDirty, taskExceptions);

Review Comment:
We used to wrap the logic for collecting exceptions and updating the close-dirty tasks in a single call, but I found it a bit clumsy and hard to reason about for the different exception-handling paths (at least for now; I hope we can get simpler exception handling in the refactoring). So I peeled it off, and now the existing `closeTaskClean` only does the closing part:

```
private void closeTaskClean(final Task task) {
    task.closeClean();
    tasks.removeTask(task);
    if (task.isActive()) {
        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
    }
}
```

But the reason I could not use that function and catch the exception here is that, in our refactoring, the tasks to close cleanly are no longer in `tasks`.

Thinking about this a bit more, I still feel that we should do the exception catching outside the core logic of closing tasks, but we can still try to remove some duplicated code here, e.g. by wrapping the whole if-else block in a single `try-catch` rather than doing that per branch.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -565,6 +565,13 @@ public Set<Task> getTasks() {
         return executeWithQueuesLocked(() -> getStreamOfTasks().map(ReadOnlyTask::new).collect(Collectors.toSet()));
     }
+    @Override
+    public boolean restoresActiveTasks() {

Review Comment:
nit: `hasActiveTasks`?
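Regarding the `TaskManager.java` suggestion to wrap the whole if-else block in a single `try-catch` instead of repeating it per branch: the following is a minimal sketch under heavily simplified assumptions, not the PR's code. `SketchTask` and `RemovedTasksSketch` are hypothetical, and the four functional hooks merely stand in for `tasks.removePendingTaskToRecycle(...)`, `tasks.removePendingTaskToCloseClean(...)`, and the core recycle/close-clean logic, which are assumed to throw rather than capture their own exceptions.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for Kafka's Task, reduced to what this sketch needs.
final class SketchTask {
    final String id;
    boolean closed;

    SketchTask(final String id) {
        this.id = id;
    }
}

final class RemovedTasksSketch {
    // Hypothetical hooks; the real code consults `tasks` and calls recycle/close helpers.
    private final Predicate<SketchTask> pendingRecycle;
    private final Predicate<SketchTask> pendingCloseClean;
    private final Consumer<SketchTask> recycle;     // core logic only, may throw
    private final Consumer<SketchTask> closeClean;  // core logic only, may throw

    RemovedTasksSketch(final Predicate<SketchTask> pendingRecycle,
                       final Predicate<SketchTask> pendingCloseClean,
                       final Consumer<SketchTask> recycle,
                       final Consumer<SketchTask> closeClean) {
        this.pendingRecycle = pendingRecycle;
        this.pendingCloseClean = pendingCloseClean;
        this.recycle = recycle;
        this.closeClean = closeClean;
    }

    // Mirrors the ordering used for tasksToCloseDirty in the diff (by task id).
    static Set<SketchTask> newCloseDirtySet() {
        return new TreeSet<>(Comparator.comparing((SketchTask t) -> t.id));
    }

    // Returns the first exception per task id; failed tasks that are still open go to tasksToCloseDirty.
    Map<String, RuntimeException> handle(final List<SketchTask> removedTasks,
                                         final Set<SketchTask> tasksToCloseDirty) {
        final Map<String, RuntimeException> taskExceptions = new HashMap<>();
        for (final SketchTask task : removedTasks) {
            // One try-catch around the whole if-else, instead of one per branch.
            try {
                if (pendingRecycle.test(task)) {
                    recycle.accept(task);
                } else if (pendingCloseClean.test(task)) {
                    closeClean.accept(task);
                }
            } catch (final RuntimeException e) {
                if (!task.closed) {
                    tasksToCloseDirty.add(task);
                }
                taskExceptions.putIfAbsent(task.id, e);
            }
        }
        return taskExceptions;
    }
}
```

The branches then contain only the core logic, and the bookkeeping for failures (close-dirty set plus first exception per task) lives in exactly one place.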
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1038,7 +1104,7 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo
         return offsetSum;
     }
-    private void closeTaskDirty(final Task task) {
+    private void closeTaskDirty(final Task task, final boolean removeFromTasksRegistry) {

Review Comment:
I have also been thinking about the task registry (`tasks`) here, regarding when to remove tasks from it. In the new model, this class only bookkeeps running active tasks, while standby tasks and restoring active tasks live in the state updater, and the task manager needs to take the union of them to return all managed tasks.

So when we close a task:
1) If the task is a running active task, we also need to remove it from `tasks`.
2) If the task was retrieved from the state updater, we do not need to remove it from `tasks`.

That means we also need to consider this for close-clean (see my other comment above). Today we do it with two `closeTaskClean` variants: one encapsulates the exception capturing and does not remove the task from `tasks` (used for 2), and the other does not encapsulate the exception capturing and does remove the task from `tasks` (used for 1). I think we should clean this up by having a single `closeTaskClean`/`closeTaskDirty` that does not try to remove the task from `tasks` internally (I also suggest we do the exception capturing at the caller rather than inside, but that's open for discussion :)), and let the caller decide whether to remove the task from `tasks` depending on whether the call is for 1) or 2) above.

Also, I feel that once we complete this, `tasks` would contain far fewer meaningful fields, and we could potentially dissolve `tasks` and just keep it inside `TaskManager` as `runningActiveTasks`. But that's for a future discussion.
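A minimal sketch of the "single close function, caller decides about the registry" shape, under simplified assumptions: `ClosableTask` and `TaskCloser` are hypothetical, the `runningActiveTasks` map stands in for `tasks`, and a counter stands in for `activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(...)`. It is not the PR's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Kafka Streams task; not the real Task interface.
final class ClosableTask {
    final String id;
    final boolean active;
    boolean closed;

    ClosableTask(final String id, final boolean active) {
        this.id = id;
        this.active = active;
    }
}

final class TaskCloser {
    // Stand-in for the `tasks` registry, which in the new model only bookkeeps running active tasks.
    final Map<String, ClosableTask> runningActiveTasks = new HashMap<>();
    // Counts calls that stand in for activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(...).
    int producersClosed;

    // Core closing logic only: it neither touches the registry nor captures exceptions.
    void closeTaskClean(final ClosableTask task) {
        task.closed = true; // stands in for task.suspend() + task.closeClean()
        if (task.active) {
            producersClosed++;
        }
    }

    // Case 1): a running active task, so the caller also removes it from the registry.
    void closeRunningActiveTask(final ClosableTask task) {
        closeTaskClean(task);
        runningActiveTasks.remove(task.id);
    }

    // Case 2): a task drained from the state updater, which was never in the registry.
    void closeTaskFromStateUpdater(final ClosableTask task) {
        closeTaskClean(task);
    }
}
```

Exception capturing would then sit in the callers, as the comment suggests; a `closeTaskDirty` would follow the same shape.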
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -565,6 +565,13 @@ public Set<Task> getTasks() {
         return executeWithQueuesLocked(() -> getStreamOfTasks().map(ReadOnlyTask::new).collect(Collectors.toSet()));
     }
+    @Override
+    public boolean restoresActiveTasks() {

Review Comment:
Also a nit: could we just implement this as `!getActiveTasks().isEmpty()` in a default method on the interface, so that when we mock the interface we do not need to implement it again?

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -673,44 +772,11 @@ private void handleRemovedTasksFromStateUpdater() {
         final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
         for (final Task task : stateUpdater.drainRemovedTasks()) {
-            final TaskId taskId = task.id();
             Set<TopicPartition> inputPartitions;
             if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
-                try {
-                    final Task newTask = task.isActive() ?
-                        convertActiveToStandby((StreamTask) task, inputPartitions) :
-                        convertStandbyToActive((StandbyTask) task, inputPartitions);
-                    newTask.initializeIfNeeded();
-                    stateUpdater.add(newTask);
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format("Failed to recycle task %s cleanly. " +
-                        "Attempting to handle remaining tasks before re-throwing:", taskId);
-                    log.error(uncleanMessage, e);
-
-                    if (task.state() != State.CLOSED) {
-                        tasksToCloseDirty.add(task);
-                    }
-
-                    taskExceptions.putIfAbsent(taskId, e);
-                }
+                recycleTask(task, inputPartitions, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseClean(task.id())) {
-                try {
-                    task.suspend();
-                    task.closeClean();
-                    if (task.isActive()) {
-                        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format("Failed to close task %s cleanly. " +
-                        "Attempting to handle remaining tasks before re-throwing:", task.id());
-                    log.error(uncleanMessage, e);
-
-                    if (task.state() != State.CLOSED) {
-                        tasksToCloseDirty.add(task);
-                    }
-
-                    taskExceptions.putIfAbsent(task.id(), e);
-                }
+                closeTaskClean(task, tasksToCloseDirty, taskExceptions);

Review Comment:
And to further remove duplicated logic, I think we can consolidate `handleRemovedTasksFromStateUpdater` and `handleRestoredTasks` into, e.g., a single `tryHandleTasksFromStateUpdater` that checks whether the task should be 1) recycled, 2) closed clean/dirty, or 3) have its input partitions updated (and then be handed back to the state updater). If the task matches any of these conditions, it returns true; otherwise it returns false.

Then in `handleRemovedTasksFromStateUpdater`:

```
if (!tryHandleTasksFromStateUpdater(..)) {
    throw new IllegalStateException("this should not happen");
}
```

And in `handleRestoredTasksFromStateUpdater`:

```
if (!tryHandleTasksFromStateUpdater(..)) {
    transitRestoredTaskToRunning(task);
}
```

WDYT?
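A minimal sketch of the proposed `tryHandleTasksFromStateUpdater` consolidation, assuming a heavily simplified model: tasks are plain string ids, and the hypothetical sets `pendingRecycle`, `pendingCloseClean` and `pendingInputPartitionUpdate` stand in for the bookkeeping in `tasks`. `handleRemovedTask` and `handleRestoredTask` are hypothetical per-task versions of the two handlers, and the `actions` list exists only to make the behavior observable.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the consolidation; not the real TaskManager.
final class StateUpdaterHandoff {
    final Set<String> pendingRecycle = new HashSet<>();
    final Set<String> pendingCloseClean = new HashSet<>();
    final Set<String> pendingInputPartitionUpdate = new HashSet<>();
    final List<String> actions = new ArrayList<>(); // records what happened, for illustration only

    // Returns true if the task had a pending action and was handled here, false otherwise.
    boolean tryHandleTasksFromStateUpdater(final String taskId) {
        if (pendingRecycle.remove(taskId)) {
            actions.add("recycle " + taskId);
            return true;
        }
        if (pendingCloseClean.remove(taskId)) {
            actions.add("close " + taskId);
            return true;
        }
        if (pendingInputPartitionUpdate.remove(taskId)) {
            // update input partitions, then hand the task back to the state updater
            actions.add("update-and-readd " + taskId);
            return true;
        }
        return false;
    }

    // A removed task must always have had a pending action.
    void handleRemovedTask(final String taskId) {
        if (!tryHandleTasksFromStateUpdater(taskId)) {
            throw new IllegalStateException("Removed task " + taskId + " has no pending action; this should not happen");
        }
    }

    // A restored task without a pending action simply starts running.
    void handleRestoredTask(final String taskId) {
        if (!tryHandleTasksFromStateUpdater(taskId)) {
            transitRestoredTaskToRunning(taskId);
        }
    }

    void transitRestoredTaskToRunning(final String taskId) {
        actions.add("running " + taskId);
    }
}
```

The only difference between the two handlers is what happens when no pending action applies, which is exactly the part the comment keeps at the call sites.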
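For the nit on `restoresActiveTasks()` in `DefaultStateUpdater.java`, a sketch of the default-method approach, assuming the interface exposes a `getActiveTasks()` accessor as the comment implies. `SketchStateUpdater` is a hypothetical, trimmed-down interface, not Kafka's `StateUpdater`.

```java
import java.util.Set;

// Hypothetical, trimmed-down interface; the real state updater interface has many more methods.
interface SketchStateUpdater<T> {
    Set<T> getActiveTasks();

    // Derived from getActiveTasks(), so implementations and test fakes need not override it.
    default boolean restoresActiveTasks() {
        return !getActiveTasks().isEmpty();
    }
}
```

Because the default is derived from `getActiveTasks()`, a fake only has to supply the active tasks, while an implementation with a cheaper check can still override it.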