guozhangwang commented on code in PR #12466:
URL: https://github.com/apache/kafka/pull/12466#discussion_r943076070

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -656,6 +657,71 @@ boolean tryToCompleteRestoration(final long now, final java.util.function.Consum
         return allRunning;
     }
 
+    private void addTaskstoStateUpdater() {
+        for (final Task task : tasks.drainPendingTaskToInit()) {
+            task.initializeIfNeeded();
+            stateUpdater.add(task);
+        }
+    }
+
+    private void handleRemovedTasksFromStateUpdater() {
+        final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
+        final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
+
+        for (final Task task : stateUpdater.drainRemovedTasks()) {
+            final TaskId taskId = task.id();
+            Set<TopicPartition> inputPartitions;
+            if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
+                try {
+                    final Task newTask = task.isActive() ?
+                        convertActiveToStandby((StreamTask) task, inputPartitions) :
+                        convertStandbyToActive((StandbyTask) task, inputPartitions);
+                    newTask.initializeIfNeeded();
+                    stateUpdater.add(newTask);
+                } catch (final RuntimeException e) {
+                    final String uncleanMessage = String.format("Failed to recycle task %s cleanly. " +
+                        "Attempting to close remaining tasks before re-throwing:", taskId);
+                    log.error(uncleanMessage, e);
+
+                    if (task.state() != State.CLOSED) {
+                        tasksToCloseDirty.add(task);
+                    }
+
+                    taskExceptions.putIfAbsent(taskId, e);
+                }
+            } else if (tasks.removePendingTaskToClose(task.id())) {
+                try {
+                    task.closeClean();
+                    if (task.isActive()) {
+                        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
+                    }
+                } catch (final RuntimeException e) {
+                    final String uncleanMessage = String.format("Failed to close task %s cleanly. " +
+                        "Attempting to close remaining tasks before re-throwing:", task.id());
+                    log.error(uncleanMessage, e);
+
+                    if (task.state() != State.CLOSED) {
+                        tasksToCloseDirty.add(task);
+                    }
+
+                    taskExceptions.putIfAbsent(task.id(), e);
+                }
+            } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
+                task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+                stateUpdater.add(task);

Review Comment:
   Found this issue when adding a unit test: after updating the input partitions, the task needs to be added back to the state updater.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -656,6 +657,71 @@ boolean tryToCompleteRestoration(final long now, final java.util.function.Consum
         return allRunning;
     }
 
+    private void addTaskstoStateUpdater() {
+        for (final Task task : tasks.drainPendingTaskToInit()) {
+            task.initializeIfNeeded();
+            stateUpdater.add(task);
+        }
+    }
+
+    private void handleRemovedTasksFromStateUpdater() {
+        final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
+        final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
+
+        for (final Task task : stateUpdater.drainRemovedTasks()) {
+            final TaskId taskId = task.id();
+            Set<TopicPartition> inputPartitions;
+            if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
+                try {
+                    final Task newTask = task.isActive() ?
+                        convertActiveToStandby((StreamTask) task, inputPartitions) :
+                        convertStandbyToActive((StandbyTask) task, inputPartitions);
+                    newTask.initializeIfNeeded();
+                    stateUpdater.add(newTask);
+                } catch (final RuntimeException e) {
+                    final String uncleanMessage = String.format("Failed to recycle task %s cleanly. " +
+                        "Attempting to close remaining tasks before re-throwing:", taskId);
+                    log.error(uncleanMessage, e);
+
+                    if (task.state() != State.CLOSED) {
+                        tasksToCloseDirty.add(task);
+                    }
+
+                    taskExceptions.putIfAbsent(taskId, e);
+                }
+            } else if (tasks.removePendingTaskToClose(task.id())) {
+                try {
+                    task.closeClean();

Review Comment:
   We should not remove it from `tasks` since it's not in it; hence we cannot call `closeClean` here.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -43,9 +43,7 @@ class Tasks {
     private final Logger log;
 
-    // TODO: change type to `StreamTask`
     private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>();
-    // TODO: change type to `StandbyTask`
     private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();

Review Comment:
   I tried to convert the types to StreamTask/StandbyTask, but realized that the blocker is the stub class `StateMachineTask` in other unit tests, which is neither. I then looked into whether there's a better way to remove that stub class and just use a mocked StreamTask/StandbyTask instead, but that's actually not very convenient. So I figured that unless we eventually remove `StateMachineTask`, we'd have to keep this as is. On second thought, I think we'd probably want to remove it eventually, so I'll add those TODOs back.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1076,59 +1135,51 @@ private Collection<Task> tryCloseCleanActiveTasks(final Collection<Task> activeT
         } else {
             try {
                 taskExecutor.commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-
-                for (final Task task : activeTaskIterable()) {
-                    try {
-                        task.postCommit(true);
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task " + task.id(), e);
-                        maybeWrapAndSetFirstException(firstException, e, task.id());
-                        tasksToCloseDirty.add(task);
-                        tasksToCloseClean.remove(task);
-                    }
-                }
-            } catch (final TimeoutException timeoutException) {
-                firstException.compareAndSet(null, timeoutException);
-
-                tasksToCloseClean.removeAll(tasksToCommit);
-                tasksToCloseDirty.addAll(tasksToCommit);
-            } catch (final TaskCorruptedException taskCorruptedException) {
-                firstException.compareAndSet(null, taskCorruptedException);
-
-                final Set<TaskId> corruptedTaskIds = taskCorruptedException.corruptedTasks();
-                final Set<Task> corruptedTasks = tasksToCommit
+            } catch (final RuntimeException e) {
+                log.error("Exception caught while committing tasks " + consumedOffsetsAndMetadataPerTask.keySet(), e);
+                // TODO: should record the task ids when handling this exception
+                maybeSetFirstException(false, e, firstException);
+
+                if (e instanceof TaskCorruptedException) {
+                    final TaskCorruptedException taskCorruptedException = (TaskCorruptedException) e;
+                    final Set<TaskId> corruptedTaskIds = taskCorruptedException.corruptedTasks();
+                    final Set<Task> corruptedTasks = tasksToCommit
                         .stream()
                         .filter(task -> corruptedTaskIds.contains(task.id()))
                         .collect(Collectors.toSet());
+                    tasksToCloseClean.removeAll(corruptedTasks);
+                    tasksToCloseDirty.addAll(corruptedTasks);
+                } else {
+                    // If the commit fails, everyone who participated in it must be closed dirty
+                    tasksToCloseClean.removeAll(tasksToCommit);
+                    tasksToCloseDirty.addAll(tasksToCommit);
+                }
+            }
-                tasksToCloseClean.removeAll(corruptedTasks);
-                tasksToCloseDirty.addAll(corruptedTasks);
-            } catch (final RuntimeException e) {
-                log.error("Exception caught while committing tasks during shutdown", e);
-                firstException.compareAndSet(null, e);
-
-                // If the commit fails, everyone who participated in it must be closed dirty
-                tasksToCloseClean.removeAll(tasksToCommit);
-                tasksToCloseDirty.addAll(tasksToCommit);
+            for (final Task task : activeTaskIterable()) {
+                try {
+                    task.postCommit(true);
+                } catch (final RuntimeException e) {
+                    log.error("Exception caught while post-committing task " + task.id(), e);
+                    maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException);
+                    tasksToCloseDirty.add(task);
+                    tasksToCloseClean.remove(task);
+                }
             }
         }
 
         for (final Task task : tasksToCloseClean) {
             try {
                 task.suspend();
-                final RuntimeException exception = completeTaskCloseClean(task);
-                if (exception != null) {
-                    firstException.compareAndSet(null, exception);
-                }
-            } catch (final StreamsException e) {
-                log.error("Exception caught while clean-closing task " + task.id(), e);
-                e.setTaskId(task.id());
-                firstException.compareAndSet(null, e);
-                tasksToCloseDirty.add(task);
+                closeTaskClean(task);

Review Comment:
   This is a minor cleanup of the exception wrapping / handling.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -298,13 +298,12 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             logPrefix
         );
 
-        final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
         final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
         final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
         final Map<Task, Set<TopicPartition>> tasksToRecycle = new HashMap<>();
         final Set<Task> tasksToCloseClean = new TreeSet<>(Comparator.comparing(Task::id));
 
-        tasks.purgePendingTasks(activeTasks.keySet(), standbyTasks.keySet());
+        tasks.purgePendingTasksToCreate(activeTasks.keySet(), standbyTasks.keySet());

Review Comment:
   You're right! Will do so.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java:
##########
@@ -71,6 +72,8 @@ public class ActiveTaskCreatorTest {
     private StateDirectory stateDirectory;
     @Mock(type = MockType.NICE)
    private ChangelogReader changeLogReader;
+    @Mock(type = MockType.NICE)
+    private ProcessorStateManager stateManager;

Review Comment:
   Sorry, forgot to remove it; it's for separate work addressing comments about test coverage.


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org