cadonna commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r979883060


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -500,23 +500,29 @@ public void remove(final TaskId taskId) {
         }
     }
 
-    @Override
-    public void pause(final TaskId taskId) {
+    public void pause(final TopologyMetadata topologyMetadata) {
         tasksAndActionsLock.lock();
         try {
-            tasksAndActions.add(TaskAndAction.createPauseTask(taskId));
-            tasksAndActionsCondition.signalAll();
+            for (final Task task : getUpdatingTasks()) {
+                if (topologyMetadata.isPaused(task.id().topologyName())) {
+                    tasksAndActions.add(TaskAndAction.createPauseTask(task.id()));
+                    tasksAndActionsCondition.signalAll();
+                }
+            }

Review Comment:
   @guozhangwang I think we misunderstood each other. Sorry if I was not clear enough.
   My proposal was to store the names of the paused topologies in the state updater thread, like this:
   
   ```
   public class DefaultStateUpdater implements StateUpdater {
   ...
       private class StateUpdaterThread extends Thread {
       ...
           private Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();
   
           private Set<String> getPausedTopologies() {
               return Collections.unmodifiableSet(pausedTopologies);
           }
   ```
   
   The `StateUpdater` (and `DefaultStateUpdater`) would then have the following method:
   
   ```
   public class DefaultStateUpdater implements StateUpdater {
   ...
       @Override
       public void syncPausedTopologies(final Set<String> pausedTopologies) {
           final Set<String> stateUpdaterPausedTopologies = getPausedTopologies();
           // removeAll() returns a boolean and mutates its receiver, so compute the differences on copies
           final Set<String> pausedTopologiesToAdd = new HashSet<>(pausedTopologies);
           pausedTopologiesToAdd.removeAll(stateUpdaterPausedTopologies);
           final Set<String> pausedTopologiesToRemove = new HashSet<>(stateUpdaterPausedTopologies);
           pausedTopologiesToRemove.removeAll(pausedTopologies);
           if (!pausedTopologiesToAdd.isEmpty()) {
               tasksAndActions.addPausedTopology(TaskAndAction.createAddPausedTopology(pausedTopologiesToAdd));
           }
           if (!pausedTopologiesToRemove.isEmpty()) {
               tasksAndActions.removePausedTopology(TaskAndAction.createRemovePausedTopology(pausedTopologiesToRemove));
           }
       }
   ``` 
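
   The set difference in `syncPausedTopologies` can be sketched in isolation as follows. This is only an illustration using `java.util`; the class `PausedTopologyDiff` and its members are hypothetical names and not part of Kafka. Note that `Set.removeAll` returns a `boolean` and modifies the set it is called on, so the sketch works on copies.

   ```java
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.Set;

   // Hypothetical helper, not part of Kafka: computes which topology names
   // must be added to or removed from the state updater's paused set.
   final class PausedTopologyDiff {
       final Set<String> toAdd;
       final Set<String> toRemove;

       private PausedTopologyDiff(final Set<String> toAdd, final Set<String> toRemove) {
           this.toAdd = Collections.unmodifiableSet(toAdd);
           this.toRemove = Collections.unmodifiableSet(toRemove);
       }

       // requested: paused topologies as seen by the caller
       // current:   paused topologies as currently known to the state updater
       static PausedTopologyDiff between(final Set<String> requested, final Set<String> current) {
           final Set<String> toAdd = new HashSet<>(requested);
           toAdd.removeAll(current);      // requested but not yet known
           final Set<String> toRemove = new HashSet<>(current);
           toRemove.removeAll(requested); // known but no longer requested
           return new PausedTopologyDiff(toAdd, toRemove);
       }

       // True if the two sets were equal, i.e. nothing needs to be enqueued.
       boolean isEmpty() {
           return toAdd.isEmpty() && toRemove.isEmpty();
       }
   }
   ```

   If both differences are empty, nothing needs to be enqueued for the state updater thread.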
   
   With this approach, we would not need to iterate over the tasks in the state updater at regular intervals, but only when the set of paused topologies changes.
   When we add a task to the state updater, the state updater thread needs to check whether the topology of the task is paused.
   
   ```
   public class DefaultStateUpdater implements StateUpdater {
   ...
       private class StateUpdaterThread extends Thread {
       ...
           private void addTask(final Task task) {
               if (isStateless(task)) {
                   addToRestoredTasks((StreamTask) task);
                   log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
               } else {
                   if (pausedTopologies.contains(task.id().topologyName())) {
                       // add to paused tasks
                   } else {
                       // add to updating tasks
                   }
               }
           }
   ``` 
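
   To make the intended behavior concrete, here is a minimal, single-threaded model of the bookkeeping. It is a sketch under assumptions, not the actual implementation: `PausedTopologyRouting` and all of its methods are hypothetical, tasks are represented by plain strings, and stateless tasks, locking, and the action queue are left out.

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Iterator;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical, single-threaded model of the state updater's bookkeeping; not Kafka code.
   final class PausedTopologyRouting {
       private final Set<String> pausedTopologies = new HashSet<>();
       // Both maps go from a task ID string to the name of the task's topology.
       private final Map<String, String> updatingTasks = new HashMap<>();
       private final Map<String, String> pausedTasks = new HashMap<>();

       // A new task is routed once, based on the current set of paused topologies.
       void addTask(final String taskId, final String topologyName) {
           if (pausedTopologies.contains(topologyName)) {
               pausedTasks.put(taskId, topologyName);
           } else {
               updatingTasks.put(taskId, topologyName);
           }
       }

       // Existing tasks are only revisited when the set of paused topologies changes.
       void addPausedTopologies(final Set<String> topologies) {
           pausedTopologies.addAll(topologies);
           move(updatingTasks, pausedTasks, topologies);
       }

       void removePausedTopologies(final Set<String> topologies) {
           pausedTopologies.removeAll(topologies);
           move(pausedTasks, updatingTasks, topologies);
       }

       // Moves every task whose topology is in the given set from one map to the other.
       private static void move(final Map<String, String> from,
                                final Map<String, String> to,
                                final Set<String> topologies) {
           final Iterator<Map.Entry<String, String>> it = from.entrySet().iterator();
           while (it.hasNext()) {
               final Map.Entry<String, String> entry = it.next();
               if (topologies.contains(entry.getValue())) {
                   to.put(entry.getKey(), entry.getValue());
                   it.remove();
               }
           }
       }

       Set<String> updatingTaskIds() {
           return new HashSet<>(updatingTasks.keySet());
       }

       Set<String> pausedTaskIds() {
           return new HashSet<>(pausedTasks.keySet());
       }
   }
   ```

   In this model, a task added while its topology is paused goes straight to the paused tasks, and tasks only move between the two maps when a topology is paused or resumed.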


