guozhangwang commented on code in PR #12554:
URL: https://github.com/apache/kafka/pull/12554#discussion_r954204311


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -565,6 +583,303 @@ public void shouldHandleRemovedTasksFromStateUpdater() {
     }
 
     @Test
+    public void shouldTransitRestoredTaskToRunning() {

Review Comment:
   Nice mocks, love them!



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -673,44 +772,11 @@ private void handleRemovedTasksFromStateUpdater() {
         final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
 
         for (final Task task : stateUpdater.drainRemovedTasks()) {
-            final TaskId taskId = task.id();
             Set<TopicPartition> inputPartitions;
             if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
-                try {
-                    final Task newTask = task.isActive() ?
-                        convertActiveToStandby((StreamTask) task, inputPartitions) :
-                        convertStandbyToActive((StandbyTask) task, inputPartitions);
-                    newTask.initializeIfNeeded();
-                    stateUpdater.add(newTask);
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format("Failed to recycle task %s cleanly. " +
-                        "Attempting to handle remaining tasks before re-throwing:", taskId);
-                    log.error(uncleanMessage, e);
-
-                    if (task.state() != State.CLOSED) {
-                        tasksToCloseDirty.add(task);
-                    }
-
-                    taskExceptions.putIfAbsent(taskId, e);
-                }
+                recycleTask(task, inputPartitions, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseClean(task.id())) {
-                try {
-                    task.suspend();
-                    task.closeClean();
-                    if (task.isActive()) {
-                        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format("Failed to close task %s cleanly. " +
-                        "Attempting to handle remaining tasks before re-throwing:", task.id());
-                    log.error(uncleanMessage, e);
-
-                    if (task.state() != State.CLOSED) {
-                        tasksToCloseDirty.add(task);
-                    }
-
-                    taskExceptions.putIfAbsent(task.id(), e);
-                }
+                closeTaskClean(task, tasksToCloseDirty, taskExceptions);

Review Comment:
   We used to wrap the logic for collecting exceptions and updating the close-dirty tasks in a single call, but I found it a bit clumsy and hard to reason about for different exception handling (at least for now; I hope we can get simpler exception handling in the refactoring). So I peeled it off, and now the existing `closeTaskClean` only does the closing part:
   
   ```
   private void closeTaskClean(final Task task) {
       task.closeClean();
       tasks.removeTask(task);
       if (task.isActive()) {
           activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
       }
   }
   ```
   
   But the reason I could not reuse that function here and catch the exception around it is that, in our refactoring, the tasks to close cleanly are already not in `tasks`, while `closeTaskClean` removes the task from `tasks`.
   
   Thinking about this a bit more, I still feel that we should do the exception catching outside the core logic of closing tasks, but we can still try to remove some duplicated code here, e.g. by wrapping the whole if-else block in a single `try-catch` rather than doing that per branch.
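   A minimal sketch of that suggestion, assuming heavily simplified hypothetical types: `SketchTask`, `SingleTryCatchSketch`, and its pending-recycle / pending-close sets are illustrative stand-ins, not the real `TaskManager` or `Tasks` API. It shows one `try-catch` around the whole if-else block that handles both branches the same way: collect the task for a dirty close if it is not already closed, and keep only the first exception per task.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified task; not the real Kafka Streams Task interface.
class SketchTask {
    final String id;
    private final boolean failOnClose;
    boolean closed = false;

    SketchTask(final String id, final boolean failOnClose) {
        this.id = id;
        this.failOnClose = failOnClose;
    }

    void closeClean() {
        if (failOnClose) {
            throw new IllegalStateException("close failed for " + id);
        }
        closed = true;
    }
}

class SingleTryCatchSketch {
    // Hypothetical stand-ins for tasks.removePendingTaskToRecycle / removePendingTaskToCloseClean.
    final Set<String> pendingRecycle = new LinkedHashSet<>();
    final Set<String> pendingCloseClean = new LinkedHashSet<>();
    final Set<String> recycled = new LinkedHashSet<>();

    void handleRemovedTasks(final List<SketchTask> removedTasks,
                            final Set<SketchTask> tasksToCloseDirty,
                            final Map<String, RuntimeException> taskExceptions) {
        for (final SketchTask task : removedTasks) {
            // One try-catch around the whole if-else block instead of one per branch.
            try {
                if (pendingRecycle.remove(task.id)) {
                    recycled.add(task.id); // stand-in for recycleTask(...)
                } else if (pendingCloseClean.remove(task.id)) {
                    task.closeClean();
                }
            } catch (final RuntimeException e) {
                // Shared handling (the real code also logs here):
                // schedule a dirty close if needed and keep the first exception per task.
                if (!task.closed) {
                    tasksToCloseDirty.add(task);
                }
                taskExceptions.putIfAbsent(task.id, e);
            }
        }
    }
}
```

   One trade-off of the shared catch is that it no longer knows whether recycling or closing failed, so the log message would have to be derived from the branch if that distinction is still wanted.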
   



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -565,6 +565,13 @@ public Set<Task> getTasks() {
         return executeWithQueuesLocked(() -> getStreamOfTasks().map(ReadOnlyTask::new).collect(Collectors.toSet()));
     }
 
+    @Override
+    public boolean restoresActiveTasks() {

Review Comment:
   nit: `hasActiveTasks`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1038,7 +1104,7 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo
         return offsetSum;
     }
 
-    private void closeTaskDirty(final Task task) {
+    private void closeTaskDirty(final Task task, final boolean removeFromTasksRegistry) {

Review Comment:
   I have also been thinking about when to remove a task from the task registry (`tasks`). In the new model, this class only bookkeeps running active tasks, while standby tasks and restoring active tasks live in the state updater, so the task manager needs to take the union of both to return all managed tasks. So when we close a task:
   
   1) If the task is a running active task, then we also need to remove it from `tasks`.
   2) If the task was retrieved from the state updater, then we do not need to remove it from `tasks`.
   
   That means we also need to account for this in close-clean. As mentioned in my other comment above, today we do it with two `closeTaskClean` variants: one that encapsulates the exception capturing and does not remove from `tasks` (used for case 2), and one that does not encapsulate the exception capturing and does remove from `tasks` (used for case 1).
   
   I think we should clean this up by having a single `closeTaskClean` / `closeTaskDirty` that does not remove from `tasks` internally (I also suggest doing the exception capturing at the caller rather than inside, but that's open for discussion :)), and letting the caller decide whether to remove from `tasks` depending on whether the call is for case 1) or 2) above.
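   A rough sketch of that split, with hypothetical names throughout (`RegistryTask`, `CallerDecidesSketch`, and a `Map` standing in for `tasks`): `closeTaskClean` only does the closing, and each caller wraps it in its own `try-catch` and decides whether to remove the task from the registry (case 1) or not (case 2).

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical simplified task; not the real Kafka Streams Task interface.
class RegistryTask {
    final String id;
    final boolean active;
    boolean closed = false;

    RegistryTask(final String id, final boolean active) {
        this.id = id;
        this.active = active;
    }
}

class CallerDecidesSketch {
    // Stand-in for the `tasks` registry, which holds only running active tasks.
    final Map<String, RegistryTask> tasks = new LinkedHashMap<>();
    // Stand-in for what activeTaskCreator.closeAndRemoveTaskProducerIfNeeded would do.
    final Set<String> closedProducers = new LinkedHashSet<>();

    // Core close logic only: no registry removal and no exception capturing.
    void closeTaskClean(final RegistryTask task) {
        task.closed = true;
        if (task.active) {
            closedProducers.add(task.id);
        }
    }

    // Case 1): a running active task owned by the registry, so the caller removes it.
    void closeRunningActiveTask(final RegistryTask task, final Map<String, RuntimeException> taskExceptions) {
        try {
            closeTaskClean(task);
            tasks.remove(task.id);
        } catch (final RuntimeException e) {
            taskExceptions.putIfAbsent(task.id, e); // a dirty close would follow (omitted)
        }
    }

    // Case 2): a task drained from the state updater, which was never in the registry.
    void closeTaskFromStateUpdater(final RegistryTask task, final Map<String, RuntimeException> taskExceptions) {
        try {
            closeTaskClean(task);
        } catch (final RuntimeException e) {
            taskExceptions.putIfAbsent(task.id, e); // a dirty close would follow (omitted)
        }
    }
}
```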
   
   Also, I feel that once we complete this, `tasks` would contain far fewer meaningful fields, and we could potentially dissolve `tasks` and just keep it inside the TaskManager as `runningActiveTasks`. But that's for a future discussion.
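   To illustrate that possible end state, a hypothetical sketch in which the TaskManager keeps `runningActiveTasks` itself and computes all managed tasks as the union with the state updater's tasks. Task IDs are plain strings, and `UpdaterView` and `TaskManagerShapeSketch` are assumed stand-ins, not real Kafka Streams interfaces.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the state updater's view of its tasks (not the real StateUpdater).
interface UpdaterView {
    Set<String> getTaskIds();
}

class TaskManagerShapeSketch {
    // Only running active tasks are bookkept directly by the task manager.
    private final Set<String> runningActiveTasks = new HashSet<>();
    private final UpdaterView stateUpdater;

    TaskManagerShapeSketch(final UpdaterView stateUpdater) {
        this.stateUpdater = stateUpdater;
    }

    void transitToRunning(final String taskId) {
        runningActiveTasks.add(taskId);
    }

    // All managed tasks = running active tasks + standby / restoring tasks in the state updater.
    Set<String> allTasks() {
        final Set<String> all = new HashSet<>(runningActiveTasks);
        all.addAll(stateUpdater.getTaskIds());
        return Collections.unmodifiableSet(all);
    }
}
```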



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -565,6 +565,13 @@ public Set<Task> getTasks() {
+    @Override
+    public boolean restoresActiveTasks() {

Review Comment:
   Also nit: could we implement this as a default method in the interface, i.e. `return !getActiveTasks().isEmpty();`, so that we do not need to implement it again if we want to mock it?
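   A minimal sketch of the suggested default method, on a hypothetical trimmed-down interface (`StateUpdaterSketch`, with task IDs as strings) rather than the real `StateUpdater`. Because only `getActiveTasks()` is abstract, a hand-written stub, here a lambda, gets `restoresActiveTasks()` for free.

```java
import java.util.Set;

// Hypothetical, trimmed-down interface; the real StateUpdater has many more methods.
interface StateUpdaterSketch {
    Set<String> getActiveTasks();

    // Derived from getActiveTasks(), so stubs only need to provide getActiveTasks().
    default boolean restoresActiveTasks() {
        return !getActiveTasks().isEmpty();
    }
}
```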



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -673,44 +772,11 @@ private void handleRemovedTasksFromStateUpdater() {
+                closeTaskClean(task, tasksToCloseDirty, taskExceptions);

Review Comment:
   And to further remove duplicated logic, I think we can consolidate `handleRemovedTasksFromStateUpdater` and `handleRestoredTasks` into a single helper, e.g. `tryHandleTasksFromStateUpdater`, which checks whether the task should be 1) recycled, 2) closed clean / dirty, or 3) have its input partitions updated (and then be handed back to the state updater). If the task matches any of these conditions, the helper returns true; otherwise it returns false.
   
   Then in `handleRemovedTasksFromStateUpdater`:
   
   ```
   if (!tryHandleTasksFromStateUpdater(..)) {
       throw new IllegalStateException("this should not happen");
   }
   ```
   
   And in `handleRestoredTasksFromStateUpdater`:
   
   ```
   if (!tryHandleTasksFromStateUpdater(..)) {
       transitRestoredTaskToRunning(task);
   }
   ```
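   A self-contained sketch of how the consolidated helper and its two callers could fit together, assuming pending actions are tracked per task ID in simple sets and maps. All names are hypothetical, and the recorded `actions` list replaces the real recycle / close / update calls:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the consolidated helper; bookkeeping is illustrative only.
class ConsolidatedHandlingSketch {
    final Set<String> pendingRecycle = new HashSet<>();
    final Set<String> pendingClose = new HashSet<>();
    final Map<String, Set<String>> pendingInputPartitionUpdates = new HashMap<>();
    final List<String> actions = new ArrayList<>();

    // Returns true if the task was recycled, closed, or had its input partitions updated.
    boolean tryHandleTasksFromStateUpdater(final String taskId) {
        if (pendingRecycle.remove(taskId)) {
            actions.add("recycle " + taskId);
            return true;
        } else if (pendingClose.remove(taskId)) {
            actions.add("close " + taskId);
            return true;
        } else if (pendingInputPartitionUpdates.remove(taskId) != null) {
            // The real code would then hand the task back to the state updater.
            actions.add("update-input-partitions " + taskId);
            return true;
        }
        return false;
    }

    void handleRemovedTask(final String taskId) {
        if (!tryHandleTasksFromStateUpdater(taskId)) {
            throw new IllegalStateException("Removed task " + taskId + " has no pending action");
        }
    }

    void handleRestoredTask(final String taskId) {
        if (!tryHandleTasksFromStateUpdater(taskId)) {
            actions.add("transit-to-running " + taskId); // stand-in for transitRestoredTaskToRunning
        }
    }
}
```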
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
