showuon commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r969115062
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java: ########## @@ -2283,6 +2322,37 @@ public void testRestOffsetsAuthorizationFailure() { assertEquals(5, subscriptions.position(tp0).offset); } + @Test + public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() { + buildFetcher(); + assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 100); + subscriptions.seek(tp0, 100); + subscriptions.seek(tp0, 100); Review Comment: Any reason we seek tp0 to offset 100 three times? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java: ########## @@ -769,6 +773,7 @@ private static class TopicPartitionState { private Long logStartOffset; // the log start offset private Long lastStableOffset; private boolean paused; // whether this partition has been paused by the user + private boolean consumable; Review Comment: I like the name: `pendingRevocation`, too. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java: ########## @@ -2283,6 +2322,37 @@ public void testRestOffsetsAuthorizationFailure() { assertEquals(5, subscriptions.position(tp0).offset); } + @Test + public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() { + buildFetcher(); + assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 100); + subscriptions.seek(tp0, 100); + subscriptions.seek(tp0, 100); + assertEquals(100, subscriptions.position(tp0).offset); + + assertTrue(subscriptions.isFetchable(tp0)); // because tp is paused Review Comment: I don't understand the comment here. Where do we pause tp0? And it is fetchable now, right? 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ########## @@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { } } + Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId); + + isLeader = false; + subscriptions.resetGroupSubscription(); + joinPrepareTimer = null; + autoCommitOffsetRequestFuture = null; + timer.update(); + + if (exception.isPresent()) { + throw new KafkaException("User rebalance callback throws an error", exception.get()); + } + return true; + } + + private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) { + SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR); + if (generation == Generation.NO_GENERATION.generationId || + memberId.equals(Generation.NO_GENERATION.memberId)) { + partitions.addAll(subscriptions.assignedPartitions()); + return partitions; + } + + switch (protocol) { + case EAGER: + partitions.addAll(subscriptions.assignedPartitions()); + break; + + case COOPERATIVE: + // Delay the partition revocation because we don't revoke the already owned partitions Review Comment: Agreed that we should handle the cooperative issue separately. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org