cadonna commented on code in PR #12312:
URL: https://github.com/apache/kafka/pull/12312#discussion_r904959285


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -295,32 +329,53 @@ private void addTaskToRestoredTasks(final StreamTask task) {
     private final Time time;
     private final ChangelogReader changelogReader;
     private final Consumer<Set<TopicPartition>> offsetResetter;
+    private final boolean manualStart;
     private final Queue<TaskAndAction> tasksAndActions = new LinkedList<>();
     private final Lock tasksAndActionsLock = new ReentrantLock();
     private final Condition tasksAndActionsCondition = tasksAndActionsLock.newCondition();
     private final Queue<StreamTask> restoredActiveTasks = new LinkedList<>();
     private final Lock restoredActiveTasksLock = new ReentrantLock();
     private final Condition restoredActiveTasksCondition = restoredActiveTasksLock.newCondition();
-    private final BlockingQueue<ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedBlockingQueue<>();
-    private final BlockingQueue<Task> removedTasks = new LinkedBlockingQueue<>();
-    private CountDownLatch shutdownGate;
+    private final Queue<ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedList<>();

Review Comment:
   My original idea was to block all queues so that tasks cannot move from one 
data structure to another while the state updater collects the tasks during 
calls to `get*Tasks()`. Thinking about it again, I am not sure this is needed. 
It is much more important to ensure that, when a task is moved, it is first 
added to the destination and only then removed from the source, so that the 
task is in at least one data structure at all times. Otherwise, a race 
condition could cause us to miss the task during task collection.
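
   A minimal sketch of the ordering described above, not the actual 
`DefaultStateUpdater` code. The class `TaskMoveSketch`, its fields, and its 
methods are hypothetical, and tasks are represented as plain strings. The 
sketch additionally assumes that the collector scans the source before the 
destination; scanning in the opposite order could still miss a task that is 
moved between the two scans.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration: names are not taken from DefaultStateUpdater.
public class TaskMoveSketch {
    // Each queue is guarded by its own lock; no global lock spans both.
    private final Queue<String> source = new ArrayDeque<>();
    private final Lock sourceLock = new ReentrantLock();
    private final Queue<String> destination = new ArrayDeque<>();
    private final Lock destinationLock = new ReentrantLock();

    public void addToSource(final String task) {
        sourceLock.lock();
        try {
            source.add(task);
        } finally {
            sourceLock.unlock();
        }
    }

    // Move a task: add to the destination first, then remove from the source.
    public void moveTask(final String task) {
        destinationLock.lock();
        try {
            destination.add(task);
        } finally {
            destinationLock.unlock();
        }
        // Between these two steps the task is visible in BOTH queues,
        // never in neither.
        sourceLock.lock();
        try {
            source.remove(task);
        } finally {
            sourceLock.unlock();
        }
    }

    // Collect tasks in the direction of movement: source first, then
    // destination. A Set removes the duplicate that appears when a task
    // is observed in both queues mid-move.
    public Set<String> collectTasks() {
        final Set<String> tasks = new LinkedHashSet<>();
        sourceLock.lock();
        try {
            tasks.addAll(source);
        } finally {
            sourceLock.unlock();
        }
        destinationLock.lock();
        try {
            tasks.addAll(destination);
        } finally {
            destinationLock.unlock();
        }
        return tasks;
    }
}
```

   With this ordering, a task that is no longer in the source when the 
collector scans it must already have been added to the destination, so the 
later scan of the destination finds it. The cost is that a task can be seen 
twice, which is why the sketch deduplicates the result.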


