hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969009257
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ##########
@@ -756,12 +756,11 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             joinPrepareTimer.update();
         }
-        final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);
-
+        final SortedSet<TopicPartition> eagerPartitionsToRevoke = eagerPartitionsToRevoke(protocol);

Review Comment:
Hmm, thinking about this a little more since we're down to just the eager protocol. Since the assignment won't change until the rebalance completes, maybe we do not need to precompute it. In other words, maybe we can restore the original logic in `revokePartitions` and change `markPendingPartitions` to something like this:

```java
private void maybeMarkPartitionsPendingRevocation() {
    if (protocol == RebalanceProtocol.EAGER) {
        // When asynchronously committing offsets prior to the revocation of a set of partitions, there will be a
        // window of time between when the offset commit is sent and when it returns and revocation completes. It is
        // possible for pending fetches for these partitions to return during this time, which means the application's
        // position may get ahead of the committed position prior to revocation. This can cause duplicate consumption.
        // To prevent this, we mark the partitions as "pending revocation," which stops the Fetcher from sending new
        // fetches or returning data from previous fetches to the user.
        Set<TopicPartition> partitions = subscriptions.assignedPartitions();
        log.debug("Marking assigned partitions pending for revocation: {}", partitions);
        subscriptions.markPendingRevocation(partitions);
    }
}
```

########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ##########
@@ -859,36 +848,49 @@ private Optional<Exception> revokePartitions(SortedSet<TopicPartition> partition
         } else {
             switch (protocol) {
                 case EAGER:
-                    exception = Optional.ofNullable(invokePartitionsRevoked(partitions));
+                    exception = Optional.ofNullable(invokePartitionsRevoked(eagerPartitionsToRevoke));
                     subscriptions.assignFromSubscribed(Collections.emptySet());
-
                     break;
                 case COOPERATIVE:
-                    Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
-                    partitions.addAll(ownedPartitions.stream()
-                            .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
-                            .collect(Collectors.toSet()));
-
-                    if (!partitions.isEmpty()) {
-                        exception = Optional.ofNullable(invokePartitionsRevoked(partitions));
-                        ownedPartitions.removeAll(partitions);
-                        subscriptions.assignFromSubscribed(ownedPartitions);
-                    }
+                    exception = revokeUnsubscribedPartitions();
                     break;
             }
         }
 
         return exception;
     }
 
-    private void markPartitionsUnconsumable(final Set<TopicPartition> partitions) {
-        // KAFKA-14196 for more detail, we pause the partition from consumption to prevent duplicated
-        // data returned by the consumer poll loop. Without pausing the partitions, the consumer will move forward
-        // returning the data w/o committing them. And the progress will be lost once the partition is revoked.
-        // This only applies to autocommits, as we expect user to handle the offsets menually during the partition
-        // revocation.
-        log.debug("Marking assigned partitions unconsumable: {}", partitions);
+    private Optional<Exception> revokeUnsubscribedPartitions() {
+        //For the cooperative strategy, partitions are usually revoked in onJoinComplete when the

Review Comment:
nit: space after `//`. Can we move this comment into the `COOPERATIVE` case in `revokePartitions`?
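To make the race described in the suggested comment concrete, here is a minimal, self-contained sketch. It is not Kafka code: `PendingRevocationSketch`, `SimpleSubscriptions`, and `fetchablePartitions()` are hypothetical stand-ins, and partitions are plain strings rather than `TopicPartition`. It only assumes the behavior the comment describes: once a partition is marked pending revocation it stops being fetchable, so no new fetches are sent and no buffered records reach the application while the async commit is in flight.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of "pending revocation"; these classes are illustrative stand-ins, not Kafka's API.
public class PendingRevocationSketch {

    enum RebalanceProtocol { EAGER, COOPERATIVE }

    static final class SimpleSubscriptions {
        // Assigned partition -> whether it is pending revocation (insertion-ordered for readable output).
        private final Map<String, Boolean> pendingRevocation = new LinkedHashMap<>();

        void assign(Set<String> partitions) {
            pendingRevocation.clear();
            for (String tp : partitions) {
                pendingRevocation.put(tp, Boolean.FALSE);
            }
        }

        Set<String> assignedPartitions() {
            return Collections.unmodifiableSet(pendingRevocation.keySet());
        }

        void markPendingRevocation(Set<String> partitions) {
            for (String tp : partitions) {
                // replace() only updates keys that exist, so unassigned partitions are ignored.
                pendingRevocation.replace(tp, Boolean.TRUE);
            }
        }

        // Partitions the (hypothetical) fetcher may send fetches for or return buffered records from.
        Set<String> fetchablePartitions() {
            Set<String> result = new LinkedHashSet<>();
            for (Map.Entry<String, Boolean> e : pendingRevocation.entrySet()) {
                if (!e.getValue()) {
                    result.add(e.getKey());
                }
            }
            return result;
        }
    }

    // Same shape as the suggested method: only EAGER revokes the whole assignment up front.
    static void maybeMarkPartitionsPendingRevocation(RebalanceProtocol protocol, SimpleSubscriptions subscriptions) {
        if (protocol == RebalanceProtocol.EAGER) {
            // Copy so we do not iterate a live view of the map we are updating.
            Set<String> partitions = new LinkedHashSet<>(subscriptions.assignedPartitions());
            subscriptions.markPendingRevocation(partitions);
        }
    }

    public static void main(String[] args) {
        SimpleSubscriptions subscriptions = new SimpleSubscriptions();
        subscriptions.assign(new LinkedHashSet<>(Arrays.asList("orders-0", "orders-1")));
        System.out.println(subscriptions.fetchablePartitions()); // [orders-0, orders-1]
        maybeMarkPartitionsPendingRevocation(RebalanceProtocol.EAGER, subscriptions);
        System.out.println(subscriptions.fetchablePartitions()); // []
    }
}
```

Under `COOPERATIVE` nothing is marked, matching the `if (protocol == RebalanceProtocol.EAGER)` guard in the suggestion; the assignment itself is untouched until the revocation actually runs.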
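The `COOPERATIVE` step being moved into `revokeUnsubscribedPartitions()` can be illustrated the same way. This sketch re-expresses the lines removed from the `COOPERATIVE` case (revoke owned partitions whose topic is no longer subscribed, keep the rest). `UnsubscribedRevocationSketch` and `TP` are hypothetical stand-ins for the coordinator and `TopicPartition`, and the rebalance-listener callback is omitted.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical sketch of the COOPERATIVE "revoke unsubscribed partitions" step; not Kafka's implementation.
public class UnsubscribedRevocationSketch {

    // Minimal stand-in for org.apache.kafka.common.TopicPartition.
    static final class TP implements Comparable<TP> {
        final String topic;
        final int partition;

        TP(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof TP)) return false;
            TP other = (TP) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override public int hashCode() { return Objects.hash(topic, partition); }

        @Override public int compareTo(TP o) {
            int c = topic.compareTo(o.topic);
            return c != 0 ? c : Integer.compare(partition, o.partition);
        }

        @Override public String toString() { return topic + "-" + partition; }
    }

    // Owned partitions whose topic is no longer in the subscription must be given up.
    static Set<TP> unsubscribedPartitions(Set<TP> owned, Set<String> subscribedTopics) {
        return owned.stream()
                .filter(tp -> !subscribedTopics.contains(tp.topic))
                .collect(Collectors.toCollection(TreeSet::new));
    }

    // What the consumer keeps after revoking those partitions (the new assignment).
    static Set<TP> retainedPartitions(Set<TP> owned, Set<TP> revoked) {
        Set<TP> retained = new TreeSet<>(owned);
        retained.removeAll(revoked);
        return retained;
    }

    public static void main(String[] args) {
        Set<TP> owned = new HashSet<>(Arrays.asList(new TP("orders", 0), new TP("orders", 1), new TP("audit", 0)));
        Set<String> subscribed = Collections.singleton("orders");
        Set<TP> revoked = unsubscribedPartitions(owned, subscribed);
        System.out.println("revoke: " + revoked);                           // revoke: [audit-0]
        System.out.println("retain: " + retainedPartitions(owned, revoked)); // retain: [orders-0, orders-1]
    }
}
```

If the filtered set is empty, the removed code skipped both the callback and the reassignment, which is why `retainedPartitions` is only meaningful when `revoked` is non-empty.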