ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441059865
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -465,44 +429,82 @@ boolean tryToCompleteRestoration() {
}
/**
+ * Handle the revoked partitions and prepare for closing the associated
tasks in {@link #handleAssignment(Map, Map)}
+ * We should commit the revoked tasks now as we will not officially own
them anymore when {@link #handleAssignment(Map, Map)}
+ * is called. Note that only active task partitions are passed in from the
rebalance listener, so we only need to
+ * consider/commit active tasks here
+ *
+ * If eos-beta is used, we must commit ALL tasks. Otherwise, we can just
commit those (active) tasks which are revoked
+ *
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
- final Set<TopicPartition> remainingPartitions = new
HashSet<>(revokedPartitions);
+ final Set<TopicPartition> remainingRevokedPartitions = new
HashSet<>(revokedPartitions);
- final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>>
consumedOffsetsAndMetadataPerTask = new HashMap<>();
- for (final Task task : tasks.values()) {
- if (remainingPartitions.containsAll(task.inputPartitions())) {
- task.suspend();
- final Map<TopicPartition, OffsetAndMetadata>
committableOffsets = task.prepareCommit();
+ final Set<Task> tasksToCommit = new HashSet<>();
+ final Set<Task> additionalTasksForCommitting = new HashSet<>();
- if (!committableOffsets.isEmpty()) {
- consumedOffsetsAndMetadataPerTask.put(task.id(),
committableOffsets);
+ final AtomicReference<RuntimeException> firstException = new
AtomicReference<>(null);
+ for (final Task task : activeTaskIterable()) {
+ if
(remainingRevokedPartitions.containsAll(task.inputPartitions())) {
+ try {
+ task.suspend();
+ if (task.commitNeeded()) {
+ tasksToCommit.add(task);
+ }
+ } catch (final RuntimeException e) {
+ log.error("Caught the following exception while trying to
suspend revoked task " + task.id(), e);
+ firstException.compareAndSet(null, new
StreamsException("Failed to suspend " + task.id(), e));
}
- } else if (task.isActive() && task.commitNeeded()) {
- final Map<TopicPartition, OffsetAndMetadata>
committableOffsets = task.prepareCommit();
+ } else if (task.commitNeeded()) {
+ additionalTasksForCommitting.add(task);
+ }
+ remainingRevokedPartitions.removeAll(task.inputPartitions());
+ }
- if (!committableOffsets.isEmpty()) {
- consumedOffsetsAndMetadataPerTask.put(task.id(),
committableOffsets);
- }
+ if (!remainingRevokedPartitions.isEmpty()) {
+ log.warn("The following partitions {} are missing from the task
partitions. It could potentially " +
+ "due to race condition of consumer detecting the
heartbeat failure, or the tasks " +
+ "have been cleaned up by the handleAssignment
callback.", remainingRevokedPartitions);
+ }
+
+ final RuntimeException suspendException = firstException.get();
+ if (suspendException != null) {
+ throw suspendException;
+ }
+
+ // If using eos-beta, if we must commit any task then we must commit
all of them
+ // TODO: when KAFKA-9450 is done this will be less expensive, and we
can simplify by always committing everything
+ if (processingMode == EXACTLY_ONCE_BETA && !tasksToCommit.isEmpty()) {
+ tasksToCommit.addAll(additionalTasksForCommitting);
+ }
+
+ final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>>
consumedOffsetsAndMetadataPerTask = new HashMap<>();
+ for (final Task task : tasksToCommit) {
+ final Map<TopicPartition, OffsetAndMetadata> committableOffsets =
task.prepareCommit();
+ if (!committableOffsets.isEmpty()) {
+ consumedOffsetsAndMetadataPerTask.put(task.id(),
committableOffsets);
+ } else {
+ log.warn("Task {} claimed to need a commit but had no
committable consumed offsets", task.id());
Review comment:
Oh, good point. I'll remove this.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]