lucasbru commented on code in PR #13025:
URL: https://github.com/apache/kafka/pull/13025#discussion_r1111950411


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -258,21 +269,27 @@ private List<TaskAndAction> getTasksAndActions() {
         }
 
         private void addTask(final Task task) {
+            final TaskId taskId = task.id();
             if (isStateless(task)) {
                 addToRestoredTasks((StreamTask) task);
-                log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
+                log.info("Stateless active task " + taskId + " was added to the restored tasks of the state updater");
+            } else if (topologyMetadata.isPaused(taskId.topologyName())) {
+                pausedTasks.put(taskId, task);

Review Comment:
   Sorry, almost missed this comment. I think this would work, but it would 
   mean cycling through the list of all tasks in every single iteration of the 
   main state updater loop. I thought the pause code only ran after `commitMs` 
   to reduce this overhead, so I thought handling it here makes sense. It's 
   different for resume, because we get a resume signal from the stream thread.
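
   To make the overhead argument concrete, here is a rough sketch of the kind 
   of throttling I have in mind. It is not the actual `DefaultStateUpdater` 
   code: the class `ThrottledPauseCheck`, its fields, and the `runOnce` method 
   are all made up for illustration, tasks are reduced to plain strings, and 
   `checkIntervalMs` only stands in for `commitMs`. The full scan over the 
   updating tasks happens at most once per interval instead of on every loop 
   iteration.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; none of these names exist in Kafka Streams.
final class ThrottledPauseCheck {
    // task id -> topology name (stand-ins for Task and TaskId)
    private final Map<String, String> updatingTasks = new HashMap<>();
    private final Map<String, String> pausedTasks = new HashMap<>();
    private final long checkIntervalMs; // assumed to play the role of commitMs
    private long lastCheckMs;
    private int fullScans = 0;

    ThrottledPauseCheck(final long checkIntervalMs, final long nowMs) {
        this.checkIntervalMs = checkIntervalMs;
        this.lastCheckMs = nowMs;
    }

    void addTask(final String taskId, final String topologyName) {
        updatingTasks.put(taskId, topologyName);
    }

    // Called once per iteration of the (simulated) main loop.
    void runOnce(final long nowMs, final Set<String> pausedTopologies) {
        if (nowMs - lastCheckMs < checkIntervalMs) {
            return; // interval not elapsed: skip the scan over all tasks
        }
        lastCheckMs = nowMs;
        fullScans++;
        final Iterator<Map.Entry<String, String>> it = updatingTasks.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, String> entry = it.next();
            if (pausedTopologies.contains(entry.getValue())) {
                pausedTasks.put(entry.getKey(), entry.getValue());
                it.remove(); // move the task from updating to paused
            }
        }
    }

    int fullScans() {
        return fullScans;
    }

    int pausedCount() {
        return pausedTasks.size();
    }

    int updatingCount() {
        return updatingTasks.size();
    }
}
```

   With a throttle like this, a task added while its topology is already 
   paused would keep updating until the next scan, which is why handling the 
   paused case directly in `addTask` seemed reasonable to me.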



