cadonna commented on code in PR #12312: URL: https://github.com/apache/kafka/pull/12312#discussion_r904959285
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ########## @@ -295,32 +329,53 @@ private void addTaskToRestoredTasks(final StreamTask task) { private final Time time; private final ChangelogReader changelogReader; private final Consumer<Set<TopicPartition>> offsetResetter; + private final boolean manualStart; private final Queue<TaskAndAction> tasksAndActions = new LinkedList<>(); private final Lock tasksAndActionsLock = new ReentrantLock(); private final Condition tasksAndActionsCondition = tasksAndActionsLock.newCondition(); private final Queue<StreamTask> restoredActiveTasks = new LinkedList<>(); private final Lock restoredActiveTasksLock = new ReentrantLock(); private final Condition restoredActiveTasksCondition = restoredActiveTasksLock.newCondition(); - private final BlockingQueue<ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedBlockingQueue<>(); - private final BlockingQueue<Task> removedTasks = new LinkedBlockingQueue<>(); - private CountDownLatch shutdownGate; + private final Queue<ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedList<>(); Review Comment: My original idea was to block all queues so that tasks are not moved from one data structure to the other when the state updater collects the tasks during the calls to `get*Tasks()`. Thinking about it again, I am not sure this is needed. Much more important is to ensure that during the moving of a task the task is first added to the destination and then removed from the source so that the task is in at least one data structure at all times. Otherwise, we might miss the task during task collection due to race conditions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org