mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441056149



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -465,44 +429,82 @@ boolean tryToCompleteRestoration() {
     }
 
     /**
+     * Handle the revoked partitions and prepare for closing the associated 
tasks in {@link #handleAssignment(Map, Map)}
+     * We should commit the revoked tasks now as we will not officially own 
them anymore when {@link #handleAssignment(Map, Map)}
+     * is called. Note that only active task partitions are passed in from the 
rebalance listener, so we only need to
+     * consider/commit active tasks here
+     *
+     * If eos-beta is used, we must commit ALL tasks. Otherwise, we can just 
commit those (active) tasks which are revoked
+     *
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
-        final Set<TopicPartition> remainingPartitions = new 
HashSet<>(revokedPartitions);
+        final Set<TopicPartition> remainingRevokedPartitions = new 
HashSet<>(revokedPartitions);
 
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        for (final Task task : tasks.values()) {
-            if (remainingPartitions.containsAll(task.inputPartitions())) {
-                task.suspend();
-                final Map<TopicPartition, OffsetAndMetadata> 
committableOffsets = task.prepareCommit();
+        final Set<Task> tasksToCommit = new HashSet<>();
+        final Set<Task> additionalTasksForCommitting = new HashSet<>();
 
-                if (!committableOffsets.isEmpty()) {
-                    consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
+        final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
+        for (final Task task : activeTaskIterable()) {
+            if 
(remainingRevokedPartitions.containsAll(task.inputPartitions())) {
+                try {
+                    task.suspend();
+                    if (task.commitNeeded()) {
+                        tasksToCommit.add(task);
+                    }
+                } catch (final RuntimeException e) {
+                    log.error("Caught the following exception while trying to 
suspend revoked task " + task.id(), e);
+                    firstException.compareAndSet(null, new 
StreamsException("Failed to suspend " + task.id(), e));
                 }
-            } else if (task.isActive() && task.commitNeeded()) {
-                final Map<TopicPartition, OffsetAndMetadata> 
committableOffsets = task.prepareCommit();
+            } else if (task.commitNeeded()) {
+                additionalTasksForCommitting.add(task);
+            }
+            remainingRevokedPartitions.removeAll(task.inputPartitions());
+        }
 
-                if (!committableOffsets.isEmpty()) {
-                    consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
-                }
+        if (!remainingRevokedPartitions.isEmpty()) {
+            log.warn("The following partitions {} are missing from the task 
partitions. It could potentially " +
+                         "due to race condition of consumer detecting the 
heartbeat failure, or the tasks " +
+                         "have been cleaned up by the handleAssignment 
callback.", remainingRevokedPartitions);
+        }
+
+        final RuntimeException suspendException = firstException.get();
+        if (suspendException != null) {
+            throw suspendException;
+        }
+
+        // If using eos-beta, if we must commit any task then we must commit 
all of them
+        // TODO: when KAFKA-9450 is done this will be less expensive, and we 
can simplify by always committing everything
+        if (processingMode ==  EXACTLY_ONCE_BETA && !tasksToCommit.isEmpty()) {
+            tasksToCommit.addAll(additionalTasksForCommitting);
+        }
+
+        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
+        for (final Task task : tasksToCommit) {
+            final Map<TopicPartition, OffsetAndMetadata> committableOffsets = 
task.prepareCommit();
+            if (!committableOffsets.isEmpty()) {
+                consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
+            } else {
+                log.warn("Task {} claimed to need a commit but had no 
committable consumed offsets", task.id());

Review comment:
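       Should this really be a warning? A wall-clock-time punctuation could have set `commitNeeded` to `true` without the task having consumed any new records, so an empty set of committable offsets is a legitimate case here; maybe log at DEBUG instead?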




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to