mjsax commented on a change in pull request #8856: URL: https://github.com/apache/kafka/pull/8856#discussion_r441056149
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -465,44 +429,82 @@ boolean tryToCompleteRestoration() { } /** + * Handle the revoked partitions and prepare for closing the associated tasks in {@link #handleAssignment(Map, Map)} + * We should commit the revoked tasks now as we will not officially own them anymore when {@link #handleAssignment(Map, Map)} + * is called. Note that only active task partitions are passed in from the rebalance listener, so we only need to + * consider/commit active tasks here + * + * If eos-beta is used, we must commit ALL tasks. Otherwise, we can just commit those (active) tasks which are revoked + * * @throws TaskMigratedException if the task producer got fenced (EOS only) */ void handleRevocation(final Collection<TopicPartition> revokedPartitions) { - final Set<TopicPartition> remainingPartitions = new HashSet<>(revokedPartitions); + final Set<TopicPartition> remainingRevokedPartitions = new HashSet<>(revokedPartitions); - final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>(); - for (final Task task : tasks.values()) { - if (remainingPartitions.containsAll(task.inputPartitions())) { - task.suspend(); - final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(); + final Set<Task> tasksToCommit = new HashSet<>(); + final Set<Task> additionalTasksForCommitting = new HashSet<>(); - if (!committableOffsets.isEmpty()) { - consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); + final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); + for (final Task task : activeTaskIterable()) { + if (remainingRevokedPartitions.containsAll(task.inputPartitions())) { + try { + task.suspend(); + if (task.commitNeeded()) { + tasksToCommit.add(task); + } + } catch (final RuntimeException e) { + log.error("Caught the following exception while trying to suspend 
revoked task " + task.id(), e); + firstException.compareAndSet(null, new StreamsException("Failed to suspend " + task.id(), e)); } - } else if (task.isActive() && task.commitNeeded()) { - final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(); + } else if (task.commitNeeded()) { + additionalTasksForCommitting.add(task); + } + remainingRevokedPartitions.removeAll(task.inputPartitions()); + } - if (!committableOffsets.isEmpty()) { - consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); - } + if (!remainingRevokedPartitions.isEmpty()) { + log.warn("The following partitions {} are missing from the task partitions. It could potentially " + + "due to race condition of consumer detecting the heartbeat failure, or the tasks " + + "have been cleaned up by the handleAssignment callback.", remainingRevokedPartitions); + } + + final RuntimeException suspendException = firstException.get(); + if (suspendException != null) { + throw suspendException; + } + + // If using eos-beta, if we must commit any task then we must commit all of them + // TODO: when KAFKA-9450 is done this will be less expensive, and we can simplify by always committing everything + if (processingMode == EXACTLY_ONCE_BETA && !tasksToCommit.isEmpty()) { + tasksToCommit.addAll(additionalTasksForCommitting); + } + + final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>(); + for (final Task task : tasksToCommit) { + final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(); + if (!committableOffsets.isEmpty()) { + consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); + } else { + log.warn("Task {} claimed to need a commit but had no committable consumed offsets", task.id()); Review comment: Is this necessarily a warning? A wall-clock-time punctuation could have set `commitNeeded` to `true` even when the task has no committable consumed offsets, so this case may be legitimate rather than suspicious.
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org