cadonna commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r977372667


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -500,23 +500,29 @@ public void remove(final TaskId taskId) {
         }
     }
 
-    @Override
-    public void pause(final TaskId taskId) {
+    public void pause(final TopologyMetadata topologyMetadata) {
         tasksAndActionsLock.lock();
         try {
-            tasksAndActions.add(TaskAndAction.createPauseTask(taskId));
-            tasksAndActionsCondition.signalAll();
+            for (final Task task : getUpdatingTasks()) {
+                if (topologyMetadata.isPaused(task.id().topologyName())) {
+                    tasksAndActions.add(TaskAndAction.createPauseTask(task.id()));
+                    tasksAndActionsCondition.signalAll();
+                }
+            }

Review Comment:
   Wouldn't it be better to store paused topologies within the state updater 
thread? 
   
   Consider the following scenario: 
   1. The state updater manages task A from topology X. 
   2. Topology X is paused.
   3. Task A is revoked and task B from topology X is assigned.
   When the topology is paused, we would pass the topology name through an 
input queue event (like add or remove) to the state updater thread. Once the 
state updater thread processes the input queue event with the topology name to 
pause, it stores the name of the paused topology and pauses all tasks (i.e., 
task A) of the paused topology. Once task B is assigned and added to the state 
updater, the paused topologies are consulted and, since topology X is paused, 
task B will be paused directly. When a topology is resumed, the corresponding 
tasks are resumed and the name of the resumed topology is removed from the 
state updater thread.
   If we expose the names of the paused topologies from the state updater thread 
to the default state updater, we can check in `pause(topology)` if the state 
updater thread already knows about the paused topology and avoid creating an 
event in the input queue for already-known paused topologies. 
   All this would avoid iterating over the tasks each time `checkStateUpdater` 
is called. We would only check the paused topologies when a task is 
added, and loop over the tasks only when a topology was just paused.
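
   To make the idea concrete, here is a rough sketch, not the actual 
`DefaultStateUpdater` code. It assumes a simplified, single-threaded model in 
which task IDs and topology names are plain strings. `PausedTopologySketch` 
and all of its method names are hypothetical; in the real change these calls 
would be routed through the input queue and guarded by the existing locks.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the state updater thread's bookkeeping.
// None of these names exist in Kafka Streams.
class PausedTopologySketch {
    // task id -> topology name, for tasks that are currently restoring
    private final Map<String, String> updatingTasks = new HashMap<>();
    // task id -> topology name, for tasks that are currently paused
    private final Map<String, String> pausedTasks = new HashMap<>();
    // names of the topologies the updater already knows to be paused
    private final Set<String> pausedTopologies = new HashSet<>();

    // Returns false if the topology was already known as paused,
    // in which case no new work (no input queue event) is needed.
    public boolean pauseTopology(final String topology) {
        if (!pausedTopologies.add(topology)) {
            return false;
        }
        // Loop over the tasks only when a topology was just paused.
        moveTasks(updatingTasks, pausedTasks, topology);
        return true;
    }

    public void resumeTopology(final String topology) {
        if (pausedTopologies.remove(topology)) {
            moveTasks(pausedTasks, updatingTasks, topology);
        }
    }

    // A newly added task is paused directly if its topology is paused.
    public void addTask(final String taskId, final String topology) {
        if (pausedTopologies.contains(topology)) {
            pausedTasks.put(taskId, topology);
        } else {
            updatingTasks.put(taskId, topology);
        }
    }

    public void removeTask(final String taskId) {
        updatingTasks.remove(taskId);
        pausedTasks.remove(taskId);
    }

    public boolean isTaskPaused(final String taskId) {
        return pausedTasks.containsKey(taskId);
    }

    // Moves every task of the given topology from one map to the other.
    private static void moveTasks(final Map<String, String> from,
                                  final Map<String, String> to,
                                  final String topology) {
        final Iterator<Map.Entry<String, String>> it = from.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, String> entry = it.next();
            if (entry.getValue().equals(topology)) {
                to.put(entry.getKey(), entry.getValue());
                it.remove();
            }
        }
    }
}
```

   With this shape, the scenario above works without scanning all tasks on 
every call: task B lands in the paused set as soon as it is added, and a 
repeated `pauseTopology("X")` is a no-op.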
   
   WDYT?



