guozhangwang commented on code in PR #12554: URL: https://github.com/apache/kafka/pull/12554#discussion_r954203463
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##########
@@ -1038,7 +1104,7 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo
         return offsetSum;
     }
-    private void closeTaskDirty(final Task task) {
+    private void closeTaskDirty(final Task task, final boolean removeFromTasksRegistry) {

Review Comment:
I have also been thinking about the taskRegistry / `tasks` here, specifically about when to remove a task from it. In the new model, this class only bookkeeps running active tasks, while standby tasks and restoring active tasks live in the state updater, so the task manager needs to take the union of the two to return all managed tasks. So when we close a task:

1) If the task is a running active task, then we also need to remove it from `tasks`.
2) If the task was retrieved from the state updater, then we do not need to remove it from `tasks`.

That means we also need to handle this for clean closes (see my other comment above). Today we do it with two `closeTaskClean` variants: one that captures exceptions internally and does not remove from `tasks` (used for 2), and one that does not capture exceptions and does remove from `tasks` (used for 1).

I think we should clean this up by having a single `closeTaskClean` / `closeTaskDirty` pair that never removes from `tasks` internally (I also suggest we do the exception capturing at the caller rather than inside, but that's open for discussion :)), and let the caller decide whether to remove from `tasks` depending on whether the call is for case 1) or 2) above.

Also, I feel that once we complete this, `tasks` would hold far fewer meaningful fields, and we could potentially dissolve `tasks` altogether and keep all its bookkeeping in the TaskManager itself, e.g. as `runningActiveTasks`, adding functions to add and remove those bookkept tasks for unit testing purposes. But that's for a future discussion.
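A minimal Java sketch of the proposed split, assuming heavily simplified, hypothetical types: `TaskManagerSketch`, `Task`, `FakeTask`, `stateUpdaterTasks`, `closeCapturingException`, `closeRunningActiveTask` and `closeTaskFromStateUpdater` are illustrative only and are not the real Kafka Streams classes or methods (the real state updater is a separate component, modelled here as a plain map). The close helpers neither touch the registry nor catch exceptions; the two caller methods decide whether to remove from the registry. Falling back to a dirty close after a failed clean close is an assumed caller-side policy, not something the comment specifies.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: simplified stand-ins, NOT the real Kafka Streams classes.
public class TaskManagerSketch {

    // Minimal task abstraction (hypothetical).
    interface Task {
        String id();
        void closeClean();  // may throw on failure
        void closeDirty();  // best effort; assumed not to throw
    }

    // Test double whose clean close can be told to fail.
    static class FakeTask implements Task {
        private final String id;
        private final boolean failOnCleanClose;
        boolean closedClean = false;
        boolean closedDirty = false;

        FakeTask(String id, boolean failOnCleanClose) {
            this.id = id;
            this.failOnCleanClose = failOnCleanClose;
        }

        public String id() { return id; }

        public void closeClean() {
            if (failOnCleanClose) {
                throw new IllegalStateException("clean close failed for " + id);
            }
            closedClean = true;
        }

        public void closeDirty() { closedDirty = true; }
    }

    // Bookkeeps only running active tasks (the role `tasks` plays in the comment).
    final Map<String, Task> runningActiveTasks = new HashMap<>();
    // Hypothetical stand-in for standby and restoring active tasks held by the state updater.
    final Map<String, Task> stateUpdaterTasks = new HashMap<>();

    // All managed tasks are the union of the two sources.
    Set<String> allTaskIds() {
        Set<String> ids = new HashSet<>(runningActiveTasks.keySet());
        ids.addAll(stateUpdaterTasks.keySet());
        return ids;
    }

    // Single close helpers: no registry removal and no exception capturing inside.
    private void closeTaskClean(Task task) { task.closeClean(); }
    private void closeTaskDirty(Task task) { task.closeDirty(); }

    // Caller-side policy (assumed): try a clean close; on failure close dirty and return the error.
    private RuntimeException closeCapturingException(Task task) {
        try {
            closeTaskClean(task);
            return null;
        } catch (RuntimeException e) {
            closeTaskDirty(task);
            return e;
        }
    }

    // Case 1): running active task, so the caller also removes it from the registry after closing.
    RuntimeException closeRunningActiveTask(String id) {
        Task task = runningActiveTasks.get(id);
        if (task == null) {
            return null;
        }
        RuntimeException failure = closeCapturingException(task);
        runningActiveTasks.remove(id);
        return failure;
    }

    // Case 2): task handed back by the state updater (modelled as removal from the stand-in map);
    // runningActiveTasks is deliberately left untouched.
    RuntimeException closeTaskFromStateUpdater(String id) {
        Task task = stateUpdaterTasks.remove(id);
        return task == null ? null : closeCapturingException(task);
    }
}
```

With this shape, the only difference between case 1) and case 2) lives in the two caller methods, while `closeTaskClean` / `closeTaskDirty` stay identical for both, which is the simplification the comment suggests.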
-- 
This is an automated message from the Apache Git Service.