hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969009257


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -756,12 +756,11 @@ protected boolean onJoinPrepare(Timer timer, int 
generation, String memberId) {
             joinPrepareTimer.update();
         }
 
-        final SortedSet<TopicPartition> partitionsToRevoke = 
getPartitionsToRevoke(protocol, generation, memberId);
-
+        final SortedSet<TopicPartition> eagerPartitionsToRevoke = 
eagerPartitionsToRevoke(protocol);

Review Comment:
   Hmm, thinking about this a little more since we're down to just the eager 
protocol. Since the assignment won't change until the rebalance completes, 
maybe we do not need to precompute it. In other words, maybe we can restore the 
original logic in `revokePartitions` and we can change `markPendingPartitions` 
to something like this:
   
   ```java
   private void maybeMarkPartitionsPendingRevocation() {
     if (protocol == RebalanceProtocol.EAGER) {
       // When asynchronously committing offsets prior to the revocation of a 
set of partitions, there will be a
       // window of time between when the offset commit is sent and when it 
returns and revocation completes. It is
       // possible for pending fetches for these partitions to return during 
this time, which means the application's
       // position may get ahead of the committed position prior to revocation. 
This can cause duplicate consumption.
       // To prevent this, we mark the partitions as "pending revocation," 
which stops the Fetcher from sending new
       // fetches or returning data from previous fetches to the user.
       Set<TopicPartition> partitions = subscriptions.assignedPartitions()
       log.debug("Marking assigned partitions pending for revocation: {}", 
partitions);
       subscriptions.markPendingRevocation(partitions);
     }
   }
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -859,36 +848,49 @@ private Optional<Exception> 
revokePartitions(SortedSet<TopicPartition> partition
         } else {
             switch (protocol) {
                 case EAGER:
-                    exception = 
Optional.ofNullable(invokePartitionsRevoked(partitions));
+                    exception = 
Optional.ofNullable(invokePartitionsRevoked(eagerPartitionsToRevoke));
                     subscriptions.assignFromSubscribed(Collections.emptySet());
-
                     break;
 
                 case COOPERATIVE:
-                    Set<TopicPartition> ownedPartitions = new 
HashSet<>(subscriptions.assignedPartitions());
-                    partitions.addAll(ownedPartitions.stream()
-                            .filter(tp -> 
!subscriptions.subscription().contains(tp.topic()))
-                            .collect(Collectors.toSet()));
-
-                    if (!partitions.isEmpty()) {
-                        exception = 
Optional.ofNullable(invokePartitionsRevoked(partitions));
-                        ownedPartitions.removeAll(partitions);
-                        subscriptions.assignFromSubscribed(ownedPartitions);
-                    }
+                    exception = revokeUnsubscribedPartitions();
                     break;
             }
         }
 
         return exception;
     }
 
-    private void markPartitionsUnconsumable(final Set<TopicPartition> 
partitions) {
-        // KAFKA-14196 for more detail, we pause the partition from 
consumption to prevent duplicated
-        //  data returned by the consumer poll loop.  Without pausing the 
partitions, the consumer will move forward
-        //  returning the data w/o committing them.  And the progress will be 
lost once the partition is revoked.
-        //  This only applies to autocommits, as we expect user to handle the 
offsets menually during the partition
-        //  revocation.
-        log.debug("Marking assigned partitions unconsumable: {}", partitions);
+    private Optional<Exception> revokeUnsubscribedPartitions() {
+        //For the cooperative strategy, partitions are usually revoked in 
onJoinComplete when the

Review Comment:
   nit: space after `//`
   
   Can we move this comment into the `COOPERATIVE` case in `revokePartitions`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to