guozhangwang commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439851616
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -539,19 +537,18 @@ private void writeCheckpointIfNeed() {
     /**
      * <pre>
      * the following order must be followed:
-     *  1. checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed
+     *  1. commit/checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed

Review comment:
   Seems we would never commit and checkpoint the state manager any more.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -714,13 +696,20 @@ void shutdown(final boolean clean) {
             }
         }

-        if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+        try {
+            if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+            }
+            for (final TaskId taskId : consumedOffsetsAndMetadataPerTask.keySet()) {
+                final Task task = tasks.get(taskId);
+                task.postCommit();
+            }
+        } catch (final RuntimeException e) {
+            firstException.compareAndSet(null, e);

Review comment:
   Yeah, I think if the actual `consumer.commit` call failed, then we should not trigger `postCommit` for any task. As for `postCommit` itself, I think it should never fail: we swallow any IOException that happens, because for non-EOS that is fine, and for EOS we would bootstrap from scratch.
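To illustrate the ordering discussed in the review comment above, here is a minimal, self-contained sketch (hypothetical names such as `commitAndPostCommit`; this is not the actual Kafka Streams code): a single commit call covers all tasks, `postCommit` only runs if that commit succeeded, and the first exception is recorded rather than rethrown, mirroring the `firstException.compareAndSet(null, e)` pattern in the diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class CommitThenPostCommit {

    interface Task {
        void postCommit();
    }

    // Commits all offsets in one call, then runs postCommit per task.
    // If the commit throws, postCommit is not triggered for any task.
    static RuntimeException commitAndPostCommit(final Map<String, Task> tasks,
                                                final Runnable commitAllOffsets) {
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
        try {
            commitAllOffsets.run();       // one commit covering all tasks
            for (final Task task : tasks.values()) {
                task.postCommit();        // only reached if the commit succeeded
            }
        } catch (final RuntimeException e) {
            firstException.compareAndSet(null, e);
        }
        return firstException.get();
    }

    public static void main(final String[] args) {
        final StringBuilder log = new StringBuilder();
        final Map<String, Task> tasks = new LinkedHashMap<>();
        tasks.put("0_0", () -> log.append("post(0_0);"));
        tasks.put("0_1", () -> log.append("post(0_1);"));

        // Successful commit: postCommit runs for every task.
        RuntimeException err = commitAndPostCommit(tasks, () -> log.append("commit;"));
        if (err != null || !log.toString().equals("commit;post(0_0);post(0_1);")) {
            throw new AssertionError("unexpected call order: " + log);
        }

        // Failed commit: postCommit runs for no task at all.
        log.setLength(0);
        err = commitAndPostCommit(tasks, () -> { throw new RuntimeException("commit failed"); });
        if (err == null || log.length() != 0) {
            throw new AssertionError("postCommit must not run after a failed commit");
        }
        System.out.println("ok");
    }
}
```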
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                       "\tExisting standby tasks: {}",
                   activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());

-        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
-        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
-        final Set<Task> tasksToRecycle = new HashSet<>();
-
         builder.addSubscribedTopicsFromAssignment(
             activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
             logPrefix
         );

-        // first rectify all existing tasks
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
-        final Set<Task> tasksToClose = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
+        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
+        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
+        final LinkedList<Task> tasksToClose = new LinkedList<>();
+        final Set<Task> tasksToRecycle = new HashSet<>();
         final Set<Task> dirtyTasks = new HashSet<>();

+        // first rectify all existing tasks
         for (final Task task : tasks.values()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
-                if (task.commitNeeded()) {
-                    additionalTasksForCommitting.add(task);
-                }
                 activeTasksToCreate.remove(task.id());
             } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) {
                 updateInputPartitionsAndResume(task, standbyTasks.get(task.id()));
                 standbyTasksToCreate.remove(task.id());
-            // check for tasks that were owned previously but have changed active/standby status
             } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) {
+                // check for tasks that were owned previously but have changed active/standby status
                 tasksToRecycle.add(task);
             } else {
-                try {
-                    task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format(
-                        "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
-                        task.id());
-                    log.error(uncleanMessage, e);
-                    taskCloseExceptions.put(task.id(), e);
-                    // We've already recorded the exception (which is the point of clean).
-                    // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
-                    dirtyTasks.add(task);
-                }
+                tasksToClose.add(task);
             }
         }

-        if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
+        for (final Task task : tasksToClose) {
             try {
-                for (final Task task : additionalTasksForCommitting) {
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                task.suspend(); // Should be a no-op for active tasks, unless we hit an exception during handleRevocation

Review comment:
   I re-read the current code structure and have some questions:

   1. We collect the checkpoint from `prepareCommit` and check whether it is non-null in `postCommit`, but the actual checkpoint value itself is always collectable after the commit, so all we really need to know is whether a checkpoint file has to be written or not. Previously this had to be decided up front, since we might transit the task state in between; but now, from the source code, it seems we would only ever call `prepareCommit`/`postCommit` before suspend/close, so this is no longer required. I.e., we could decide whether we need to checkpoint, then collect the checkpoint map and write the file if needed, all in a single call. Is that right?

   2. I agree with you that it is cleaner to make sure that in `handleRevocation` we still transit the revoked partitions' corresponding tasks to suspended, even if some of their commit calls failed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
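As an addendum, the consolidation suggested in point 1 of the last review comment can be sketched as follows. This is a hedged illustration only, with hypothetical names (`StateManager`, `maybeCheckpoint`), not the real Kafka Streams state-manager API: rather than pre-collecting a checkpoint map in `prepareCommit` and null-checking it in `postCommit`, a single post-commit call decides whether a checkpoint is needed, collects the offsets, and "writes the file".

```java
import java.util.HashMap;
import java.util.Map;

public class CheckpointOnce {

    static final class StateManager {
        private final Map<String, Long> changelogOffsets = new HashMap<>();
        private Map<String, Long> lastWritten = null;  // stands in for the checkpoint file

        void update(final String partition, final long offset) {
            changelogOffsets.put(partition, offset);
        }

        // Single post-commit entry point: the offsets are always collectable
        // here, so no pre-collected checkpoint map needs to be threaded from
        // a prepare phase. Writes only when forced or when state is dirty.
        void maybeCheckpoint(final boolean enforceCheckpoint) {
            if (enforceCheckpoint || !changelogOffsets.equals(lastWritten)) {
                lastWritten = new HashMap<>(changelogOffsets);  // "write the file"
            }
        }

        Map<String, Long> written() {
            return lastWritten;
        }
    }

    public static void main(final String[] args) {
        final StateManager stateMgr = new StateManager();
        stateMgr.update("topic-0", 42L);
        stateMgr.maybeCheckpoint(false);   // dirty state -> checkpoint is written
        if (stateMgr.written() == null || !Long.valueOf(42L).equals(stateMgr.written().get("topic-0"))) {
            throw new AssertionError("checkpoint not written");
        }
        System.out.println("ok");
    }
}
```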