dajac commented on code in PR #12748: URL: https://github.com/apache/kafka/pull/12748#discussion_r1034544258
########## clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java: ########## @@ -272,6 +284,73 @@ public void testSchemaBackwardCompatibility() { assertTrue(isFullyBalanced(assignment)); } + @Test + public void testMemberDataWithInconsistentData() { + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 2); + List<TopicPartition> ownedPartitionsInUserdata = partitions(tp1); + List<TopicPartition> ownedPartitionsInSubscription = partitions(tp0); + + assignor.onAssignment(new ConsumerPartitionAssignor.Assignment(ownedPartitionsInUserdata), new ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty())); + ByteBuffer userDataWithHigherGenerationId = assignor.subscriptionUserData(new HashSet<>(topics(topic))); + // The owned partitions and generation id are provided in user data and different owned partition is provided in subscription without generation id + // If subscription provides no generation id, we'll honor the generation id in userData and owned partitions in subscription Review Comment: This is no longer true, is it? My understanding is that we always take the data from the user data.
########## clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java: ########## @@ -272,6 +284,73 @@ public void testSchemaBackwardCompatibility() { assertTrue(isFullyBalanced(assignment)); } + @Test + public void testMemberDataWithInconsistentData() { + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 2); + List<TopicPartition> ownedPartitionsInUserdata = partitions(tp1); + List<TopicPartition> ownedPartitionsInSubscription = partitions(tp0); + + assignor.onAssignment(new ConsumerPartitionAssignor.Assignment(ownedPartitionsInUserdata), new ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty())); + ByteBuffer userDataWithHigherGenerationId = assignor.subscriptionUserData(new HashSet<>(topics(topic))); + // The owned partitions and generation id are provided in user data and different owned partition is provided in subscription without generation id + // If subscription provides no generation id, we'll honor the generation id in userData and owned partitions in subscription + Subscription subscription = new Subscription(topics(topic), userDataWithHigherGenerationId, ownedPartitionsInSubscription); + + AbstractStickyAssignor.MemberData memberData = memberData(subscription); + + // In StickyAssignor, we'll serialize owned partition in assignment into userData + assertEquals(ownedPartitionsInUserdata, memberData.partitions, "subscription: " + subscription + " doesn't have expected owned partition"); + assertEquals(generationId, memberData.generation.orElse(-1), "subscription: " + subscription + " doesn't have expected generation id"); + } + + @Test + public void testMemberDataWithEmptyPartitionsAndEqualGeneration() { + List<String> topics = topics(topic); + List<TopicPartition> ownedPartitions = partitions(tp(topic1, 0), tp(topic2, 1)); + + // subscription containing empty owned partitions and the same generation id, and non-empty owned partition in user data, + // member data should 
honor the one in user data + Subscription subscription = new Subscription(topics, generateUserData(topics, ownedPartitions, generationId), Collections.emptyList(), generationId); + + AbstractStickyAssignor.MemberData memberData = memberData(subscription); + assertEquals(ownedPartitions, memberData.partitions, "subscription: " + subscription + " doesn't have expected owned partition"); + assertEquals(generationId, memberData.generation.orElse(-1), "subscription: " + subscription + " doesn't have expected generation id"); + } + + @Test + public void testMemberDataWithEmptyPartitionsAndHigherGeneration() { + List<String> topics = topics(topic); + List<TopicPartition> ownedPartitions = partitions(tp(topic1, 0), tp(topic2, 1)); + int generationIdInUserData = generationId - 1; + + Subscription subscription = new Subscription(topics, generateUserData(topics, ownedPartitions, generationId - 1), Collections.emptyList(), generationId); Review Comment: nit: Could we reuse `generationIdInUserData` instead of `generationId - 1`? 
########## clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java: ########## @@ -272,6 +284,73 @@ public void testSchemaBackwardCompatibility() { assertTrue(isFullyBalanced(assignment)); } + @Test + public void testMemberDataWithInconsistentData() { + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 2); + List<TopicPartition> ownedPartitionsInUserdata = partitions(tp1); + List<TopicPartition> ownedPartitionsInSubscription = partitions(tp0); + + assignor.onAssignment(new ConsumerPartitionAssignor.Assignment(ownedPartitionsInUserdata), new ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty())); + ByteBuffer userDataWithHigherGenerationId = assignor.subscriptionUserData(new HashSet<>(topics(topic))); + // The owned partitions and generation id are provided in user data and different owned partition is provided in subscription without generation id + // If subscription provides no generation id, we'll honor the generation id in userData and owned partitions in subscription + Subscription subscription = new Subscription(topics(topic), userDataWithHigherGenerationId, ownedPartitionsInSubscription); + + AbstractStickyAssignor.MemberData memberData = memberData(subscription); + + // In StickyAssignor, we'll serialize owned partition in assignment into userData + assertEquals(ownedPartitionsInUserdata, memberData.partitions, "subscription: " + subscription + " doesn't have expected owned partition"); + assertEquals(generationId, memberData.generation.orElse(-1), "subscription: " + subscription + " doesn't have expected generation id"); + } + + @Test + public void testMemberDataWithEmptyPartitionsAndEqualGeneration() { Review Comment: I wonder whether this test is still useful with the latest version. We always take the data from the user data anyway.
########## clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java: ########## @@ -272,6 +284,73 @@ public void testSchemaBackwardCompatibility() { assertTrue(isFullyBalanced(assignment)); } + @Test + public void testMemberDataWithInconsistentData() { + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 2); + List<TopicPartition> ownedPartitionsInUserdata = partitions(tp1); + List<TopicPartition> ownedPartitionsInSubscription = partitions(tp0); + + assignor.onAssignment(new ConsumerPartitionAssignor.Assignment(ownedPartitionsInUserdata), new ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty())); + ByteBuffer userDataWithHigherGenerationId = assignor.subscriptionUserData(new HashSet<>(topics(topic))); + // The owned partitions and generation id are provided in user data and different owned partition is provided in subscription without generation id + // If subscription provides no generation id, we'll honor the generation id in userData and owned partitions in subscription + Subscription subscription = new Subscription(topics(topic), userDataWithHigherGenerationId, ownedPartitionsInSubscription); + + AbstractStickyAssignor.MemberData memberData = memberData(subscription); + + // In StickyAssignor, we'll serialize owned partition in assignment into userData + assertEquals(ownedPartitionsInUserdata, memberData.partitions, "subscription: " + subscription + " doesn't have expected owned partition"); + assertEquals(generationId, memberData.generation.orElse(-1), "subscription: " + subscription + " doesn't have expected generation id"); + } + + @Test + public void testMemberDataWithEmptyPartitionsAndEqualGeneration() { + List<String> topics = topics(topic); + List<TopicPartition> ownedPartitions = partitions(tp(topic1, 0), tp(topic2, 1)); + + // subscription containing empty owned partitions and the same generation id, and non-empty owned partition in user data, + // member data should 
honor the one in user data + Subscription subscription = new Subscription(topics, generateUserData(topics, ownedPartitions, generationId), Collections.emptyList(), generationId); + + AbstractStickyAssignor.MemberData memberData = memberData(subscription); + assertEquals(ownedPartitions, memberData.partitions, "subscription: " + subscription + " doesn't have expected owned partition"); + assertEquals(generationId, memberData.generation.orElse(-1), "subscription: " + subscription + " doesn't have expected generation id"); + } + + @Test + public void testMemberDataWithEmptyPartitionsAndHigherGeneration() { Review Comment: nit: Should we name this test to reflect that we always take from the user data? ########## clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java: ########## @@ -116,6 +130,74 @@ public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleCo assertTrue(isFullyBalanced(assignment)); } + @Test + public void testMemberDataWithInconsistentData() { + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 2); + List<TopicPartition> ownedPartitionsInUserdata = partitions(tp1); + List<TopicPartition> ownedPartitionsInSubscription = partitions(tp0); + + assignor.onAssignment(new ConsumerPartitionAssignor.Assignment(ownedPartitionsInUserdata), new ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty())); + ByteBuffer userDataWithHigherGenerationId = assignor.subscriptionUserData(new HashSet<>(topics(topic))); + // The owned partitions and generation id are provided in user data and different owned partition is provided in subscription without generation id + // If subscription provides no generation id, we'll honor the generation id in userData and owned partitions in subscription + Subscription subscription = new Subscription(topics(topic), userDataWithHigherGenerationId, ownedPartitionsInSubscription); + + AbstractStickyAssignor.MemberData memberData 
= memberData(subscription); + // In CooperativeStickyAssignor, we'll serialize owned partition in subscription into userData Review Comment: Is this sentence correct? In CooperativeStickyAssignor, only the generation is put in the user data, right? ########## clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java: ########## @@ -44,14 +46,26 @@ public AbstractStickyAssignor createAssignor() { } @Override - public Subscription buildSubscription(List<String> topics, List<TopicPartition> partitions) { - return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions); + public Subscription buildSubscriptionV0(List<String> topics, List<TopicPartition> partitions, int generationId) { + // cooperative sticky assignor only supports ConsumerProtocolSubscription V1 or above + return null; } @Override - public Subscription buildSubscriptionWithGeneration(List<String> topics, List<TopicPartition> partitions, int generation) { - assignor.onAssignment(null, new ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", Optional.empty())); - return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions); + public Subscription buildSubscriptionV1(List<String> topics, List<TopicPartition> partitions, int generationId) { + assignor.onAssignment(new ConsumerPartitionAssignor.Assignment(partitions), new ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty())); + return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions, DEFAULT_GENERATION); + } + + @Override + public Subscription buildSubscriptionV2Above(List<String> topics, List<TopicPartition> partitions, int generationId) { + return new Subscription(topics, null, partitions, generationId); Review Comment: Using `null` is not 100% correct here, is it? My understanding is that the implementation always serializes the user data, even with version >= 2.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org