guozhangwang commented on code in PR #12466:
URL: https://github.com/apache/kafka/pull/12466#discussion_r943076070


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -656,6 +657,71 @@ boolean tryToCompleteRestoration(final long now, final 
java.util.function.Consum
         return allRunning;
     }
 
+    private void addTaskstoStateUpdater() {
+        for (final Task task : tasks.drainPendingTaskToInit()) {
+            task.initializeIfNeeded();
+            stateUpdater.add(task);
+        }
+    }
+
+    private void handleRemovedTasksFromStateUpdater() {
+        final Map<TaskId, RuntimeException> taskExceptions = new 
LinkedHashMap<>();
+        final Set<Task> tasksToCloseDirty = new 
TreeSet<>(Comparator.comparing(Task::id));
+
+        for (final Task task : stateUpdater.drainRemovedTasks()) {
+            final TaskId taskId = task.id();
+            Set<TopicPartition> inputPartitions;
+            if ((inputPartitions = 
tasks.removePendingTaskToRecycle(task.id())) != null) {
+                try {
+                    final Task newTask = task.isActive() ?
+                        convertActiveToStandby((StreamTask) task, 
inputPartitions) :
+                        convertStandbyToActive((StandbyTask) task, 
inputPartitions);
+                    newTask.initializeIfNeeded();
+                    stateUpdater.add(newTask);
+                } catch (final RuntimeException e) {
+                    final String uncleanMessage = String.format("Failed to 
recycle task %s cleanly. " +
+                        "Attempting to close remaining tasks before 
re-throwing:", taskId);
+                    log.error(uncleanMessage, e);
+
+                    if (task.state() != State.CLOSED) {
+                        tasksToCloseDirty.add(task);
+                    }
+
+                    taskExceptions.putIfAbsent(taskId, e);
+                }
+            } else if (tasks.removePendingTaskToClose(task.id())) {
+                try {
+                    task.closeClean();
+                    if (task.isActive()) {
+                        
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
+                    }
+                } catch (final RuntimeException e) {
+                    final String uncleanMessage = String.format("Failed to 
close task %s cleanly. " +
+                        "Attempting to close remaining tasks before 
re-throwing:", task.id());
+                    log.error(uncleanMessage, e);
+
+                    if (task.state() != State.CLOSED) {
+                        tasksToCloseDirty.add(task);
+                    }
+
+                    taskExceptions.putIfAbsent(task.id(), e);
+                }
+            } else if ((inputPartitions = 
tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
+                task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
+                stateUpdater.add(task);

Review Comment:
   Found this issue when adding a unit test: after updating the input partitions, 
we need to add the task back to the state updater.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -656,6 +657,71 @@ boolean tryToCompleteRestoration(final long now, final 
java.util.function.Consum
         return allRunning;
     }
 
+    private void addTaskstoStateUpdater() {
+        for (final Task task : tasks.drainPendingTaskToInit()) {
+            task.initializeIfNeeded();
+            stateUpdater.add(task);
+        }
+    }
+
+    private void handleRemovedTasksFromStateUpdater() {
+        final Map<TaskId, RuntimeException> taskExceptions = new 
LinkedHashMap<>();
+        final Set<Task> tasksToCloseDirty = new 
TreeSet<>(Comparator.comparing(Task::id));
+
+        for (final Task task : stateUpdater.drainRemovedTasks()) {
+            final TaskId taskId = task.id();
+            Set<TopicPartition> inputPartitions;
+            if ((inputPartitions = 
tasks.removePendingTaskToRecycle(task.id())) != null) {
+                try {
+                    final Task newTask = task.isActive() ?
+                        convertActiveToStandby((StreamTask) task, 
inputPartitions) :
+                        convertStandbyToActive((StandbyTask) task, 
inputPartitions);
+                    newTask.initializeIfNeeded();
+                    stateUpdater.add(newTask);
+                } catch (final RuntimeException e) {
+                    final String uncleanMessage = String.format("Failed to 
recycle task %s cleanly. " +
+                        "Attempting to close remaining tasks before 
re-throwing:", taskId);
+                    log.error(uncleanMessage, e);
+
+                    if (task.state() != State.CLOSED) {
+                        tasksToCloseDirty.add(task);
+                    }
+
+                    taskExceptions.putIfAbsent(taskId, e);
+                }
+            } else if (tasks.removePendingTaskToClose(task.id())) {
+                try {
+                    task.closeClean();

Review Comment:
   We should not remove the task from `tasks` since it is not in it, and hence we 
cannot call `closeClean`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -43,9 +43,7 @@
 class Tasks {
     private final Logger log;
 
-    // TODO: change type to `StreamTask`
     private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>();
-    // TODO: change type to `StandbyTask`
     private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();

Review Comment:
   I tried to convert the types to StreamTask/StandbyTask, but realized that 
the blocker to doing so is the stub class `StateMachineTask` used in other unit 
tests, which is neither. I then looked into whether there's a better way to 
remove that stub class and instead just use a mock StreamTask/StandbyTask, but 
that's actually not very convenient. So I figured that unless we eventually 
remove `StateMachineTask`, we'd have to keep this as is. 
   
   On second thought, I think we'd probably want to remove it eventually, so 
I'll add those TODOs back.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1076,59 +1135,51 @@ private Collection<Task> tryCloseCleanActiveTasks(final 
Collection<Task> activeT
         } else {
             try {
                 
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-
-                for (final Task task : activeTaskIterable()) {
-                    try {
-                        task.postCommit(true);
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task 
" + task.id(), e);
-                        maybeWrapAndSetFirstException(firstException, e, 
task.id());
-                        tasksToCloseDirty.add(task);
-                        tasksToCloseClean.remove(task);
-                    }
-                }
-            } catch (final TimeoutException timeoutException) {
-                firstException.compareAndSet(null, timeoutException);
-
-                tasksToCloseClean.removeAll(tasksToCommit);
-                tasksToCloseDirty.addAll(tasksToCommit);
-            } catch (final TaskCorruptedException taskCorruptedException) {
-                firstException.compareAndSet(null, taskCorruptedException);
-
-                final Set<TaskId> corruptedTaskIds = 
taskCorruptedException.corruptedTasks();
-                final Set<Task> corruptedTasks = tasksToCommit
+            } catch (final RuntimeException e) {
+                log.error("Exception caught while committing tasks " + 
consumedOffsetsAndMetadataPerTask.keySet(), e);
+                // TODO: should record the task ids when handling this 
exception
+                maybeSetFirstException(false, e, firstException);
+
+                if (e instanceof TaskCorruptedException) {
+                    final TaskCorruptedException taskCorruptedException = 
(TaskCorruptedException) e;
+                    final Set<TaskId> corruptedTaskIds = 
taskCorruptedException.corruptedTasks();
+                    final Set<Task> corruptedTasks = tasksToCommit
                         .stream()
                         .filter(task -> corruptedTaskIds.contains(task.id()))
                         .collect(Collectors.toSet());
+                    tasksToCloseClean.removeAll(corruptedTasks);
+                    tasksToCloseDirty.addAll(corruptedTasks);
+                } else {
+                    // If the commit fails, everyone who participated in it 
must be closed dirty
+                    tasksToCloseClean.removeAll(tasksToCommit);
+                    tasksToCloseDirty.addAll(tasksToCommit);
+                }
+            }
 
-                tasksToCloseClean.removeAll(corruptedTasks);
-                tasksToCloseDirty.addAll(corruptedTasks);
-            } catch (final RuntimeException e) {
-                log.error("Exception caught while committing tasks during 
shutdown", e);
-                firstException.compareAndSet(null, e);
-
-                // If the commit fails, everyone who participated in it must 
be closed dirty
-                tasksToCloseClean.removeAll(tasksToCommit);
-                tasksToCloseDirty.addAll(tasksToCommit);
+            for (final Task task : activeTaskIterable()) {
+                try {
+                    task.postCommit(true);
+                } catch (final RuntimeException e) {
+                    log.error("Exception caught while post-committing task " + 
task.id(), e);
+                    maybeSetFirstException(false, maybeWrapTaskException(e, 
task.id()), firstException);
+                    tasksToCloseDirty.add(task);
+                    tasksToCloseClean.remove(task);
+                }
             }
         }
 
         for (final Task task : tasksToCloseClean) {
             try {
                 task.suspend();
-                final RuntimeException exception = 
completeTaskCloseClean(task);
-                if (exception != null) {
-                    firstException.compareAndSet(null, exception);
-                }
-            } catch (final StreamsException e) {
-                log.error("Exception caught while clean-closing task " + 
task.id(), e);
-                e.setTaskId(task.id());
-                firstException.compareAndSet(null, e);
-                tasksToCloseDirty.add(task);
+                closeTaskClean(task);

Review Comment:
   This is the minor cleanup for exception wrapping / handling.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -298,13 +298,12 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
             logPrefix
         );
 
-        final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = 
new LinkedHashMap<>();
         final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new 
HashMap<>(activeTasks);
         final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new 
HashMap<>(standbyTasks);
         final Map<Task, Set<TopicPartition>> tasksToRecycle = new HashMap<>();
         final Set<Task> tasksToCloseClean = new 
TreeSet<>(Comparator.comparing(Task::id));
 
-        tasks.purgePendingTasks(activeTasks.keySet(), standbyTasks.keySet());
+        tasks.purgePendingTasksToCreate(activeTasks.keySet(), 
standbyTasks.keySet());

Review Comment:
   You're right! Will do so.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java:
##########
@@ -71,6 +72,8 @@ public class ActiveTaskCreatorTest {
     private StateDirectory stateDirectory;
     @Mock(type = MockType.NICE)
     private ChangelogReader changeLogReader;
+    @Mock(type = MockType.NICE)
+    private ProcessorStateManager stateManager;

Review Comment:
   Sorry, I forgot to remove it; it's from other work addressing comments about 
the test coverage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to