cadonna commented on code in PR #12659: URL: https://github.com/apache/kafka/pull/12659#discussion_r977372667
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -500,23 +500,29 @@ public void remove(final TaskId taskId) {
         }
     }
-    @Override
-    public void pause(final TaskId taskId) {
+    public void pause(final TopologyMetadata topologyMetadata) {
         tasksAndActionsLock.lock();
         try {
-            tasksAndActions.add(TaskAndAction.createPauseTask(taskId));
-            tasksAndActionsCondition.signalAll();
+            for (final Task task : getUpdatingTasks()) {
+                if (topologyMetadata.isPaused(task.id().topologyName())) {
+                    tasksAndActions.add(TaskAndAction.createPauseTask(task.id()));
+                    tasksAndActionsCondition.signalAll();
+                }
+            }

Review Comment:
Wouldn't it be better to store the paused topologies within the state updater thread? Consider the following scenario:

1. The state updater manages task A from topology X.
2. Topology X is paused.
3. Task A is revoked and task B, also from topology X, is assigned.

When a topology is paused, we would pass the topology name to the state updater thread through an input queue event (like add or remove). Once the state updater thread processes that event, it stores the name of the paused topology and pauses all tasks of that topology (i.e., task A). Once task B is assigned and added to the state updater, the paused topologies are consulted, and since topology X is paused, task B is paused directly. When a topology is resumed, the corresponding tasks are resumed and the name of the resumed topology is removed from the state updater thread.

If we expose the names of the paused topologies from the state updater thread to the default state updater, we can check in `pause(topology)` whether the state updater thread already knows about the paused topology and avoid creating an event in the input queue for topologies that are already known to be paused.

All of this would avoid iterating over the tasks each time `checkStateUpdater` is called.
We would only check the paused topologies when a task is added, and loop over the tasks only when a topology has just been paused. WDYT?
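
To make the proposal concrete, here is a minimal sketch of the bookkeeping it implies. It is a single-threaded model only: the class `PausedTopologiesSketch` and all of its method names are hypothetical and are not part of `DefaultStateUpdater`, task IDs and topology names are plain strings, and the input queue and locking are left out. It only shows that a task added after its topology was paused ends up paused without iterating over all tasks on every call.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the state the state updater thread would keep.
// None of these names come from the Kafka code base.
class PausedTopologiesSketch {
    // Names of the topologies the state updater thread knows to be paused.
    private final Set<String> pausedTopologies = new HashSet<>();
    // Task id -> topology name, split by whether the task is updating or paused.
    private final Map<String, String> updatingTasks = new HashMap<>();
    private final Map<String, String> pausedTasks = new HashMap<>();

    // Returns false if the topology was already known to be paused,
    // so a caller could skip enqueueing another pause event.
    boolean pauseTopology(final String topologyName) {
        if (!pausedTopologies.add(topologyName)) {
            return false;
        }
        moveTasks(topologyName, updatingTasks, pausedTasks);
        return true;
    }

    // Resumes the tasks of the topology and forgets the topology name.
    boolean resumeTopology(final String topologyName) {
        if (!pausedTopologies.remove(topologyName)) {
            return false;
        }
        moveTasks(topologyName, pausedTasks, updatingTasks);
        return true;
    }

    // A newly added task is paused directly if its topology is paused.
    void addTask(final String taskId, final String topologyName) {
        if (pausedTopologies.contains(topologyName)) {
            pausedTasks.put(taskId, topologyName);
        } else {
            updatingTasks.put(taskId, topologyName);
        }
    }

    void removeTask(final String taskId) {
        updatingTasks.remove(taskId);
        pausedTasks.remove(taskId);
    }

    boolean isTaskPaused(final String taskId) {
        return pausedTasks.containsKey(taskId);
    }

    // Moves every task of the given topology from one map to the other.
    private static void moveTasks(final String topologyName,
                                  final Map<String, String> from,
                                  final Map<String, String> to) {
        final Iterator<Map.Entry<String, String>> it = from.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, String> entry = it.next();
            if (entry.getValue().equals(topologyName)) {
                to.put(entry.getKey(), entry.getValue());
                it.remove();
            }
        }
    }

    public static void main(final String[] args) {
        final PausedTopologiesSketch updater = new PausedTopologiesSketch();
        updater.addTask("A", "X");      // 1. task A from topology X is managed
        updater.pauseTopology("X");     // 2. topology X is paused
        updater.removeTask("A");        // 3. task A is revoked ...
        updater.addTask("B", "X");      //    ... and task B from topology X is assigned
        System.out.println(updater.isTaskPaused("B")); // prints "true"
    }
}
```

In the real state updater, `pauseTopology` and `resumeTopology` would presumably run on the state updater thread in response to input queue events, and the `false` return value stands in for the check that avoids enqueueing an event for an already-known paused topology.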