[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969124759


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 }
 }
 
+Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+isLeader = false;
+subscriptions.resetGroupSubscription();
+joinPrepareTimer = null;
+autoCommitOffsetRequestFuture = null;
+timer.update();
+
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an error", exception.get());
+}
+return true;
+}
+
+private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+if (generation == Generation.NO_GENERATION.generationId ||
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+switch (protocol) {
+case EAGER:
+partitions.addAll(subscriptions.assignedPartitions());
+break;
+
+case COOPERATIVE:
+// Delay the partition revocation because we don't revoke the already owned partitions

Review Comment:
   I filed this issue: https://issues.apache.org/jira/browse/KAFKA-14224.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969035562


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -2283,6 +2307,37 @@ public void testRestOffsetsAuthorizationFailure() {
 assertEquals(5, subscriptions.position(tp0).offset);
 }
 
+@Test
+public void testPendingRevacationPartitionFetching() {

Review Comment:
   nit: Revocation is misspelled
   
   I did not find the name very clear. It looks like the main difference between this and `testFetchingPendingPartitions` is that this method tests that the pending state gets reset after reassignment? Perhaps the name should reflect that?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -272,6 +272,30 @@ public void testFetchNormal() {
 }
 }
 
+@Test
+public void testFetchingPendingPartitions() {
+buildFetcher();
+
+assignFromUser(singleton(tp0));
+subscriptions.seek(tp0, 0);
+
+// normal fetch
+assertEquals(1, fetcher.sendFetches());
+client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0));
+consumerClient.poll(time.timer(0));
+assertTrue(fetcher.hasCompletedFetches());
+fetchedRecords();
+assertEquals(4L, subscriptions.position(tp0).offset); // this is the next fetching position
+
+// mark partition unfetchable
+subscriptions.markPendingRevocation(singleton(tp0));

Review Comment:
   Another scenario is that we already have the fetch in flight when we mark pending revocation. Can we test that as well?
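To make the in-flight scenario concrete, here is a minimal sketch of the behavior such a test would pin down. Everything in it is hypothetical: `InFlightFetchModel`, its string partition names, and its methods are a simplified stand-in, not Kafka's `Fetcher` or `SubscriptionState`. The assumption it encodes is that fetchability is re-checked when a response completes, so records for a partition marked pending revocation after the request was sent are dropped and the position does not advance.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model; not Kafka's Fetcher or SubscriptionState.
class InFlightFetchModel {
    private final Map<String, Long> positions = new HashMap<>();
    private final Set<String> pendingRevocation = new HashSet<>();
    private final Set<String> inFlight = new HashSet<>();

    void assign(String tp, long position) {
        positions.put(tp, position);
    }

    void markPendingRevocation(String tp) {
        pendingRevocation.add(tp);
    }

    boolean isFetchable(String tp) {
        return positions.containsKey(tp) && !pendingRevocation.contains(tp);
    }

    // Sends a fetch only if the partition is fetchable and has none in flight.
    boolean sendFetch(String tp) {
        if (!isFetchable(tp) || inFlight.contains(tp)) {
            return false;
        }
        inFlight.add(tp);
        return true;
    }

    // Completes an in-flight fetch. Fetchability is checked again here, so data
    // for a partition marked pending revocation while the request was in flight
    // is discarded and its position does not move.
    List<Long> completeFetch(String tp, List<Long> offsets) {
        inFlight.remove(tp);
        List<Long> returned = new ArrayList<>();
        if (!isFetchable(tp)) {
            return returned;
        }
        for (long offset : offsets) {
            if (offset >= positions.get(tp)) {
                returned.add(offset);
                positions.put(tp, offset + 1);
            }
        }
        return returned;
    }

    long position(String tp) {
        return positions.get(tp);
    }
}
```

A test for the scenario would follow the same order: send the fetch, mark the partition, complete the response, then assert that nothing is returned, the position is unchanged, and no new fetch is sent.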



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java:
##
@@ -256,6 +256,15 @@ public void partitionPause() {
 assertTrue(state.isFetchable(tp0));
 }
 
+@Test
+public void testMarkingPartitionPending() {
+state.assignFromUser(singleton(tp0));
+state.seek(tp0, 100);
+assertTrue(state.isFetchable(tp0));
+state.markPendingRevocation(singleton(tp0));
+assertFalse(state.isFetchable(tp0));

Review Comment:
   Perhaps we can also assert `isPaused` is false?
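The extra assertion checks that pending revocation is a state of its own rather than a side effect on the user-visible pause flag. A minimal sketch under that assumption; `PartitionStateSketch` is a hypothetical class, and unlike the real `isFetchable` it ignores whether the partition has a valid position:

```java
// Hypothetical per-partition state; not Kafka's TopicPartitionState.
class PartitionStateSketch {
    private boolean paused;            // changed only by user-facing pause/resume
    private boolean pendingRevocation; // changed only by internal coordinator logic

    void pause() {
        paused = true;
    }

    void markPendingRevocation() {
        pendingRevocation = true;
    }

    boolean isPaused() {
        return paused;
    }

    // Either flag makes the partition unfetchable, but they stay independent.
    boolean isFetchable() {
        return !paused && !pendingRevocation;
    }
}
```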






[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969021668


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -853,12 +850,38 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 autoCommitOffsetRequestFuture = null;
 timer.update();
 
-if (exception != null) {
-throw new KafkaException("User rebalance callback throws an error", exception);
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an error", exception.get());
 }
 return true;
 }
 
+private SortedSet<TopicPartition> eagerPartitionsToRevoke(RebalanceProtocol protocol) {
+SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+if (protocol != RebalanceProtocol.EAGER) {
+return partitions;
+}
+
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+private void markPendingPartitions() {

Review Comment:
   How about `maybeMarkPartitionsPendingRevocation`? Otherwise it's a little unclear what exactly is pending.






[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969021352


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -853,12 +850,38 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 autoCommitOffsetRequestFuture = null;
 timer.update();
 
-if (exception != null) {
-throw new KafkaException("User rebalance callback throws an error", exception);
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an error", exception.get());
 }
 return true;
 }
 
+private SortedSet<TopicPartition> eagerPartitionsToRevoke(RebalanceProtocol protocol) {

Review Comment:
   nit: not using this anymore






[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969020626


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##
@@ -935,6 +941,9 @@ private boolean isPaused() {
 return paused;
 }
 
+private boolean isPendingRevocation() {

Review Comment:
   nit: do we need this? Is it used anywhere?






[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969009257


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -756,12 +756,11 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 joinPrepareTimer.update();
 }
 
-final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);
-
+final SortedSet<TopicPartition> eagerPartitionsToRevoke = eagerPartitionsToRevoke(protocol);

Review Comment:
   Hmm, thinking about this a little more since we're down to just the eager protocol. Since the assignment won't change until the rebalance completes, maybe we do not need to precompute it. In other words, maybe we can restore the original logic in `revokePartitions` and we can change `markPendingPartitions` to something like this:
   
   ```java
   private void maybeMarkPartitionsPendingRevocation() {
       if (protocol == RebalanceProtocol.EAGER) {
           // When asynchronously committing offsets prior to the revocation of a set of partitions, there will be a
           // window of time between when the offset commit is sent and when it returns and revocation completes. It is
           // possible for pending fetches for these partitions to return during this time, which means the application's
           // position may get ahead of the committed position prior to revocation. This can cause duplicate consumption.
           // To prevent this, we mark the partitions as "pending revocation," which stops the Fetcher from sending new
           // fetches or returning data from previous fetches to the user.
           Set<TopicPartition> partitions = subscriptions.assignedPartitions();
           log.debug("Marking assigned partitions pending for revocation: {}", partitions);
           subscriptions.markPendingRevocation(partitions);
       }
   }
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -859,36 +848,49 @@ private Optional<Exception> revokePartitions(SortedSet<TopicPartition> partition
 } else {
 switch (protocol) {
 case EAGER:
-exception = Optional.ofNullable(invokePartitionsRevoked(partitions));
+exception = Optional.ofNullable(invokePartitionsRevoked(eagerPartitionsToRevoke));
 subscriptions.assignFromSubscribed(Collections.emptySet());
-
 break;
 
 case COOPERATIVE:
-Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
-partitions.addAll(ownedPartitions.stream()
-.filter(tp -> !subscriptions.subscription().contains(tp.topic()))
-.collect(Collectors.toSet()));
-
-if (!partitions.isEmpty()) {
-exception = Optional.ofNullable(invokePartitionsRevoked(partitions));
-ownedPartitions.removeAll(partitions);
-subscriptions.assignFromSubscribed(ownedPartitions);
-}
+exception = revokeUnsubscribedPartitions();
 break;
 }
 }
 
 return exception;
 }
 
-private void markPartitionsUnconsumable(final Set<TopicPartition> partitions) {
-// KAFKA-14196 for more detail, we pause the partition from consumption to prevent duplicated
-//  data returned by the consumer poll loop.  Without pausing the partitions, the consumer will move forward
-//  returning the data w/o committing them.  And the progress will be lost once the partition is revoked.
-//  This only applies to autocommits, as we expect user to handle the offsets manually during the partition
-//  revocation.
-log.debug("Marking assigned partitions unconsumable: {}", partitions);
+private Optional<Exception> revokeUnsubscribedPartitions() {
+//For the cooperative strategy, partitions are usually revoked in onJoinComplete when the

Review Comment:
   nit: space after `//`
   
   Can we move this comment into the `COOPERATIVE` case in `revokePartitions`?






[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968965173


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 }
 }
 
+Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+isLeader = false;
+subscriptions.resetGroupSubscription();
+joinPrepareTimer = null;
+autoCommitOffsetRequestFuture = null;
+timer.update();
+
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an error", exception.get());
+}
+return true;
+}
+
+private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+if (generation == Generation.NO_GENERATION.generationId ||
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+switch (protocol) {
+case EAGER:
+partitions.addAll(subscriptions.assignedPartitions());
+break;
+
+case COOPERATIVE:
+// Delay the partition revocation because we don't revoke the already owned partitions

Review Comment:
   I was looking into the cooperative code path. We revoke the partitions in `onJoinComplete`, so that made me wonder why we don't have the same issue there. In fact, there is no additional offset commit in the current logic, which makes me think that the cooperative logic would already be more prone to duplicate consumption. We don't need to fix this here since it seems to be a pre-existing issue, but I am wondering if the failing system tests also cover cooperative assignment?






[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968922937


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 }
 }
 
+Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+isLeader = false;
+subscriptions.resetGroupSubscription();
+joinPrepareTimer = null;
+autoCommitOffsetRequestFuture = null;
+timer.update();
+
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an error", exception.get());
+}
+return true;
+}
+
+private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+if (generation == Generation.NO_GENERATION.generationId ||
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+// Revoke all partitions
+if (protocol == RebalanceProtocol.EAGER) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+// only revoke those partitions that are not in the subscription any more.
+if (protocol == RebalanceProtocol.COOPERATIVE) {
+// Delay the partition revocation because we don't revoke the already owned partitions
+return partitions;
+}
+
+log.warn("Invalid protocol: {}. No partition will be revoked.", protocol);
+return partitions;
+}
+
+private Optional<Exception> revokePartitions(SortedSet<TopicPartition> partitions, int generation, String memberId) {
+
 // the generation / member-id can possibly be reset by the heartbeat thread
 // upon getting errors or heartbeat timeouts; in this case whatever is previously
 // owned partitions would be lost, we should trigger the callback and cleanup the assignment;
 // otherwise we can proceed normally and revoke the partitions depending on the protocol,
 // and in that case we should only change the assignment AFTER the revoke callback is triggered
 // so that users can still access the previously owned partitions to commit offsets etc.
-Exception exception = null;
-final SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR);
+Optional<Exception> exception = Optional.empty();
 if (generation == Generation.NO_GENERATION.generationId ||
-memberId.equals(Generation.NO_GENERATION.memberId)) {
-revokedPartitions.addAll(subscriptions.assignedPartitions());
-
-if (!revokedPartitions.isEmpty()) {
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+if (!partitions.isEmpty()) {
 log.info("Giving away all assigned partitions as lost since generation/memberID has been reset," +
-"indicating that consumer is in old state or no longer part of the group");
-exception = invokePartitionsLost(revokedPartitions);
-
+"indicating that consumer is in old state or no longer part of the group");
+exception = Optional.ofNullable(invokePartitionsLost(partitions));
 subscriptions.assignFromSubscribed(Collections.emptySet());
 }
 } else {
 switch (protocol) {
 case EAGER:
-// revoke all partitions
-revokedPartitions.addAll(subscriptions.assignedPartitions());
-exception = invokePartitionsRevoked(revokedPartitions);
-
+exception = Optional.ofNullable(invokePartitionsRevoked(partitions));
 subscriptions.assignFromSubscribed(Collections.emptySet());
 
 break;
 
 case COOPERATIVE:
-// only revoke those partitions that are not in the subscription any more.
 Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
-revokedPartitions.addAll(ownedPartitions.stream()
-.filter(tp -> !subscriptions.subscription().contains(tp.topic()))
-.collect(Collectors.toSet()));
-
-if (!revokedPartitions.isEmpty()) {
-exception = invokePartitionsRevoked(revokedPartitions);
+partitions.addAll(ownedPartitions.stream()
+.filter(tp -> !subscriptions.subscription().contains(tp.topic()))
+.collect(Collectors.toSet()));
 
-ownedPartitions.removeAll(revokedPartiti

[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968897177


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 }
 }
 
+Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+isLeader = false;
+subscriptions.resetGroupSubscription();
+joinPrepareTimer = null;
+autoCommitOffsetRequestFuture = null;
+timer.update();
+
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an error", exception.get());
+}
+return true;
+}
+
+private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+if (generation == Generation.NO_GENERATION.generationId ||
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+// Revoke all partitions
+if (protocol == RebalanceProtocol.EAGER) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+// only revoke those partitions that are not in the subscription any more.
+if (protocol == RebalanceProtocol.COOPERATIVE) {
+// Delay the partition revocation because we don't revoke the already owned partitions
+return partitions;
+}
+
+log.warn("Invalid protocol: {}. No partition will be revoked.", protocol);

Review Comment:
   I think this code is dead? Why don't we use a `switch` like we had before? Then the compiler can help us ensure we handle new cases.
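One way to get a hard compiler check is a switch expression over the enum: since Java 14, `javac` rejects a switch expression without a `default` that does not cover every enum constant, so an unreachable "invalid protocol" fallback is unnecessary. A classic `switch` statement does not get this check from `javac` itself, only from IDE inspections or static analysis, which matters if the code must still compile on older Java versions. The enum `Protocol` and class `RevocationPlanner` below are hypothetical stand-ins for `RebalanceProtocol` and the coordinator, using plain strings for partitions:

```java
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical stand-in for the consumer's rebalance protocol enum.
enum Protocol { EAGER, COOPERATIVE }

class RevocationPlanner {
    // Partitions to revoke in the join-prepare phase. Because this is a switch
    // expression with no default branch, javac (Java 14+) refuses to compile it
    // if a constant is added to Protocol but not handled here.
    static SortedSet<String> partitionsToRevoke(Protocol protocol, Set<String> assigned) {
        return switch (protocol) {
            case EAGER -> new TreeSet<String>(assigned); // revoke everything before rejoining
            case COOPERATIVE -> new TreeSet<String>();   // owned partitions are not revoked here
        };
    }
}
```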






[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968754388


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##
@@ -769,6 +773,7 @@ private static class TopicPartitionState {
 private Long logStartOffset; // the log start offset
 private Long lastStableOffset;
 private boolean paused;  // whether this partition has been paused by the user
+private boolean consumable;

Review Comment:
   How about calling this `pendingRevocation` or something like that? That might make the usage clearer.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -799,64 +804,94 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 }
 }
 
+Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+isLeader = false;
+subscriptions.resetGroupSubscription();
+joinPrepareTimer = null;
+autoCommitOffsetRequestFuture = null;
+timer.update();
+
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an error", exception.get());
+}
+return true;
+}
+
+private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+if (generation == Generation.NO_GENERATION.generationId ||
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+// Revoke all partitions
+if (protocol == RebalanceProtocol.EAGER) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+// only revoke those partitions that are not in the subscription any more.
+if (protocol == RebalanceProtocol.COOPERATIVE) {
+Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
+partitions.addAll(ownedPartitions.stream()
+.filter(tp -> !subscriptions.subscription().contains(tp.topic()))
+.collect(Collectors.toSet()));
+return partitions;
+}
+
+log.debug("Invalid protocol: {}. No partition will be revoked.", protocol);
+return partitions;
+}
+
+private Optional<Exception> revokePartitions(SortedSet<TopicPartition> partitions, int generation, String memberId) {
+
 // the generation / member-id can possibly be reset by the heartbeat thread
 // upon getting errors or heartbeat timeouts; in this case whatever is previously
 // owned partitions would be lost, we should trigger the callback and cleanup the assignment;
 // otherwise we can proceed normally and revoke the partitions depending on the protocol,
 // and in that case we should only change the assignment AFTER the revoke callback is triggered
 // so that users can still access the previously owned partitions to commit offsets etc.
-Exception exception = null;
-final SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR);
+Optional<Exception> exception = Optional.empty();
 if (generation == Generation.NO_GENERATION.generationId ||
-memberId.equals(Generation.NO_GENERATION.memberId)) {
-revokedPartitions.addAll(subscriptions.assignedPartitions());
-
-if (!revokedPartitions.isEmpty()) {
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+if (!partitions.isEmpty()) {
 log.info("Giving away all assigned partitions as lost since generation/memberID has been reset," +
-"indicating that consumer is in old state or no longer part of the group");
-exception = invokePartitionsLost(revokedPartitions);
-
+"indicating that consumer is in old state or no longer part of the group");
+exception = Optional.ofNullable(invokePartitionsLost(partitions));
 subscriptions.assignFromSubscribed(Collections.emptySet());
 }
 } else {
 switch (protocol) {
 case EAGER:
-// revoke all partitions
-revokedPartitions.addAll(subscriptions.assignedPartitions());
-exception = invokePartitionsRevoked(revokedPartitions);
-
+exception = Optional.ofNullable(invokePartitionsRevoked(partitions));
 subscriptions.assignFromSubscribed(Collections.emptySet());
 
 break;
 
 case COOPERATIVE:
-// only revoke those partitions that are not in the subscription any more.
 Set owne

[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-08 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r966512403


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 }
 }
 
+Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+isLeader = false;
+subscriptions.resetGroupSubscription();
+joinPrepareTimer = null;
+autoCommitOffsetRequestFuture = null;
+timer.update();
+
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an error", exception.get());
+}
+return true;
+}
+
+private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+if (generation == Generation.NO_GENERATION.generationId ||
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+// Revoke all partitions
+if (protocol == RebalanceProtocol.EAGER) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+// only revoke those partitions that are not in the subscription any more.
+if (protocol == RebalanceProtocol.COOPERATIVE) {
+Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
+partitions.addAll(ownedPartitions.stream()
+.filter(tp -> !subscriptions.subscription().contains(tp.topic()))
+.collect(Collectors.toSet()));
+return partitions;
+}
+
+log.debug("Invalid protocol: {}. No partition will be revoked.", protocol);
+return partitions;
+}
+
+private void pausePartitions(Set<TopicPartition> partitions) {
+// KAFKA-14196 for more detail, we pause the partition from consumption to prevent duplicated
+//  data returned by the consumer poll loop.  Without pausing the partitions, the consumer will move forward
+//  returning the data w/o committing them.  And the progress will be lost once the partition is revoked.
+//  This only applies to autocommits, as we expect user to handle the offsets manually during the partition
+//  revocation.
+
+log.debug("Pausing partitions {} before onJoinPrepare", partitions);
+partitions.forEach(tp -> subscriptionState().pause(tp));

Review Comment:
   I have mixed feelings about reusing the pause mechanism here. On the one hand, it does what we want. On the other hand, the pause state can be mutated by the user. What if the user calls `resume()` on a partition that we paused internally? Sounds crazy perhaps, but I think I'd rather have a mechanism that can only be accessed internally for stuff like this.
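The concern can be shown with two hypothetical designs side by side. In `SharedPauseFlag`, the internal mark reuses the user-visible pause flag, so a user `resume()` silently clears it; in `SeparateInternalFlag`, the internal mark lives in a field that no user-facing call touches. Both classes are illustrative only and are not Kafka's `SubscriptionState`:

```java
// Design 1 (hypothetical): internal marking reuses the user-visible pause flag.
class SharedPauseFlag {
    private boolean paused;

    void internalMarkForRevocation() {
        paused = true;
    }

    void userResume() {
        paused = false; // also undoes the internal mark
    }

    boolean isFetchable() {
        return !paused;
    }
}

// Design 2 (hypothetical): internal marking uses its own flag.
class SeparateInternalFlag {
    private boolean paused;
    private boolean pendingRevocation; // not reachable from user-facing calls

    void internalMarkForRevocation() {
        pendingRevocation = true;
    }

    void userResume() {
        paused = false; // leaves pendingRevocation untouched
    }

    boolean isFetchable() {
        return !paused && !pendingRevocation;
    }
}
```

The second design matches the direction taken by the `markPendingRevocation` revisions discussed elsewhere in this thread, though the sketch omits all the surrounding per-partition state.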



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 joinPrepareTimer.update();
 }
 
+final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);
+
 // async commit offsets prior to rebalance if auto-commit enabled
 // and there is no in-flight offset commit request
-if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
-autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+if (autoCommitEnabled) {
+pausePartitions(partitionsToRevoke);

Review Comment:
   It seems like this bug (and some of the complexity in this patch) is due to the fact that we do the auto-commit prior to revoking partitions. I wonder if that is really necessary. If we revoke first, then the partitions would be removed from `SubscriptionState` and we wouldn't have to worry about fetches for these partitions returning. Could that work as well?
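A minimal sketch of the ordering this comment proposes, under stated assumptions: `RevokeThenCommitSketch` is hypothetical, partitions are plain strings, and the offset commit is modeled as a synchronous map update rather than an asynchronous request. The positions are captured before the assignment is cleared, so nothing is lost, and a fetch response arriving afterwards finds no assigned partition and is ignored:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model; the "commit" is a synchronous map update for illustration.
class RevokeThenCommitSketch {
    private final Map<String, Long> assignedPositions = new HashMap<>();
    private final Map<String, Long> committed = new HashMap<>();

    void assign(String tp, long position) {
        assignedPositions.put(tp, position);
    }

    // Revoke first: capture the current positions, drop the partitions from the
    // assignment, then commit the captured positions.
    Map<String, Long> revokeAllThenCommit() {
        Map<String, Long> snapshot = new HashMap<>(assignedPositions);
        assignedPositions.clear();
        committed.putAll(snapshot);
        return snapshot;
    }

    // A late fetch response only advances a partition that is still assigned.
    boolean onFetchResponse(String tp, long nextOffset) {
        if (!assignedPositions.containsKey(tp)) {
            return false; // ignored: the partition was already revoked
        }
        assignedPositions.put(tp, nextOffset);
        return true;
    }

    Long committedOffset(String tp) {
        return committed.get(tp);
    }
}
```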



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 joinPrepareTimer.update();
 }
 
+final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);

Review Comment:
   Is your concern that the subscription could change in between the time that we pause the partitions and the time that the revocation callback is triggered?


