[GitHub] [kafka] flashmouse commented on a diff in pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
flashmouse commented on code in PR #13920: URL: https://github.com/apache/kafka/pull/13920#discussion_r1274490680 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java: ## @@ -724,6 +725,90 @@ public void testLargeAssignmentAndGroupWithNonEqualSubscription(boolean hasConsu assignor.assignPartitions(partitionsPerTopic, subscriptions); } +@Timeout(90) +@ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK) +@ValueSource(booleans = {false, true}) +public void testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean hasConsumerRack) { +initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK : RackConfig.NO_CONSUMER_RACK); +int topicCount = hasConsumerRack ? 50 : 500; +int partitionCount = 2_00; +int consumerCount = 2_0; + +List topics = new ArrayList<>(); +Map> partitionsPerTopic = new HashMap<>(); +for (int i = 0; i < topicCount; i++) { +String topicName = getTopicName(i, topicCount); +topics.add(topicName); +partitionsPerTopic.put(topicName, partitionInfos(topicName, partitionCount)); +} +for (int i = 0; i < consumerCount; i++) { +if (i % 4 == 0) { +subscriptions.put(getConsumerName(i, consumerCount), +subscription(topics.subList(0, topicCount / 2), i)); +} else { +subscriptions.put(getConsumerName(i, consumerCount), +subscription(topics.subList(topicCount / 2, topicCount), i)); +} +} + +Map> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions); + +for (int i = 1; i < consumerCount; i++) { +String consumer = getConsumerName(i, consumerCount); +if (i % 4 == 0) { +subscriptions.put( +consumer, +buildSubscriptionV2Above(topics.subList(0, topicCount / 2), +assignment.get(consumer), generationId, i) +); +} else { +subscriptions.put(consumer, Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] flashmouse commented on a diff in pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
flashmouse commented on code in PR #13920: URL: https://github.com/apache/kafka/pull/13920#discussion_r1274483457 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java: ## @@ -724,6 +725,90 @@ public void testLargeAssignmentAndGroupWithNonEqualSubscription(boolean hasConsu assignor.assignPartitions(partitionsPerTopic, subscriptions); } +@Timeout(90) Review Comment: Oh, this 90 should be sec, I set 90 sec because in my m1 air this test may run about 25 sec or more time. I tried ``TestUtils.waitForCondition`` but seems it is not effective and couldn't throw Exception when execution use time more than expected, I'm not familiar with this and couldn't find the reason. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] flashmouse commented on a diff in pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
flashmouse commented on code in PR #13920: URL: https://github.com/apache/kafka/pull/13920#discussion_r1274457186 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java: ## @@ -724,6 +725,90 @@ public void testLargeAssignmentAndGroupWithNonEqualSubscription(boolean hasConsu assignor.assignPartitions(partitionsPerTopic, subscriptions); } +@Timeout(90) +@ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK) +@ValueSource(booleans = {false, true}) +public void testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean hasConsumerRack) { +initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK : RackConfig.NO_CONSUMER_RACK); +int topicCount = hasConsumerRack ? 50 : 500; +int partitionCount = 2_00; +int consumerCount = 2_0; + +List topics = new ArrayList<>(); +Map> partitionsPerTopic = new HashMap<>(); +for (int i = 0; i < topicCount; i++) { +String topicName = getTopicName(i, topicCount); +topics.add(topicName); +partitionsPerTopic.put(topicName, partitionInfos(topicName, partitionCount)); +} +for (int i = 0; i < consumerCount; i++) { +if (i % 4 == 0) { +subscriptions.put(getConsumerName(i, consumerCount), +subscription(topics.subList(0, topicCount / 2), i)); +} else { +subscriptions.put(getConsumerName(i, consumerCount), +subscription(topics.subList(topicCount / 2, topicCount), i)); +} +} + +Map> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions); + +for (int i = 1; i < consumerCount; i++) { +String consumer = getConsumerName(i, consumerCount); +if (i % 4 == 0) { +subscriptions.put( +consumer, +buildSubscriptionV2Above(topics.subList(0, topicCount / 2), +assignment.get(consumer), generationId, i) +); +} else { +subscriptions.put(consumer, +buildSubscriptionV2Above(topics.subList(topicCount / 2, topicCount), +assignment.get(consumer), generationId, i) +); +} +} + +assignor.assignPartitions(partitionsPerTopic, subscriptions); +} + +@Test +public void testSubscriptionNotEqualAndAssignSamePartitionWith3Generation() { Review Comment: this method is to test ``allSubscriptionsEqual`` so its name is a mistake, thx! but this bug in ``allSubscriptionsEqual`` not lead to the specified partition that previously assigned to at least 3 generation consumers cannot be owned by the highest generation, but would lead this partition assign to more than one consumers so add to ``assignedPartitions`` more than once and also the function ``getUnassignedPartitions`` will get more partitions than actuality, as a result, many partitions already assigned to consumers will be added to ``unassignedPartitions``, all of these partitions would be assigned twice. I couldn't understand how to reuse such test style, could you provide any example? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] flashmouse commented on a diff in pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
flashmouse commented on code in PR #13920: URL: https://github.com/apache/kafka/pull/13920#discussion_r1274341109 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java: ## @@ -724,6 +725,90 @@ public void testLargeAssignmentAndGroupWithNonEqualSubscription(boolean hasConsu assignor.assignPartitions(partitionsPerTopic, subscriptions); } +@Timeout(90) +@ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK) +@ValueSource(booleans = {false, true}) +public void testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean hasConsumerRack) { +initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK : RackConfig.NO_CONSUMER_RACK); +int topicCount = hasConsumerRack ? 50 : 500; +int partitionCount = 2_00; +int consumerCount = 2_0; + +List topics = new ArrayList<>(); +Map> partitionsPerTopic = new HashMap<>(); +for (int i = 0; i < topicCount; i++) { +String topicName = getTopicName(i, topicCount); +topics.add(topicName); +partitionsPerTopic.put(topicName, partitionInfos(topicName, partitionCount)); +} +for (int i = 0; i < consumerCount; i++) { +if (i % 4 == 0) { +subscriptions.put(getConsumerName(i, consumerCount), +subscription(topics.subList(0, topicCount / 2), i)); +} else { +subscriptions.put(getConsumerName(i, consumerCount), +subscription(topics.subList(topicCount / 2, topicCount), i)); +} +} + +Map> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions); + +for (int i = 1; i < consumerCount; i++) { +String consumer = getConsumerName(i, consumerCount); +if (i % 4 == 0) { +subscriptions.put( +consumer, +buildSubscriptionV2Above(topics.subList(0, topicCount / 2), +assignment.get(consumer), generationId, i) +); +} else { +subscriptions.put(consumer, +buildSubscriptionV2Above(topics.subList(topicCount / 2, topicCount), +assignment.get(consumer), generationId, i) +); +} +} + +assignor.assignPartitions(partitionsPerTopic, subscriptions); Review Comment: for now ``GeneralAssignmentBuilder`` is the inner private class of ``AbstractStickyAssignor``, test it directly need modify it to protected access level. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] flashmouse commented on a diff in pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
flashmouse commented on code in PR #13920: URL: https://github.com/apache/kafka/pull/13920#discussion_r1266138582 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java: ## @@ -724,6 +724,70 @@ public void testLargeAssignmentAndGroupWithNonEqualSubscription(boolean hasConsu assignor.assignPartitions(partitionsPerTopic, subscriptions); } +@Timeout(90) +@ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK) +@ValueSource(booleans = {false, true}) +public void testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean hasConsumerRack) { +initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK : RackConfig.NO_CONSUMER_RACK); +int topicCount = hasConsumerRack ? 50 : 500; +int partitionCount = 2_00; +int consumerCount = 2_0; + +List topics = new ArrayList<>(); +Map> partitionsPerTopic = new HashMap<>(); +for (int i = 0; i < topicCount; i++) { +String topicName = getTopicName(i, topicCount); +topics.add(topicName); +partitionsPerTopic.put(topicName, partitionInfos(topicName, partitionCount)); +} +for (int i = 0; i < consumerCount; i++) { +if (i % 4 == 0) { +subscriptions.put(getConsumerName(i, consumerCount), subscription(topics.subList(0, topicCount / 2), +i)); +} else { +subscriptions.put(getConsumerName(i, consumerCount), subscription(topics.subList(topicCount / 2, +topicCount), i)); +} +} + +Map> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions); + +for (int i = 1; i < consumerCount; i++) { +String consumer = getConsumerName(i, consumerCount); +if (i % 4 == 0) { +subscriptions.put(consumer, buildSubscriptionV2Above(topics.subList(0, topicCount / 2), +assignment.get(consumer), generationId, i)); +} else { +subscriptions.put(consumer, buildSubscriptionV2Above(topics.subList(topicCount / 2, topicCount), +assignment.get(consumer), generationId, i)); +} +} + +assignor.assignPartitions(partitionsPerTopic, subscriptions); +} + +@Test +public void testSubscriptionNotEqualAndAssignSamePartitionWith3Generation() { +Map> partitionsPerTopic = new HashMap<>(); +partitionsPerTopic.put(topic, partitionInfos(topic, 6)); +partitionsPerTopic.put(topic1, partitionInfos(topic1, 1)); +int[][] sequence = new int[][]{{1, 2, 3}, {1, 3, 2}, {2, 1, 3}, {2, 3, 1}, {3, 1, 2}, {3, 2, 1}}; +for (int[] ints : sequence) { +subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic), partitions(tp(topic, 0), Review Comment: done, but seems this is enough? ``` subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic), partitions(tp(topic, 0), tp(topic, 2)), ints[0], 0)); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] flashmouse commented on a diff in pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
flashmouse commented on code in PR #13920: URL: https://github.com/apache/kafka/pull/13920#discussion_r1264792983 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ## @@ -158,7 +158,7 @@ private boolean allSubscriptionsEqual(Set allTopics, // generation amongst for (final TopicPartition tp : memberData.partitions) { if (allTopics.contains(tp.topic())) { -String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer); +String otherConsumer = allPreviousPartitionsToOwner.get(tp); if (otherConsumer == null) { // this partition is not owned by other consumer in the same generation Review Comment: fixed, plz take a look -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] flashmouse commented on a diff in pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
flashmouse commented on code in PR #13920: URL: https://github.com/apache/kafka/pull/13920#discussion_r1262171529 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ## @@ -194,6 +193,7 @@ private boolean allSubscriptionsEqual(Set allTopics, otherConsumer, otherMemberGeneration, tp, otherMemberGeneration); +allPreviousPartitionsToOwner.put(tp, otherConsumer); Review Comment: updated, plz check again -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org