cadonna commented on code in PR #12312:
URL: https://github.com/apache/kafka/pull/12312#discussion_r904994088


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -295,32 +329,53 @@ private void addTaskToRestoredTasks(final StreamTask task) {
     private final Time time;
     private final ChangelogReader changelogReader;
     private final Consumer<Set<TopicPartition>> offsetResetter;
+    private final boolean manualStart;
     private final Queue<TaskAndAction> tasksAndActions = new LinkedList<>();
     private final Lock tasksAndActionsLock = new ReentrantLock();
     private final Condition tasksAndActionsCondition = tasksAndActionsLock.newCondition();
     private final Queue<StreamTask> restoredActiveTasks = new LinkedList<>();
     private final Lock restoredActiveTasksLock = new ReentrantLock();
     private final Condition restoredActiveTasksCondition = restoredActiveTasksLock.newCondition();
-    private final BlockingQueue<ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedBlockingQueue<>();
-    private final BlockingQueue<Task> removedTasks = new LinkedBlockingQueue<>();
-    private CountDownLatch shutdownGate;
+    private final Queue<ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedList<>();
+    private final Lock exceptionsAndFailedTasksLock = new ReentrantLock();
+    private final Queue<Task> removedTasks = new LinkedList<>();
+    private final Lock removedTasksLock = new ReentrantLock();
 
     private StateUpdaterThread stateUpdaterThread = null;
+    private CountDownLatch shutdownGate;
 
     public DefaultStateUpdater(final ChangelogReader changelogReader,
                                final Consumer<Set<TopicPartition>> offsetResetter,
                                final Time time) {
+        this(changelogReader, offsetResetter, time, false);
+    }
+
+    public DefaultStateUpdater(final ChangelogReader changelogReader,
+                               final Consumer<Set<TopicPartition>> offsetResetter,
+                               final Time time,
+                               final boolean manualStart) {
         this.changelogReader = changelogReader;
         this.offsetResetter = offsetResetter;
         this.time = time;
+        this.manualStart = manualStart;
+    }
+
+    public void start() {

Review Comment:
   Yes, I think that is simpler. I went back and forth between requiring an
explicit `start()` call and starting lazily on the first `add()`. I think
simpler is better.
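
To make the trade-off concrete, here is a minimal sketch of the explicit-start option. `ExplicitStartUpdater`, its `String` tasks, and the `isRunning()`/`pending()` helpers are hypothetical stand-ins for illustration only; this is not the actual `DefaultStateUpdater` code from the PR.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for illustration; not the real DefaultStateUpdater.
class ExplicitStartUpdater {
    private final BlockingQueue<String> tasks = new LinkedBlockingQueue<>();
    private Thread worker = null;

    // The caller decides when the thread starts. A second call is a no-op.
    public synchronized void start() {
        if (worker == null) {
            worker = new Thread(this::runLoop, "explicit-start-updater");
            worker.setDaemon(true);
            worker.start();
        }
    }

    // add() only enqueues. With a lazy start, this method would also have
    // to check whether the thread exists and create it on the first call.
    public void add(final String task) {
        tasks.add(task);
    }

    public synchronized boolean isRunning() {
        return worker != null && worker.isAlive();
    }

    public int pending() {
        return tasks.size();
    }

    private void runLoop() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                tasks.take(); // placeholder for the actual restoration work
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With this shape, tasks added before `start()` simply wait in the queue, and thread creation happens in exactly one place.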


