[GitHub] [kafka] showuon commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


showuon commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969115062


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -2283,6 +2322,37 @@ public void testRestOffsetsAuthorizationFailure() {
         assertEquals(5, subscriptions.position(tp0).offset);
     }
 
+    @Test
+    public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 100);
+        subscriptions.seek(tp0, 100);
+        subscriptions.seek(tp0, 100);

Review Comment:
   Any reason we seek tp0 to offset 100 three times?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##
@@ -769,6 +773,7 @@ private static class TopicPartitionState {
         private Long logStartOffset; // the log start offset
         private Long lastStableOffset;
         private boolean paused;  // whether this partition has been paused by the user
+        private boolean consumable;

Review Comment:
   I like the name `pendingRevocation`, too.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -2283,6 +2322,37 @@ public void testRestOffsetsAuthorizationFailure() {
         assertEquals(5, subscriptions.position(tp0).offset);
     }
 
+    @Test
+    public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 100);
+        subscriptions.seek(tp0, 100);
+        subscriptions.seek(tp0, 100);
+        assertEquals(100, subscriptions.position(tp0).offset);
+
+        assertTrue(subscriptions.isFetchable(tp0)); // because tp is paused

Review Comment:
   I don't understand the comment here. Where do we pause tp0? And it is fetchable now, right?
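A minimal sketch of the state transitions this test appears to be checking, assuming a partition is fetchable only when it has a position, has not been paused by the user, and has not been marked for revocation. `PartitionStateSketch`, `markPendingRevocation` and the string partition names are hypothetical; this is not Kafka's `SubscriptionState` or the PR's actual logic. Under that assumption, a single `seek` is enough to make tp0 fetchable (nothing has paused it), and only the revocation flag makes it unfetchable while the user pause flag stays untouched.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of per-partition fetch state (not Kafka's SubscriptionState).
public class PartitionStateSketch {

    static final class PartitionState {
        Long position;              // null until the partition has a position
        boolean paused;             // set only when the user pauses the partition
        boolean pendingRevocation;  // assumption: set just before the partition is revoked

        boolean isFetchable() {
            // Assumed rule: a position exists, no user pause, no pending revocation.
            return position != null && !paused && !pendingRevocation;
        }
    }

    private final Map<String, PartitionState> states = new HashMap<>();

    public void assign(String tp) { states.put(tp, new PartitionState()); }
    public void seek(String tp, long offset) { states.get(tp).position = offset; }
    public void pause(String tp) { states.get(tp).paused = true; }
    public void markPendingRevocation(String tp) { states.get(tp).pendingRevocation = true; }
    public boolean isPaused(String tp) { return states.get(tp).paused; }
    public boolean isFetchable(String tp) { return states.get(tp).isFetchable(); }

    public static void main(String[] args) {
        PartitionStateSketch subscriptions = new PartitionStateSketch();
        subscriptions.assign("tp0");
        System.out.println(subscriptions.isFetchable("tp0")); // false: no position yet
        subscriptions.seek("tp0", 100);                        // one seek sets the position
        System.out.println(subscriptions.isFetchable("tp0")); // true: nothing has paused tp0
        subscriptions.markPendingRevocation("tp0");
        System.out.println(subscriptions.isFetchable("tp0")); // false: pending revocation
        System.out.println(subscriptions.isPaused("tp0"));    // false: user pause flag untouched
    }
}
```

Keeping the revocation flag separate from `paused` is one way to avoid overloading a user-controlled setting, which is presumably why a dedicated name such as `pendingRevocation` reads better than `consumable`.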



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+            memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        switch (protocol) {
+            case EAGER:
+                partitions.addAll(subscriptions.assignedPartitions());
+                break;
+
+            case COOPERATIVE:
+                // Delay the partition revocation because we don't revoke the already owned partitions

Review Comment:
   Agreed that we should handle the cooperative issue separately.
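A hypothetical, simplified sketch of the partition selection in `getPartitionsToRevoke`. It mirrors the branches visible in the diff (no valid generation or member id, or EAGER: revoke everything assigned) and assumes the truncated COOPERATIVE branch revokes nothing up front, as its comment suggests. The sentinel values, string partition names and `PartitionsToRevokeSketch` class are assumptions, not Kafka's types.

```java
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch of selecting partitions to revoke before rejoining (not Kafka's code).
public class PartitionsToRevokeSketch {

    public enum RebalanceProtocol { EAGER, COOPERATIVE }

    // Assumed sentinels standing in for Generation.NO_GENERATION.
    public static final int NO_GENERATION_ID = -1;
    public static final String NO_MEMBER_ID = "";

    public static SortedSet<String> partitionsToRevoke(RebalanceProtocol protocol,
                                                       int generation,
                                                       String memberId,
                                                       Set<String> assigned) {
        // Sorted like the original's TreeSet; natural String order replaces COMPARATOR here.
        SortedSet<String> partitions = new TreeSet<>();
        if (generation == NO_GENERATION_ID || memberId.equals(NO_MEMBER_ID)) {
            // No valid generation: everything previously owned is given up.
            partitions.addAll(assigned);
            return partitions;
        }
        switch (protocol) {
            case EAGER:
                // Eager rebalancing revokes the whole assignment before rejoining.
                partitions.addAll(assigned);
                break;
            case COOPERATIVE:
                // Assumption: nothing is revoked here; revocation waits for the new
                // assignment to show which owned partitions actually move.
                break;
        }
        return partitions;
    }

    public static void main(String[] args) {
        Set<String> assigned = Set.of("t-0", "t-1");
        System.out.println(partitionsToRevoke(RebalanceProtocol.EAGER, 5, "m1", assigned));       // [t-0, t-1]
        System.out.println(partitionsToRevoke(RebalanceProtocol.COOPERATIVE, 5, "m1", assigned)); // []
        System.out.println(partitionsToRevoke(RebalanceProtocol.COOPERATIVE,
                NO_GENERATION_ID, "m1", assigned));                                                // [t-0, t-1]
    }
}
```

Under this assumption the pause-before-commit change only affects EAGER (and generation-reset) rebalances, which is consistent with treating the cooperative case separately.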



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-09 Thread GitBox


showuon commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r967027203


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             joinPrepareTimer.update();
         }
 
+        final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);
+
         // async commit offsets prior to rebalance if auto-commit enabled
         // and there is no in-flight offset commit request
-        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
-            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+        if (autoCommitEnabled) {
+            pausePartitions(partitionsToRevoke);

Review Comment:
   I think what Jason suggested is to change the order from
   "commit offsets for subscribed partitions -> revoke"
   to "revoke -> commit offsets for subscribed partitions **+ revoked partitions**".
   
   I don't think that is a good idea, because it would open the door to the consumer committing offsets for partitions it doesn't own.
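To illustrate the ordering kept by the PR (pause the partitions about to be revoked, commit offsets while the consumer still owns them, then revoke), here is a toy simulation. It assumes the consumer keeps polling while the pre-rebalance auto-commit is in flight and that the next owner resumes from the committed offset; `PauseBeforeCommitSketch`, `poll`, `commitOwned` and the record counts are hypothetical. Without the pause, records consumed after the commit snapshot are re-delivered to the new owner; with it, none are.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy simulation (not Kafka code): why pausing before the pre-rebalance auto-commit avoids duplicates.
public class PauseBeforeCommitSketch {

    private final Map<String, Long> position = new HashMap<>();  // next offset to consume
    private final Map<String, Long> committed = new HashMap<>(); // last committed offset
    private final Set<String> paused = new HashSet<>();

    PauseBeforeCommitSketch(Set<String> owned) {
        for (String tp : owned) {
            position.put(tp, 0L);
        }
    }

    // Consume n records from every partition that is not paused.
    void poll(long n) {
        for (Map.Entry<String, Long> e : position.entrySet()) {
            if (!paused.contains(e.getKey())) {
                e.setValue(e.getValue() + n);
            }
        }
    }

    // Stand-in for the async auto-commit: snapshot positions of partitions still owned.
    void commitOwned() {
        committed.putAll(position);
    }

    // Records consumed but not committed are re-delivered to the partition's next owner.
    long duplicatesOnRevoke(String tp) {
        return position.get(tp) - committed.getOrDefault(tp, 0L);
    }

    public static long run(boolean pauseFirst) {
        PauseBeforeCommitSketch consumer = new PauseBeforeCommitSketch(Set.of("tp0"));
        consumer.poll(10);                  // normal consumption before the rebalance
        if (pauseFirst) {
            consumer.paused.add("tp0");     // stop fetching the partition about to be revoked
        }
        consumer.commitOwned();             // commit while tp0 is still owned
        consumer.poll(5);                   // polling continues while the commit is in flight
        return consumer.duplicatesOnRevoke("tp0"); // then tp0 is revoked
    }

    public static void main(String[] args) {
        System.out.println("without pause: " + run(false)); // without pause: 5
        System.out.println("with pause: " + run(true));     // with pause: 0
    }
}
```

In this model the commit only ever covers partitions the consumer still owns, which is the property the comment wants to preserve; reordering to "revoke, then commit" would remove the duplicates only by committing for partitions already given up.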


