guozhangwang commented on code in PR #13025: URL: https://github.com/apache/kafka/pull/13025#discussion_r1092598270
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java: ########## @@ -190,7 +190,7 @@ public void clearTaskTimeout() { @Override public boolean commitNeeded() { - throw new UnsupportedOperationException("This task is read-only"); + return task.commitNeeded(); Review Comment: Why do we need to change these two functions? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ########## @@ -258,21 +269,27 @@ private List<TaskAndAction> getTasksAndActions() { } private void addTask(final Task task) { + final TaskId taskId = task.id(); if (isStateless(task)) { addToRestoredTasks((StreamTask) task); - log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater"); + log.info("Stateless active task " + taskId + " was added to the restored tasks of the state updater"); + } else if (topologyMetadata.isPaused(taskId.topologyName())) { + pausedTasks.put(taskId, task); Review Comment: I'm wondering if this complexity is necessary, since we do not make strict ordering guarantees for paused topologies -- i.e. it's okay to still processing those tasks for a while after the `pause()` call is triggered. Is it really a correctness or concurrency issue? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ########## @@ -1534,7 +1540,13 @@ Set<TaskId> standbyTaskIds() { Map<TaskId, Task> allTasks() { // not bothering with an unmodifiable map, since the tasks themselves are mutable, but // if any outside code modifies the map or the tasks, it would be a severe transgression. - return tasks.allTasksPerId(); + if (stateUpdater != null) { + final Map<TaskId, Task> ret = stateUpdater.getTasks().stream().collect(Collectors.toMap(Task::id, x -> x)); + ret.putAll(tasks.allTasksPerId()); Review Comment: I've changed the func name slightly in another PR, so if that PR is merged we need to do a slight rebase/conflict resolution, just FYI. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org