ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r440473726



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -539,19 +537,18 @@ private void writeCheckpointIfNeed() {
     /**
      * <pre>
      * the following order must be followed:
-     *  1. checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed
+     *  1. commit/checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed

Review comment:
       ack

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
 
-        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
-        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
-        final Set<Task> tasksToRecycle = new HashSet<>();
-
         builder.addSubscribedTopicsFromAssignment(
             activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
             logPrefix
         );
 
-        // first rectify all existing tasks
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
 
-        final Set<Task> tasksToClose = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
+        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
+        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
+        final LinkedList<Task> tasksToClose = new LinkedList<>();
+        final Set<Task> tasksToRecycle = new HashSet<>();
         final Set<Task> dirtyTasks = new HashSet<>();
 
+        // first rectify all existing tasks
         for (final Task task : tasks.values()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
-                if (task.commitNeeded()) {
-                    additionalTasksForCommitting.add(task);
-                }
                 activeTasksToCreate.remove(task.id());
             } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) {
                 updateInputPartitionsAndResume(task, standbyTasks.get(task.id()));
                 standbyTasksToCreate.remove(task.id());
-                // check for tasks that were owned previously but have changed active/standby status
             } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) {
+                // check for tasks that were owned previously but have changed active/standby status
                 tasksToRecycle.add(task);
             } else {
-                try {
-                    task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format(
-                        "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
-                        task.id());
-                    log.error(uncleanMessage, e);
-                    taskCloseExceptions.put(task.id(), e);
-                    // We've already recorded the exception (which is the point of clean).
-                    // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
-                    dirtyTasks.add(task);
-                }
+                tasksToClose.add(task);
             }
         }
 
-        if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
+        for (final Task task : tasksToClose) {
             try {
-                for (final Task task : additionalTasksForCommitting) {
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                task.suspend(); // Should be a no-op for active tasks, unless we hit an exception during handleRevocation

Review comment:
       1. I think you're right: we don't need to keep track of the current `checkpoint` offsets at all and can just write the current `checkpointableOffsets` in `postCommit`.
       2. Done.





