philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1177936654


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##########
@@ -1038,6 +1038,96 @@ public void 
testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
         assertTrue(isFullyBalanced(assignment));
     }
 
+    // Scenario: three consumers subscribe to three topics of three partitions each and
+    // report owned partitions under different generations (10, 9 and 8).
+    // consumer2 (generation 9) and consumer3 (generation 8) both claim tp(topic, 1).
+    // Expected: consumer1 and consumer2 keep exactly what they reported, consumer3 ends up
+    // with partition 2 of every topic, and no partition is recorded as transferring ownership.
+    // NOTE(review): the last int argument of buildSubscriptionV2Above is not explained in this
+    // hunk (presumably a rack index) - confirm against the helper's definition.
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void 
testOwnedPartitionsAreStableForConsumerWithMultipleGeneration(RackConfig 
rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+
+        int currentGeneration = 10;
+
+        // consumer1: partition 0 of each topic, reported at the current generation
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, 
topic2, topic3),
+            partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0)), 
currentGeneration, 0));
+        // consumer2: partition 1 of each topic, reported one generation behind
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, 
topic2, topic3),
+            partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1)), 
currentGeneration - 1, 1));
+        // consumer3: two generations behind; its tp(topic, 1) claim overlaps with consumer2's
+        subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, 
topic2, topic3),
+            partitions(tp(topic2, 2), tp(topic3, 2), tp(topic, 1)), 
currentGeneration - 2, 1));
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        // Compared as sets, so the order of partitions within each assignment is not checked
+        assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 0), 
tp(topic3, 0))),
+            new HashSet<>(assignment.get(consumer1)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 1), 
tp(topic3, 1))),
+            new HashSet<>(assignment.get(consumer2)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), 
tp(topic3, 2))),
+            new HashSet<>(assignment.get(consumer3)));
+        assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    // Scenario: four topics of three partitions each (12 partitions) and four consumers.
+    // consumer1 reports no owned partitions with DEFAULT_GENERATION; consumer2, consumer3 and
+    // consumer4 each report three distinct owned partitions under generations 9, 8 and 7.
+    // Expected: consumer1 receives the three partitions nobody reported, nothing is recorded as
+    // transferring ownership, and the resulting assignment is valid and fully balanced.
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testNoReassignmentOnCurrentMembers(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+        partitionsPerTopic.put(topic1, partitionInfos(topic1, 3));
+
+        int currentGeneration = 10;
+
+        // consumer1 reports an empty owned-partition list
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, 
topic2, topic3, topic1), partitions(),
+            DEFAULT_GENERATION, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, 
topic2, topic3, topic1),
+            partitions(tp(topic, 0), tp(topic2, 0), tp(topic1, 0)), 
currentGeneration - 1, 1));
+        subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, 
topic2, topic3, topic1),
+            partitions(tp(topic3, 2), tp(topic2, 2), tp(topic1, 1)), 
currentGeneration - 2, 2));
+        subscriptions.put(consumer4, buildSubscriptionV2Above(topics(topic, 
topic2, topic3, topic1),
+            partitions(tp(topic3, 1), tp(topic, 1), tp(topic, 2)), 
currentGeneration - 3, 3));
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        // tp(topic2, 1), tp(topic3, 0) and tp(topic1, 2) are the only partitions not reported as
+        // owned above; consumer1 must receive them
+        assertTrue(assignment.get(consumer1).containsAll(
+            Arrays.asList(tp(topic2, 1),
+                tp(topic3, 0),
+                tp(topic1, 2))));
+        assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    // Scenario: two topics of three partitions each and two consumers whose reported owned
+    // partitions overlap on tp(topic, 0) and tp(topic2, 1). consumer1 reports generation 10,
+    // consumer2 reports generation 8.
+    // Expected: consumer1 keeps all three partitions it reported, consumer2 is assigned the
+    // remaining three (keeping only tp(topic2, 2) from its own report), and no partition is
+    // recorded as transferring ownership.
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void 
testOwnedPartitionsAreInvalidatedForConsumerWithMultipleGeneration(RackConfig 
rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+
+        int currentGeneration = 10;
+
+        // consumer1: reported at the current generation
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, 
topic2),
+            partitions(tp(topic, 0), tp(topic2, 1), tp(topic, 1)), 
currentGeneration, 0));
+        // consumer2: two generations behind; two of its three claims collide with consumer1's
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, 
topic2),
+            partitions(tp(topic, 0), tp(topic2, 1), tp(topic2, 2)), 
currentGeneration - 2, 1));
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        // Compared as sets, so the order of partitions within each assignment is not checked
+        assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 1), 
tp(topic, 1))),
+            new HashSet<>(assignment.get(consumer1)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), 
tp(topic2, 0))),
+            new HashSet<>(assignment.get(consumer2)));
+        assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+

Review Comment:
   OK, 
`testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMultipleConsumersInSameGeneration`
 might already cover it, but I'll double-check whether that is the behavior we want.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to