ableegoldman commented on a change in pull request #8856: URL: https://github.com/apache/kafka/pull/8856#discussion_r440473726
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ########## @@ -539,19 +537,18 @@ private void writeCheckpointIfNeed() { /** * <pre> * the following order must be followed: - * 1. checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed + * 1. commit/checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed Review comment: ack ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks, "\tExisting standby tasks: {}", activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds()); - final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks); - final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks); - final Set<Task> tasksToRecycle = new HashSet<>(); - builder.addSubscribedTopicsFromAssignment( activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()), logPrefix ); - // first rectify all existing tasks final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>(); - final Set<Task> tasksToClose = new HashSet<>(); - final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>(); - final Set<Task> additionalTasksForCommitting = new HashSet<>(); + final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks); + final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks); + final LinkedList<Task> tasksToClose = new LinkedList<>(); + final Set<Task> tasksToRecycle = new HashSet<>(); final Set<Task> dirtyTasks = new HashSet<>(); + // first rectify all existing tasks for (final Task task : tasks.values()) { if (activeTasks.containsKey(task.id()) && 
task.isActive()) { updateInputPartitionsAndResume(task, activeTasks.get(task.id())); - if (task.commitNeeded()) { - additionalTasksForCommitting.add(task); - } activeTasksToCreate.remove(task.id()); } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) { updateInputPartitionsAndResume(task, standbyTasks.get(task.id())); standbyTasksToCreate.remove(task.id()); - // check for tasks that were owned previously but have changed active/standby status } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) { + // check for tasks that were owned previously but have changed active/standby status tasksToRecycle.add(task); } else { - try { - task.suspend(); - final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(); - - tasksToClose.add(task); - if (!committableOffsets.isEmpty()) { - consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); - } - } catch (final RuntimeException e) { - final String uncleanMessage = String.format( - "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", - task.id()); - log.error(uncleanMessage, e); - taskCloseExceptions.put(task.id(), e); - // We've already recorded the exception (which is the point of clean). - // Now, we should go ahead and complete the close because a half-closed task is no good to anyone. - dirtyTasks.add(task); - } + tasksToClose.add(task); } } - if (!consumedOffsetsAndMetadataPerTask.isEmpty()) { + for (final Task task : tasksToClose) { try { - for (final Task task : additionalTasksForCommitting) { - final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(); - if (!committableOffsets.isEmpty()) { - consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); + task.suspend(); // Should be a no-op for active tasks, unless we hit an exception during handleRevocation Review comment: 1. 
I think you're right: we don't need to keep track of the current `checkpoint` offsets at all, and can just write the current `checkpointableOffsets` in `postCommit`. 2. Done. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org