guozhangwang commented on code in PR #13025:
URL: https://github.com/apache/kafka/pull/13025#discussion_r1092598270


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java:
##########
@@ -190,7 +190,7 @@ public void clearTaskTimeout() {
 
     @Override
     public boolean commitNeeded() {
-        throw new UnsupportedOperationException("This task is read-only");
+        return task.commitNeeded();

Review Comment:
   Why do we need to change these two functions?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -258,21 +269,27 @@ private List<TaskAndAction> getTasksAndActions() {
         }
 
         private void addTask(final Task task) {
+            final TaskId taskId = task.id();
             if (isStateless(task)) {
                 addToRestoredTasks((StreamTask) task);
-                log.info("Stateless active task " + task.id() + " was added to 
the restored tasks of the state updater");
+                log.info("Stateless active task " + taskId + " was added to 
the restored tasks of the state updater");
+            } else if (topologyMetadata.isPaused(taskId.topologyName())) {
+                pausedTasks.put(taskId, task);

Review Comment:
   I'm wondering if this complexity is necessary, since we do not make strict 
ordering guarantees for paused topologies -- i.e. it's okay to still processing 
those tasks for a while after the `pause()` call is triggered. Is it really a 
correctness or concurrency issue?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1534,7 +1540,13 @@ Set<TaskId> standbyTaskIds() {
     Map<TaskId, Task> allTasks() {
         // not bothering with an unmodifiable map, since the tasks themselves 
are mutable, but
         // if any outside code modifies the map or the tasks, it would be a 
severe transgression.
-        return tasks.allTasksPerId();
+        if (stateUpdater != null) {
+            final Map<TaskId, Task> ret = 
stateUpdater.getTasks().stream().collect(Collectors.toMap(Task::id, x -> x));
+            ret.putAll(tasks.allTasksPerId());

Review Comment:
   I've changed the func name slightly in another PR, so if that PR is merged 
we need to do a slight rebase/conflict resolution, just FYI.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to