guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r466105496
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -267,80 +266,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
// check for tasks that were owned previously but have changed active/standby status
tasksToRecycle.add(task);
} else {
- tasksToClose.add(task);
- }
- }
-
- for (final Task task : tasksToClose) {
- try {
- if (task.isActive()) {
- // Active tasks are revoked and suspended/committed during
#handleRevocation
- if (!task.state().equals(State.SUSPENDED)) {
- log.error("Active task {} should be suspended prior to
attempting to close but was in {}",
- task.id(), task.state());
- throw new IllegalStateException("Active task " +
task.id() + " should have been suspended");
- }
- } else {
- task.suspend();
- task.prepareCommit();
- task.postCommit();
- }
- completeTaskCloseClean(task);
- cleanUpTaskProducer(task, taskCloseExceptions);
- tasks.remove(task.id());
- } catch (final RuntimeException e) {
- final String uncleanMessage = String.format(
- "Failed to close task %s cleanly. Attempting to close
remaining tasks before re-throwing:",
- task.id());
- log.error(uncleanMessage, e);
- taskCloseExceptions.put(task.id(), e);
- // We've already recorded the exception (which is the point of
clean).
- // Now, we should go ahead and complete the close because a
half-closed task is no good to anyone.
- dirtyTasks.add(task);
- }
- }
-
- for (final Task oldTask : tasksToRecycle) {
- final Task newTask;
- try {
- if (oldTask.isActive()) {
- if (!oldTask.state().equals(State.SUSPENDED)) {
- // Active tasks are revoked and suspended/committed
during #handleRevocation
- log.error("Active task {} should be suspended prior to
attempting to close but was in {}",
- oldTask.id(), oldTask.state());
- throw new IllegalStateException("Active task " +
oldTask.id() + " should have been suspended");
- }
- final Set<TopicPartition> partitions =
standbyTasksToCreate.remove(oldTask.id());
- newTask =
standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask,
partitions);
- cleanUpTaskProducer(oldTask, taskCloseExceptions);
- } else {
- oldTask.suspend();
- oldTask.prepareCommit();
- oldTask.postCommit();
- final Set<TopicPartition> partitions =
activeTasksToCreate.remove(oldTask.id());
- newTask =
activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask,
partitions, mainConsumer);
- }
- tasks.remove(oldTask.id());
- addNewTask(newTask);
- } catch (final RuntimeException e) {
- final String uncleanMessage = String.format("Failed to recycle
task %s cleanly. Attempting to close remaining tasks before re-throwing:",
oldTask.id());
- log.error(uncleanMessage, e);
- taskCloseExceptions.put(oldTask.id(), e);
- dirtyTasks.add(oldTask);
+ tasksToCloseClean.add(task);
}
}
- for (final Task task : dirtyTasks) {
- closeTaskDirty(task);
- cleanUpTaskProducer(task, taskCloseExceptions);
- tasks.remove(task.id());
- }
+ // close and recycle those tasks
+ handleCloseAndRecycle(tasksToRecycle, tasksToCloseClean, tasksToCloseDirty, activeTasksToCreate, standbyTasksToCreate, taskCloseExceptions);
if (!taskCloseExceptions.isEmpty()) {
+ log.error("Hit exceptions while closing / recycling tasks: {}", taskCloseExceptions);
Review comment:
Sounds good.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org