flashmouse commented on code in PR #13920: URL: https://github.com/apache/kafka/pull/13920#discussion_r1274457186
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java: ########## @@ -724,6 +725,90 @@ public void testLargeAssignmentAndGroupWithNonEqualSubscription(boolean hasConsu assignor.assignPartitions(partitionsPerTopic, subscriptions); } + @Timeout(90) + @ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK) + @ValueSource(booleans = {false, true}) + public void testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean hasConsumerRack) { + initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK : RackConfig.NO_CONSUMER_RACK); + int topicCount = hasConsumerRack ? 50 : 500; + int partitionCount = 2_00; + int consumerCount = 2_0; + + List<String> topics = new ArrayList<>(); + Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>(); + for (int i = 0; i < topicCount; i++) { + String topicName = getTopicName(i, topicCount); + topics.add(topicName); + partitionsPerTopic.put(topicName, partitionInfos(topicName, partitionCount)); + } + for (int i = 0; i < consumerCount; i++) { + if (i % 4 == 0) { + subscriptions.put(getConsumerName(i, consumerCount), + subscription(topics.subList(0, topicCount / 2), i)); + } else { + subscriptions.put(getConsumerName(i, consumerCount), + subscription(topics.subList(topicCount / 2, topicCount), i)); + } + } + + Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions); + + for (int i = 1; i < consumerCount; i++) { + String consumer = getConsumerName(i, consumerCount); + if (i % 4 == 0) { + subscriptions.put( + consumer, + buildSubscriptionV2Above(topics.subList(0, topicCount / 2), + assignment.get(consumer), generationId, i) + ); + } else { + subscriptions.put(consumer, + buildSubscriptionV2Above(topics.subList(topicCount / 2, topicCount), + assignment.get(consumer), generationId, i) + ); + } + } + + assignor.assignPartitions(partitionsPerTopic, subscriptions); + } + + @Test + public void 
testSubscriptionNotEqualAndAssignSamePartitionWith3Generation() { Review Comment: This method is meant to test ``allSubscriptionsEqual``, so its name is a mistake, thanks! However, the bug in ``allSubscriptionsEqual`` does not prevent a partition that was previously assigned to consumers from at least 3 generations from being owned by the highest-generation consumer. Instead, it causes such a partition to be assigned to more than one consumer, so it is added to ``assignedPartitions`` more than once, and the function ``getUnassignedPartitions`` then returns more partitions than are actually unassigned. As a result, many partitions that are already assigned to consumers are also added to ``unassignedPartitions``, and all of these partitions end up being assigned twice. I couldn't understand how to reuse that test style; could you provide an example? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org