[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-13 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1165947809


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -16,6 +16,13 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.internals.Utils.PartitionComparator;

Review Comment:
   This was "optimized" by the IDE.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-13 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1165959573


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -153,43 +158,46 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 List ownedPartitions = new ArrayList<>();
 consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-// Only consider this consumer's owned partitions as valid if it 
is a member of the current highest
-// generation, or it's generation is not present but we have not 
seen any known generation so far
-if (memberData.generation.isPresent() && 
memberData.generation.get() >= maxGeneration
+if (memberData.generation.isPresent() && 
memberData.generation.get() >= maxGeneration - 1
 || !memberData.generation.isPresent() && maxGeneration == 
DEFAULT_GENERATION) {
 
-// If the current member's generation is higher, all the 
previously owned partitions are invalid
-if (memberData.generation.isPresent() && 
memberData.generation.get() > maxGeneration) {

Review Comment:
   We don't need this because maxGeneration has already been computed 
previously, so we already know the set of valid members.
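
A minimal sketch of that idea, not the assignor's actual code: compute the highest generation in one pass, then apply the validity condition from the `+` lines of the hunk in a second pass. The class and method names are hypothetical, and `DEFAULT_GENERATION = -1` is an assumption standing in for the constant referenced in the diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class GenerationFilterSketch {
    // Assumption: stands in for the DEFAULT_GENERATION constant referenced in the diff.
    static final int DEFAULT_GENERATION = -1;

    // Pass 1: find the highest generation reported by any member.
    static int maxGeneration(Map<String, Optional<Integer>> generationByMember) {
        int max = DEFAULT_GENERATION;
        for (Optional<Integer> generation : generationByMember.values()) {
            max = Math.max(max, generation.orElse(DEFAULT_GENERATION));
        }
        return max;
    }

    // Pass 2: same condition as the "+" lines in the hunk. A member's owned
    // partitions count if it is at most one generation behind the maximum, or
    // if it reports no generation and no generation is known at all.
    static boolean ownedPartitionsAreValid(Optional<Integer> generation, int maxGeneration) {
        return generation.isPresent() && generation.get() >= maxGeneration - 1
            || !generation.isPresent() && maxGeneration == DEFAULT_GENERATION;
    }

    public static void main(String[] args) {
        Map<String, Optional<Integer>> generations = new LinkedHashMap<>();
        generations.put("consumer1", Optional.of(10));
        generations.put("consumer2", Optional.of(9));
        generations.put("consumer3", Optional.of(8));
        int max = maxGeneration(generations);
        generations.forEach((member, generation) ->
            System.out.println(member + " valid=" + ownedPartitionsAreValid(generation, max)));
    }
}
```

Because the maximum is fixed before the second pass starts, nothing collected so far has to be cleared when a higher generation shows up later, which is why the removed block is no longer needed.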






[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-13 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1165960472


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -153,43 +158,45 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 List ownedPartitions = new ArrayList<>();
 consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-// Only consider this consumer's owned partitions as valid if it 
is a member of the current highest
-// generation, or it's generation is not present but we have not 
seen any known generation so far
-if (memberData.generation.isPresent() && 
memberData.generation.get() >= maxGeneration
+if (memberData.generation.isPresent() && 
memberData.generation.get() >= maxGeneration - 1
 || !memberData.generation.isPresent() && maxGeneration == 
DEFAULT_GENERATION) {
 
-// If the current member's generation is higher, all the 
previously owned partitions are invalid
-if (memberData.generation.isPresent() && 
memberData.generation.get() > maxGeneration) {
-allPreviousPartitionsToOwner.clear();
-partitionsWithMultiplePreviousOwners.clear();
-for (String droppedOutConsumer : 
membersOfCurrentHighestGeneration) {
-
consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-}
-
-membersOfCurrentHighestGeneration.clear();
-maxGeneration = memberData.generation.get();
-}
-
-membersOfCurrentHighestGeneration.add(consumer);
 for (final TopicPartition tp : memberData.partitions) {
-// filter out any topics that no longer exist or aren't 
part of the current subscription
 if (allTopics.contains(tp.topic())) {
 String otherConsumer = 
allPreviousPartitionsToOwner.put(tp, consumer);
 if (otherConsumer == null) {
 // this partition is not owned by other consumer 
in the same generation
 ownedPartitions.add(tp);
 } else {
-log.error("Found multiple consumers {} and {} 
claiming the same TopicPartition {} in the "
-+ "same generation {}, this will be 
invalidated and removed from their previous assignment.",
- consumer, otherConsumer, tp, 
maxGeneration);
-
consumerToOwnedPartitions.get(otherConsumer).remove(tp);
-partitionsWithMultiplePreviousOwners.add(tp);
+int memberGeneration = 
memberData.generation.orElse(DEFAULT_GENERATION);
+int otherMemberGeneration = 
subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
+
+if (memberGeneration == otherMemberGeneration) {
+if 
(subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == 
memberData.generation.orElse(DEFAULT_GENERATION)) {
+log.error("Found multiple consumers {} and 
{} claiming the same TopicPartition {} in the "
++ "same generation {}, this will 
be invalidated and removed from their previous assignment.",
+consumer, otherConsumer, tp, 
maxGeneration);
+
partitionsWithMultiplePreviousOwners.add(tp);
+}
+
consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+allPreviousPartitionsToOwner.put(tp, consumer);
+continue;
+}
+
+if (memberGeneration > otherMemberGeneration) {
+log.warn("Found multiple consumers {} and {} 
claiming the same TopicPartition {} in " +

Review Comment:
   I feel that duplicated ownership should be a warning.
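
For context, the conflict handling in this hunk can be sketched roughly as below. This is a simplified illustration with hypothetical names, not the code in the PR. The hunk is cut off before the lower-generation branch, so the assumption that the higher-generation claim wins in both directions is mine.

```java
import java.util.Optional;

final class OwnershipConflictSketch {
    // Returns the member allowed to keep the partition, or empty when both
    // claims come from the same generation and the claim is invalidated.
    // Assumption: the claim from the higher generation wins.
    static Optional<String> resolveOwner(String consumer, int memberGeneration,
                                         String otherConsumer, int otherMemberGeneration) {
        if (memberGeneration == otherMemberGeneration) {
            return Optional.empty();
        }
        return Optional.of(memberGeneration > otherMemberGeneration ? consumer : otherConsumer);
    }

    public static void main(String[] args) {
        // Same generation: neither member keeps the partition.
        System.out.println(resolveOwner("consumer1", 10, "consumer2", 10));
        // Different generations: the newer claim is kept.
        System.out.println(resolveOwner("consumer1", 10, "consumer2", 9));
    }
}
```

Only the same-generation case is truly unexpected. A claim from an older generation is a normal consequence of a member missing a rebalance, which is the argument for logging it at warning level.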






[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-17 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1169135808


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+resetStateAndGeneration("member missed the rebalance", 
true);

Review Comment:
   Hey @dajac, thanks for the review. I think that when a member misses a 
generation, we want to make sure its owned partitions are revoked (due to the 
generation reset). Regardless, it should still rejoin with its current 
partitions and should continue to hold on to them if it is only 1 generation 
behind. If it is more than 1 generation behind, then, circling back to the 
beginning of my response, we want to make sure they are revoked because the 
partitions might have already been reassigned.
   
   This makes me think that this will happen regularly in medium to large 
groups. -> I think this might not be as uncommon as we think, especially 
with a large consumer group deployed to multiple pods, considering that a pod 
can go stale before sending out syncGroup while another consumer in a 
different pod tries to join the group.
   
   I hope I'm answering your questions there; I apologize if I misunderstood 
anything.






[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-18 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1170094810


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+resetStateAndGeneration("member missed the rebalance", 
true);

Review Comment:
   Hmm, I think the partitions are only lost during onJoinPrepare. What I'm 
thinking of is this: 
https://github.com/apache/kafka/blob/61530d68ce83467de6190a52da37b3c0af84f0ef/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L507-L511
   
   The gist is: on any of the 4 exceptions thrown in join/sync group, it should 
immediately re-send the join request. Are you thinking about how the client 
handles the illegal generation error? I think it is only thrown during 
sync group and heartbeat. So I think just resetting the generation shouldn't 
immediately cause revocation.






[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-18 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1170098942


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+resetStateAndGeneration("member missed the rebalance", 
true);

Review Comment:
   Adding to my previous comment: I think you will need to set 
`needsOnJoinPrepare` to true to go through the revocation, as pointed out here: 
https://github.com/apache/kafka/blob/61530d68ce83467de6190a52da37b3c0af84f0ef/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L821






[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-18 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1170599794


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+resetStateAndGeneration("member missed the rebalance", 
true);

Review Comment:
   Offline discussion with @hachikuji: it seems like what we want is to 
revoke the old partitions but resend these partitions on the subsequent join. 
Right now, join only sends out the assigned partitions.






[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-19 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1171712519


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +839,8 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+savePartitionAndGenerationState();

Review Comment:
   Saving the generation and partition state prior to resetting it.






[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-19 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1171780417


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +839,8 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+savePartitionAndGenerationState();

Review Comment:
   I think the rationale for resetting the generation is to ensure the member's 
partitions get revoked during onJoinPrepare. I'm trying to think whether there's 
a better way to do this...






[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-19 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1172072549


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +839,8 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+savePartitionAndGenerationState();

Review Comment:
   I'm not sure what a better way to invoke `onPartitionsLost` without 
resetting the generation would be. The alternative is to use a flag, but that's 
rather unclean.






[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-25 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1176980400


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +839,8 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+savePartitionAndGenerationState();

Review Comment:
   This was discarded in the latest commit.






[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-25 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1176980677


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+resetStateAndGeneration("member missed the rebalance", 
true);

Review Comment:
   This was discarded in the latest commit.






[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

2023-04-26 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1177893519


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 }
 
 MemberData memberData = memberData(subscription);
+maxGeneration = Math.max(maxGeneration, 
memberData.generation.orElse(DEFAULT_GENERATION));
 
 List ownedPartitions = new ArrayList<>();
 consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-// Only consider this consumer's owned partitions as valid if it 
is a member of the current highest
-// generation, or it's generation is not present but we have not 
seen any known generation so far
-if (memberData.generation.isPresent() && 
memberData.generation.get() >= maxGeneration
-|| !memberData.generation.isPresent() && maxGeneration == 
DEFAULT_GENERATION) {
-
-// If the current member's generation is higher, all the 
previously owned partitions are invalid
-if (memberData.generation.isPresent() && 
memberData.generation.get() > maxGeneration) {
-allPreviousPartitionsToOwner.clear();
-partitionsWithMultiplePreviousOwners.clear();
-for (String droppedOutConsumer : 
membersOfCurrentHighestGeneration) {
-
consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-}
-
-membersOfCurrentHighestGeneration.clear();
-maxGeneration = memberData.generation.get();
-}
+// the member has a valid generation, so we can consider its owned 
partitions if it has the highest
+// generation amongst
+for (final TopicPartition tp : memberData.partitions) {
+if (allTopics.contains(tp.topic())) {
+String otherConsumer = 
allPreviousPartitionsToOwner.put(tp, consumer);
+if (otherConsumer == null) {
+// this partition is not owned by other consumer in 
the same generation
+ownedPartitions.add(tp);
+} else {
+final int memberGeneration = 
memberData.generation.orElse(DEFAULT_GENERATION);
+final int otherMemberGeneration = 
subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
+
+if (memberGeneration == otherMemberGeneration) {
+if 
(subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == 
memberData.generation.orElse(DEFAULT_GENERATION)) {
+log.error("Found multiple consumers {} and {} 
claiming the same TopicPartition {} in the "
++ "same generation {}, this will be 
invalidated and removed from their previous assignment.",
+consumer, otherConsumer, tp, 
memberGeneration);
+partitionsWithMultiplePreviousOwners.add(tp);

Review Comment:
   That seems to be the case; see the snippet here:
   ```
   for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) {
       if (ownedPartitions.contains(doublyClaimedPartition)) {
           log.error("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple "
                   + "consumers already in the same generation. Removing it from the ownedPartitions",
               doublyClaimedPartition, consumer);
           ownedPartitions.remove(doublyClaimedPartition);
       }
   }
   ```






[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

2023-04-26 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1177895473


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 }
 
 MemberData memberData = memberData(subscription);
+maxGeneration = Math.max(maxGeneration, 
memberData.generation.orElse(DEFAULT_GENERATION));
 
 List ownedPartitions = new ArrayList<>();
 consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-// Only consider this consumer's owned partitions as valid if it 
is a member of the current highest
-// generation, or it's generation is not present but we have not 
seen any known generation so far
-if (memberData.generation.isPresent() && 
memberData.generation.get() >= maxGeneration
-|| !memberData.generation.isPresent() && maxGeneration == 
DEFAULT_GENERATION) {
-
-// If the current member's generation is higher, all the 
previously owned partitions are invalid
-if (memberData.generation.isPresent() && 
memberData.generation.get() > maxGeneration) {
-allPreviousPartitionsToOwner.clear();
-partitionsWithMultiplePreviousOwners.clear();
-for (String droppedOutConsumer : 
membersOfCurrentHighestGeneration) {
-
consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-}
-
-membersOfCurrentHighestGeneration.clear();
-maxGeneration = memberData.generation.get();
-}
+// the member has a valid generation, so we can consider its owned 
partitions if it has the highest
+// generation amongst
+for (final TopicPartition tp : memberData.partitions) {
+if (allTopics.contains(tp.topic())) {
+String otherConsumer = 
allPreviousPartitionsToOwner.put(tp, consumer);
+if (otherConsumer == null) {
+// this partition is not owned by other consumer in 
the same generation
+ownedPartitions.add(tp);
+} else {
+final int memberGeneration = 
memberData.generation.orElse(DEFAULT_GENERATION);
+final int otherMemberGeneration = 
subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
+
+if (memberGeneration == otherMemberGeneration) {
+if 
(subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == 
memberData.generation.orElse(DEFAULT_GENERATION)) {
+log.error("Found multiple consumers {} and {} 
claiming the same TopicPartition {} in the "
++ "same generation {}, this will be 
invalidated and removed from their previous assignment.",
+consumer, otherConsumer, tp, 
memberGeneration);
+partitionsWithMultiplePreviousOwners.add(tp);
+}
+
consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+allPreviousPartitionsToOwner.put(tp, consumer);
+continue;

Review Comment:
   It could be. I got into the habit of returning early; I thought it makes 
the code easier to read.
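
To illustrate the two styles being compared, here is a small stand-alone sketch (hypothetical names, unrelated to the assignor's data structures). Both methods produce the same result; the first uses the early `continue` style from the diff and the second uses an `else if` chain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class EarlyContinueSketch {
    // Style used in the diff: handle one case, then `continue` to the next item.
    static List<String> withContinue(List<Integer> generations, int current) {
        List<String> labels = new ArrayList<>();
        for (int generation : generations) {
            if (generation == current) {
                labels.add("same");
                continue;
            }
            if (generation > current) {
                labels.add("newer");
                continue;
            }
            labels.add("older");
        }
        return labels;
    }

    // Equivalent if / else-if chain without `continue`.
    static List<String> withElseIf(List<Integer> generations, int current) {
        List<String> labels = new ArrayList<>();
        for (int generation : generations) {
            if (generation == current) {
                labels.add("same");
            } else if (generation > current) {
                labels.add("newer");
            } else {
                labels.add("older");
            }
        }
        return labels;
    }

    public static void main(String[] args) {
        List<Integer> generations = Arrays.asList(9, 10, 11);
        System.out.println(withContinue(generations, 10));
        System.out.println(withElseIf(generations, 10));
    }
}
```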






[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

2023-04-26 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1177934535


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##
@@ -1038,6 +1038,96 @@ public void 
testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
 assertTrue(isFullyBalanced(assignment));
 }
 
+@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+@EnumSource(RackConfig.class)
+public void 
testOwnedPartitionsAreStableForConsumerWithMultipleGeneration(RackConfig 
rackConfig) {
+initializeRacks(rackConfig);
+Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+
+int currentGeneration = 10;
+
+subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, 
topic2, topic3),
+partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0)), 
currentGeneration, 0));
+subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, 
topic2, topic3),
+partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1)), 
currentGeneration - 1, 1));
+subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, 
topic2, topic3),
+partitions(tp(topic2, 2), tp(topic3, 2), tp(topic, 1)), 
currentGeneration - 2, 1));
+
+Map<String, List<TopicPartition>> assignment = 
assignor.assignPartitions(partitionsPerTopic, subscriptions);
+assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 0), 
tp(topic3, 0))),
+new HashSet<>(assignment.get(consumer1)));
+assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 1), 
tp(topic3, 1))),
+new HashSet<>(assignment.get(consumer2)));
+assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), 
tp(topic3, 2))),
+new HashSet<>(assignment.get(consumer3)));
+assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+assertTrue(isFullyBalanced(assignment));
+}
+
+@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+@EnumSource(RackConfig.class)
+public void testNoReassignmentOnCurrentMembers(RackConfig rackConfig) {
+initializeRacks(rackConfig);
+Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+partitionsPerTopic.put(topic1, partitionInfos(topic1, 3));
+
+int currentGeneration = 10;
+
+subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, 
topic2, topic3, topic1), partitions(),
+DEFAULT_GENERATION, 0));
+subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, 
topic2, topic3, topic1),
+partitions(tp(topic, 0), tp(topic2, 0), tp(topic1, 0)), 
currentGeneration - 1, 1));
+subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, 
topic2, topic3, topic1),
+partitions(tp(topic3, 2), tp(topic2, 2), tp(topic1, 1)), 
currentGeneration - 2, 2));
+subscriptions.put(consumer4, buildSubscriptionV2Above(topics(topic, 
topic2, topic3, topic1),
+partitions(tp(topic3, 1), tp(topic, 1), tp(topic, 2)), 
currentGeneration - 3, 3));
+
+Map<String, List<TopicPartition>> assignment = 
assignor.assignPartitions(partitionsPerTopic, subscriptions);
+// ensure assigned partitions don't get reassigned
+assertTrue(assignment.get(consumer1).containsAll(
+Arrays.asList(tp(topic2, 1),
+tp(topic3, 0),
+tp(topic1, 2))));
+assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+assertTrue(isFullyBalanced(assignment));
+}
+
+@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+@EnumSource(RackConfig.class)
+public void 
testOwnedPartitionsAreInvalidatedForConsumerWithMultipleGeneration(RackConfig 
rackConfig) {
+initializeRacks(rackConfig);
+Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+
+int currentGeneration = 10;
+
+subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, 
topic2),
+partitions(tp(topic, 0), tp(topic2, 1), tp(topic, 1)), 
currentGeneration, 0));
+subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, 
topic2),
+partitions(tp(topic, 0), tp(topic2, 1), tp(topic2, 2)), 
currentGeneration - 2, 1));
+
+Map<String, List<TopicPartition>> assignment =

[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

2023-04-26 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1177936654


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##
@@ -1038,6 +1038,96 @@ public void 
testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
 assertTrue(isFullyBalanced(assignment));
 }
 
+@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+@EnumSource(RackConfig.class)
+public void 
testOwnedPartitionsAreStableForConsumerWithMultipleGeneration(RackConfig 
rackConfig) {
+initializeRacks(rackConfig);
+Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+
+int currentGeneration = 10;
+
+subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, 
topic2, topic3),
+partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0)), 
currentGeneration, 0));
+subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, 
topic2, topic3),
+partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1)), 
currentGeneration - 1, 1));
+subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, 
topic2, topic3),
+partitions(tp(topic2, 2), tp(topic3, 2), tp(topic, 1)), 
currentGeneration - 2, 1));
+
+Map<String, List<TopicPartition>> assignment = 
assignor.assignPartitions(partitionsPerTopic, subscriptions);
+assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 0), 
tp(topic3, 0))),
+new HashSet<>(assignment.get(consumer1)));
+assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 1), 
tp(topic3, 1))),
+new HashSet<>(assignment.get(consumer2)));
+assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), 
tp(topic3, 2))),
+new HashSet<>(assignment.get(consumer3)));
+assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+assertTrue(isFullyBalanced(assignment));
+}
+
+@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+@EnumSource(RackConfig.class)
+public void testNoReassignmentOnCurrentMembers(RackConfig rackConfig) {
+initializeRacks(rackConfig);
+Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+partitionsPerTopic.put(topic1, partitionInfos(topic1, 3));
+
+int currentGeneration = 10;
+
+subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, 
topic2, topic3, topic1), partitions(),
+DEFAULT_GENERATION, 0));
+subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, 
topic2, topic3, topic1),
+partitions(tp(topic, 0), tp(topic2, 0), tp(topic1, 0)), 
currentGeneration - 1, 1));
+subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, 
topic2, topic3, topic1),
+partitions(tp(topic3, 2), tp(topic2, 2), tp(topic1, 1)), 
currentGeneration - 2, 2));
+subscriptions.put(consumer4, buildSubscriptionV2Above(topics(topic, 
topic2, topic3, topic1),
+partitions(tp(topic3, 1), tp(topic, 1), tp(topic, 2)), 
currentGeneration - 3, 3));
+
+Map<String, List<TopicPartition>> assignment = 
assignor.assignPartitions(partitionsPerTopic, subscriptions);
+// ensure assigned partitions don't get reassigned
+assertTrue(assignment.get(consumer1).containsAll(
+Arrays.asList(tp(topic2, 1),
+tp(topic3, 0),
+tp(topic1, 2))));
+assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+assertTrue(isFullyBalanced(assignment));
+}
+
+@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+@EnumSource(RackConfig.class)
+public void 
testOwnedPartitionsAreInvalidatedForConsumerWithMultipleGeneration(RackConfig 
rackConfig) {
+initializeRacks(rackConfig);
+Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+
+int currentGeneration = 10;
+
+subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, 
topic2),
+partitions(tp(topic, 0), tp(topic2, 1), tp(topic, 1)), 
currentGeneration, 0));
+subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, 
topic2),
+partitions(tp(topic, 0), tp(topic2, 1), tp(topic2, 2)), 
currentGeneration - 2, 1));
+
+Map<String, List<TopicPartition>> assignment =

[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

2023-04-26 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1177937393


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##
@@ -42,10 +30,22 @@
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.nio.ByteBuffer;

Review Comment:
   Oh yes, will do. That change came from my IDE's import optimization.






[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

2023-04-26 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1178034681


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##
@@ -1038,6 +1038,96 @@ public void 
testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
 assertTrue(isFullyBalanced(assignment));
 }
 
+@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+@EnumSource(RackConfig.class)
+public void 
testOwnedPartitionsAreStableForConsumerWithMultipleGeneration(RackConfig 
rackConfig) {

Review Comment:
   OK, I renamed the test to `testEnsurePartitionsAssignedToHighestGeneration`, 
as the goal of this test is to make sure partitions are always assigned to the 
member with the highest generation.
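
   As an illustration of that goal, here is a minimal sketch, not the assignor's
actual code. It assumes a simplified model in which partitions are plain strings
and each member reports a single generation; `HighestGenerationSketch`, `Claim`,
and `resolveOwners` are hypothetical names that do not exist in Kafka.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model: a partition is a string such as "topic-1",
// and a member's claim is just (generation, owned partitions).
final class HighestGenerationSketch {

    static final class Claim {
        final int generation;
        final List<String> partitions;

        Claim(int generation, List<String> partitions) {
            this.generation = generation;
            this.partitions = partitions;
        }
    }

    // Returns partition -> owner, keeping the claimant with the highest generation.
    // A tie between equal generations keeps the first claimant seen, which is a
    // simplification made only for this sketch.
    static Map<String, String> resolveOwners(Map<String, Claim> claims) {
        Map<String, String> owner = new HashMap<>();
        Map<String, Integer> ownerGeneration = new HashMap<>();
        for (Map.Entry<String, Claim> entry : claims.entrySet()) {
            int generation = entry.getValue().generation;
            for (String partition : entry.getValue().partitions) {
                Integer seen = ownerGeneration.get(partition);
                if (seen == null || generation > seen) {
                    // First claim, or a claim from a newer generation: take ownership.
                    owner.put(partition, entry.getKey());
                    ownerGeneration.put(partition, generation);
                }
            }
        }
        return owner;
    }
}
```

   With one member at generation 9 and another at generation 8 both claiming the
same partition, `resolveOwners` keeps the generation-9 member as the owner.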






[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

2023-04-26 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1178121430


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
 }
 
 MemberData memberData = memberData(subscription);
+maxGeneration = Math.max(maxGeneration, 
memberData.generation.orElse(DEFAULT_GENERATION));
 
 List<TopicPartition> ownedPartitions = new ArrayList<>();
 consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-// Only consider this consumer's owned partitions as valid if it 
is a member of the current highest
-// generation, or it's generation is not present but we have not 
seen any known generation so far
-if (memberData.generation.isPresent() && 
memberData.generation.get() >= maxGeneration
-|| !memberData.generation.isPresent() && maxGeneration == 
DEFAULT_GENERATION) {
-
-// If the current member's generation is higher, all the 
previously owned partitions are invalid
-if (memberData.generation.isPresent() && 
memberData.generation.get() > maxGeneration) {
-allPreviousPartitionsToOwner.clear();
-partitionsWithMultiplePreviousOwners.clear();
-for (String droppedOutConsumer : 
membersOfCurrentHighestGeneration) {
-
consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-}
-
-membersOfCurrentHighestGeneration.clear();
-maxGeneration = memberData.generation.get();
-}
+// the member has a valid generation, so we can consider its owned 
partitions if it has the highest
+// generation amongst
+for (final TopicPartition tp : memberData.partitions) {
+if (allTopics.contains(tp.topic())) {
+String otherConsumer = 
allPreviousPartitionsToOwner.put(tp, consumer);
+if (otherConsumer == null) {
+// this partition is not owned by other consumer in 
the same generation
+ownedPartitions.add(tp);
+} else {
+final int memberGeneration = 
memberData.generation.orElse(DEFAULT_GENERATION);
+final int otherMemberGeneration = 
subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
+
+if (memberGeneration == otherMemberGeneration) {
+if 
(subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == 
memberData.generation.orElse(DEFAULT_GENERATION)) {
+log.error("Found multiple consumers {} and {} 
claiming the same TopicPartition {} in the "
++ "same generation {}, this will be 
invalidated and removed from their previous assignment.",
+consumer, otherConsumer, tp, 
memberGeneration);
+partitionsWithMultiplePreviousOwners.add(tp);

Review Comment:
   From KAFKA-12984: 
   ```
   ...the assignor will now explicitly look out for partitions that are being 
claimed by multiple consumers ... we have to invalidate this partition from the 
ownedPartitions of both consumers, since we can't tell who, if anyone, has the 
valid claim to this partition.
   ```
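
   The rule quoted above can be sketched roughly as follows. This is a minimal
illustration under stated assumptions, not the code from KAFKA-12984: all members
are taken to be in the same generation, partitions are plain strings, and
`SameGenerationConflictSketch` and `dropConflictingClaims` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: every member is assumed to be in the same generation.
final class SameGenerationConflictSketch {

    // Input: member -> partitions it claims to own.
    // Output: member -> claims that survive, with any partition claimed by more
    // than one member removed from every claimant.
    static Map<String, List<String>> dropConflictingClaims(Map<String, List<String>> owned) {
        // Count how many distinct members claim each partition.
        Map<String, Integer> claimCount = new HashMap<>();
        for (List<String> partitions : owned.values()) {
            for (String partition : new HashSet<>(partitions)) {
                claimCount.merge(partition, 1, Integer::sum);
            }
        }

        Map<String, List<String>> valid = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : owned.entrySet()) {
            List<String> kept = new ArrayList<>();
            for (String partition : entry.getValue()) {
                // Keep only partitions with exactly one claimant; we cannot tell
                // which of several claimants, if any, holds the valid claim.
                if (claimCount.get(partition) == 1) {
                    kept.add(partition);
                }
            }
            valid.put(entry.getKey(), kept);
        }
        return valid;
    }
}
```

   If two members both claim the same partition, it is dropped from both lists
and is left for the assignor to hand out as if it were unowned.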






[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

2023-04-27 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1179185376


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -149,47 +148,57 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
 }
 
 MemberData memberData = memberData(subscription);
+final int memberGeneration = 
memberData.generation.orElse(DEFAULT_GENERATION);
+maxGeneration = Math.max(maxGeneration, memberGeneration);
 
 List<TopicPartition> ownedPartitions = new ArrayList<>();
 consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-// Only consider this consumer's owned partitions as valid if it 
is a member of the current highest
-// generation, or it's generation is not present but we have not 
seen any known generation so far
-if (memberData.generation.isPresent() && 
memberData.generation.get() >= maxGeneration
-|| !memberData.generation.isPresent() && maxGeneration == 
DEFAULT_GENERATION) {
-
-// If the current member's generation is higher, all the 
previously owned partitions are invalid
-if (memberData.generation.isPresent() && 
memberData.generation.get() > maxGeneration) {
-allPreviousPartitionsToOwner.clear();
-partitionsWithMultiplePreviousOwners.clear();
-for (String droppedOutConsumer : 
membersOfCurrentHighestGeneration) {
-
consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-}
-
-membersOfCurrentHighestGeneration.clear();
-maxGeneration = memberData.generation.get();
-}
+// the member has a valid generation, so we can consider its owned 
partitions if it has the highest
+// generation amongst
+for (final TopicPartition tp : memberData.partitions) {
+if (allTopics.contains(tp.topic())) {
+String otherConsumer = 
allPreviousPartitionsToOwner.put(tp, consumer);
+if (otherConsumer == null) {
+// this partition is not owned by other consumer in 
the same generation
+ownedPartitions.add(tp);
+} else {
+final int otherMemberGeneration = 
subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
 
-membersOfCurrentHighestGeneration.add(consumer);
-for (final TopicPartition tp : memberData.partitions) {
-// filter out any topics that no longer exist or aren't 
part of the current subscription
-if (allTopics.contains(tp.topic())) {
-String otherConsumer = 
allPreviousPartitionsToOwner.put(tp, consumer);
-if (otherConsumer == null) {
-// this partition is not owned by other consumer 
in the same generation
-ownedPartitions.add(tp);
-} else {
+if (memberGeneration == otherMemberGeneration) {
 log.error("Found multiple consumers {} and {} 
claiming the same TopicPartition {} in the "
-+ "same generation {}, this will be 
invalidated and removed from their previous assignment.",
- consumer, otherConsumer, tp, 
maxGeneration);
-
consumerToOwnedPartitions.get(otherConsumer).remove(tp);
++ "same generation {}, this will 
be invalidated and removed from their previous assignment.",
+consumer, otherConsumer, tp, 
memberGeneration);
 partitionsWithMultiplePreviousOwners.add(tp);
+
consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+allPreviousPartitionsToOwner.put(tp, consumer);
+} else if (memberGeneration > otherMemberGeneration) {
+// move partition from the member with an older 
generation to the member with the newer generation
+consumerToOwnedPartitions.get(consumer).add(tp);
+
consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+allPreviousPartitionsToOwner.put(tp, consumer);
+// if memberGeneration > otherMemberGeneration, 
the other member continue owns the generation
+log.warn("{} in generation {} and {} in generation 
{} claiming the same TopicPartition {} in " +
+"different generations. The topic 
partition will be assigned to the member with " +
+