cadonna commented on code in PR #12659: URL: https://github.com/apache/kafka/pull/12659#discussion_r979883060
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -500,23 +500,29 @@ public void remove(final TaskId taskId) {
         }
     }
 
-    @Override
-    public void pause(final TaskId taskId) {
+    public void pause(final TopologyMetadata topologyMetadata) {
         tasksAndActionsLock.lock();
         try {
-            tasksAndActions.add(TaskAndAction.createPauseTask(taskId));
-            tasksAndActionsCondition.signalAll();
+            for (final Task task : getUpdatingTasks()) {
+                if (topologyMetadata.isPaused(task.id().topologyName())) {
+                    tasksAndActions.add(TaskAndAction.createPauseTask(task.id()));
+                    tasksAndActionsCondition.signalAll();
+                }
+            }

Review Comment:
@guozhangwang I think we misunderstood each other. Sorry if I was not clear enough. My proposal was to store the names of the paused topologies in the state updater thread, like:

```
public class DefaultStateUpdater implements StateUpdater {
    ...
    private class StateUpdaterThread extends Thread {
        ...
        private final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();

        private Set<String> getPausedTopologies() {
            return Collections.unmodifiableSet(pausedTopologies);
        }
```

The `StateUpdater` (and `DefaultStateUpdater`) would then have the following method:

```
public class DefaultStateUpdater implements StateUpdater {
    ...
    @Override
    public void syncPausedTopologies(final Set<String> pausedTopologies) {
        final Set<String> stateUpdaterPausedTopologies = getPausedTopologies();
        // removeAll() mutates its receiver and returns a boolean, so compute the differences on copies
        final Set<String> pausedTopologiesToAdd = new HashSet<>(pausedTopologies);
        pausedTopologiesToAdd.removeAll(stateUpdaterPausedTopologies);
        final Set<String> pausedTopologiesToRemove = new HashSet<>(stateUpdaterPausedTopologies);
        pausedTopologiesToRemove.removeAll(pausedTopologies);
        if (!pausedTopologiesToAdd.isEmpty()) {
            tasksAndActions.addPausedTopology(TaskAndAction.createAddPausedTopology(pausedTopologiesToAdd));
        }
        if (!pausedTopologiesToRemove.isEmpty()) {
            tasksAndActions.removePausedTopology(TaskAndAction.createRemovePausedTopology(pausedTopologiesToRemove));
        }
    }
```

This way, we would not need to iterate over the tasks in the state updater at regular intervals, but only when the set of paused topologies changes. When we add a task to the state updater, the state updater thread needs to check whether the topology of the task is paused.

```
public class DefaultStateUpdater implements StateUpdater {
    ...
    private class StateUpdaterThread extends Thread {
        ...
        private void addTask(final Task task) {
            if (isStateless(task)) {
                addToRestoredTasks((StreamTask) task);
                log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
            } else {
                if (pausedTopologies.contains(task.id().topologyName())) {
                    // add to paused tasks
                } else {
                    // add to updating tasks
                }
            }
```
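
To make the diffing step in `syncPausedTopologies` concrete, here is a minimal, self-contained sketch of how the two sets (topologies to add and topologies to remove) could be computed. It is only an illustration: the class `PausedTopologiesDiff`, the helper `difference`, and the topology names are hypothetical and not part of the PR or of Kafka Streams. It relies on the fact that `Set.removeAll` mutates its receiver and returns a `boolean`, so each difference is computed on a copy.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of Kafka Streams: illustrates the set-difference step only.
public class PausedTopologiesDiff {

    // Returns the elements of `left` that are not in `right`; neither input is modified.
    static Set<String> difference(final Set<String> left, final Set<String> right) {
        final Set<String> result = new HashSet<>(left);
        result.removeAll(right);
        return result;
    }

    public static void main(final String[] args) {
        // Paused topologies as requested by the caller (assumed example names).
        final Set<String> requested = new HashSet<>(Arrays.asList("topology-a", "topology-b"));
        // Paused topologies currently known to the state updater thread.
        final Set<String> current = new HashSet<>(Arrays.asList("topology-b", "topology-c"));

        // Newly paused topologies that the state updater does not know about yet.
        System.out.println("to add: " + difference(requested, current));
        // Topologies that the state updater still treats as paused but that were resumed.
        System.out.println("to remove: " + difference(current, requested));
    }
}
```

If both differences are empty, nothing would need to be enqueued, which is the point of the proposal: work is only done when the set of paused topologies changes.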