dajac commented on code in PR #12748:
URL: https://github.com/apache/kafka/pull/12748#discussion_r1034544258


##########
clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java:
##########
@@ -272,6 +284,73 @@ public void testSchemaBackwardCompatibility() {
         assertTrue(isFullyBalanced(assignment));
     }
 
+    @Test
+    public void testMemberDataWithInconsistentData() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 2);
+        List<TopicPartition> ownedPartitionsInUserdata = partitions(tp1);
+        List<TopicPartition> ownedPartitionsInSubscription = partitions(tp0);
+
+        assignor.onAssignment(new 
ConsumerPartitionAssignor.Assignment(ownedPartitionsInUserdata), new 
ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty()));
+        ByteBuffer userDataWithHigherGenerationId = 
assignor.subscriptionUserData(new HashSet<>(topics(topic)));
+        // The owned partitions and generation id are provided in user data 
and different owned partition is provided in subscription without generation id
+        // If subscription provides no generation id, we'll honor the 
generation id in userData and owned partitions in subscription

Review Comment:
   This is not true anymore, is it? My understanding is that we always take 
the data from user data.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java:
##########
@@ -272,6 +284,73 @@ public void testSchemaBackwardCompatibility() {
         assertTrue(isFullyBalanced(assignment));
     }
 
+    @Test
+    public void testMemberDataWithInconsistentData() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 2);
+        List<TopicPartition> ownedPartitionsInUserdata = partitions(tp1);
+        List<TopicPartition> ownedPartitionsInSubscription = partitions(tp0);
+
+        assignor.onAssignment(new 
ConsumerPartitionAssignor.Assignment(ownedPartitionsInUserdata), new 
ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty()));
+        ByteBuffer userDataWithHigherGenerationId = 
assignor.subscriptionUserData(new HashSet<>(topics(topic)));
+        // The owned partitions and generation id are provided in user data 
and different owned partition is provided in subscription without generation id
+        // If subscription provides no generation id, we'll honor the 
generation id in userData and owned partitions in subscription
+        Subscription subscription = new Subscription(topics(topic), 
userDataWithHigherGenerationId, ownedPartitionsInSubscription);
+
+        AbstractStickyAssignor.MemberData memberData = 
memberData(subscription);
+
+        // In StickyAssignor, we'll serialize owned partition in assignment 
into userData
+        assertEquals(ownedPartitionsInUserdata, memberData.partitions, 
"subscription: " + subscription + " doesn't have expected owned partition");
+        assertEquals(generationId, memberData.generation.orElse(-1), 
"subscription: " + subscription + " doesn't have expected generation id");
+    }
+
+    @Test
+    public void testMemberDataWithEmptyPartitionsAndEqualGeneration() {
+        List<String> topics = topics(topic);
+        List<TopicPartition> ownedPartitions = partitions(tp(topic1, 0), 
tp(topic2, 1));
+
+        // subscription containing empty owned partitions and the same 
generation id, and non-empty owned partition in user data,
+        // member data should honor the one in user data
+        Subscription subscription = new Subscription(topics, 
generateUserData(topics, ownedPartitions, generationId), 
Collections.emptyList(), generationId);
+
+        AbstractStickyAssignor.MemberData memberData = 
memberData(subscription);
+        assertEquals(ownedPartitions, memberData.partitions, "subscription: " 
+ subscription + " doesn't have expected owned partition");
+        assertEquals(generationId, memberData.generation.orElse(-1), 
"subscription: " + subscription + " doesn't have expected generation id");
+    }
+
+    @Test
+    public void testMemberDataWithEmptyPartitionsAndHigherGeneration() {
+        List<String> topics = topics(topic);
+        List<TopicPartition> ownedPartitions = partitions(tp(topic1, 0), 
tp(topic2, 1));
+        int generationIdInUserData = generationId - 1;
+
+        Subscription subscription = new Subscription(topics, 
generateUserData(topics, ownedPartitions, generationId - 1), 
Collections.emptyList(), generationId);

Review Comment:
   nit: Could we reuse `generationIdInUserData` instead of `generationId - 1`?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java:
##########
@@ -272,6 +284,73 @@ public void testSchemaBackwardCompatibility() {
         assertTrue(isFullyBalanced(assignment));
     }
 
+    @Test
+    public void testMemberDataWithInconsistentData() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 2);
+        List<TopicPartition> ownedPartitionsInUserdata = partitions(tp1);
+        List<TopicPartition> ownedPartitionsInSubscription = partitions(tp0);
+
+        assignor.onAssignment(new 
ConsumerPartitionAssignor.Assignment(ownedPartitionsInUserdata), new 
ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty()));
+        ByteBuffer userDataWithHigherGenerationId = 
assignor.subscriptionUserData(new HashSet<>(topics(topic)));
+        // The owned partitions and generation id are provided in user data 
and different owned partition is provided in subscription without generation id
+        // If subscription provides no generation id, we'll honor the 
generation id in userData and owned partitions in subscription
+        Subscription subscription = new Subscription(topics(topic), 
userDataWithHigherGenerationId, ownedPartitionsInSubscription);
+
+        AbstractStickyAssignor.MemberData memberData = 
memberData(subscription);
+
+        // In StickyAssignor, we'll serialize owned partition in assignment 
into userData
+        assertEquals(ownedPartitionsInUserdata, memberData.partitions, 
"subscription: " + subscription + " doesn't have expected owned partition");
+        assertEquals(generationId, memberData.generation.orElse(-1), 
"subscription: " + subscription + " doesn't have expected generation id");
+    }
+
+    @Test
+    public void testMemberDataWithEmptyPartitionsAndEqualGeneration() {

Review Comment:
   I wonder if this test is still useful with the latest version. We always take from 
user data anyway.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java:
##########
@@ -272,6 +284,73 @@ public void testSchemaBackwardCompatibility() {
         assertTrue(isFullyBalanced(assignment));
     }
 
+    @Test
+    public void testMemberDataWithInconsistentData() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 2);
+        List<TopicPartition> ownedPartitionsInUserdata = partitions(tp1);
+        List<TopicPartition> ownedPartitionsInSubscription = partitions(tp0);
+
+        assignor.onAssignment(new 
ConsumerPartitionAssignor.Assignment(ownedPartitionsInUserdata), new 
ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty()));
+        ByteBuffer userDataWithHigherGenerationId = 
assignor.subscriptionUserData(new HashSet<>(topics(topic)));
+        // The owned partitions and generation id are provided in user data 
and different owned partition is provided in subscription without generation id
+        // If subscription provides no generation id, we'll honor the 
generation id in userData and owned partitions in subscription
+        Subscription subscription = new Subscription(topics(topic), 
userDataWithHigherGenerationId, ownedPartitionsInSubscription);
+
+        AbstractStickyAssignor.MemberData memberData = 
memberData(subscription);
+
+        // In StickyAssignor, we'll serialize owned partition in assignment 
into userData
+        assertEquals(ownedPartitionsInUserdata, memberData.partitions, 
"subscription: " + subscription + " doesn't have expected owned partition");
+        assertEquals(generationId, memberData.generation.orElse(-1), 
"subscription: " + subscription + " doesn't have expected generation id");
+    }
+
+    @Test
+    public void testMemberDataWithEmptyPartitionsAndEqualGeneration() {
+        List<String> topics = topics(topic);
+        List<TopicPartition> ownedPartitions = partitions(tp(topic1, 0), 
tp(topic2, 1));
+
+        // subscription containing empty owned partitions and the same 
generation id, and non-empty owned partition in user data,
+        // member data should honor the one in user data
+        Subscription subscription = new Subscription(topics, 
generateUserData(topics, ownedPartitions, generationId), 
Collections.emptyList(), generationId);
+
+        AbstractStickyAssignor.MemberData memberData = 
memberData(subscription);
+        assertEquals(ownedPartitions, memberData.partitions, "subscription: " 
+ subscription + " doesn't have expected owned partition");
+        assertEquals(generationId, memberData.generation.orElse(-1), 
"subscription: " + subscription + " doesn't have expected generation id");
+    }
+
+    @Test
+    public void testMemberDataWithEmptyPartitionsAndHigherGeneration() {

Review Comment:
   nit: Should we rename this test to reflect that we always take from the user 
data?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java:
##########
@@ -116,6 +130,74 @@ public void 
testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleCo
         assertTrue(isFullyBalanced(assignment));
     }
 
+    @Test
+    public void testMemberDataWithInconsistentData() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 2);
+        List<TopicPartition> ownedPartitionsInUserdata = partitions(tp1);
+        List<TopicPartition> ownedPartitionsInSubscription = partitions(tp0);
+
+        assignor.onAssignment(new 
ConsumerPartitionAssignor.Assignment(ownedPartitionsInUserdata), new 
ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty()));
+        ByteBuffer userDataWithHigherGenerationId = 
assignor.subscriptionUserData(new HashSet<>(topics(topic)));
+        // The owned partitions and generation id are provided in user data 
and different owned partition is provided in subscription without generation id
+        // If subscription provides no generation id, we'll honor the 
generation id in userData and owned partitions in subscription
+        Subscription subscription = new Subscription(topics(topic), 
userDataWithHigherGenerationId, ownedPartitionsInSubscription);
+
+        AbstractStickyAssignor.MemberData memberData = 
memberData(subscription);
+        // In CooperativeStickyAssignor, we'll serialize owned partition in 
subscription into userData

Review Comment:
   Is this sentence correct? In CooperativeStickyAssignor, only the generation 
is put in user data, no?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java:
##########
@@ -44,14 +46,26 @@ public AbstractStickyAssignor createAssignor() {
     }
 
     @Override
-    public Subscription buildSubscription(List<String> topics, 
List<TopicPartition> partitions) {
-        return new Subscription(topics, assignor.subscriptionUserData(new 
HashSet<>(topics)), partitions);
+    public Subscription buildSubscriptionV0(List<String> topics, 
List<TopicPartition> partitions, int generationId) {
+        // cooperative sticky assignor only supports 
ConsumerProtocolSubscription V1 or above
+        return null;
     }
 
     @Override
-    public Subscription buildSubscriptionWithGeneration(List<String> topics, 
List<TopicPartition> partitions, int generation) {
-        assignor.onAssignment(null, new 
ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", 
Optional.empty()));
-        return new Subscription(topics, assignor.subscriptionUserData(new 
HashSet<>(topics)), partitions);
+    public Subscription buildSubscriptionV1(List<String> topics, 
List<TopicPartition> partitions, int generationId) {
+        assignor.onAssignment(new 
ConsumerPartitionAssignor.Assignment(partitions), new 
ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty()));
+        return new Subscription(topics, assignor.subscriptionUserData(new 
HashSet<>(topics)), partitions, DEFAULT_GENERATION);
+    }
+
+    @Override
+    public Subscription buildSubscriptionV2Above(List<String> topics, 
List<TopicPartition> partitions, int generationId) {
+        return new Subscription(topics, null, partitions, generationId);

Review Comment:
   Using `null` is not 100% correct here, is it? My understanding is that 
the implementation always serializes the user data, even with version >= 2.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to