[GitHub] [kafka] flashmouse commented on a diff in pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-07-26 Thread via GitHub


flashmouse commented on code in PR #13920:
URL: https://github.com/apache/kafka/pull/13920#discussion_r1274490680


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##
@@ -724,6 +725,90 @@ public void 
testLargeAssignmentAndGroupWithNonEqualSubscription(boolean hasConsu
 assignor.assignPartitions(partitionsPerTopic, subscriptions);
 }
 
+@Timeout(90)
+@ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK)
+@ValueSource(booleans = {false, true})
+public void 
testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean 
hasConsumerRack) {
+initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK 
: RackConfig.NO_CONSUMER_RACK);
+int topicCount = hasConsumerRack ? 50 : 500;
+int partitionCount = 2_00;
+int consumerCount = 2_0;
+
+List topics = new ArrayList<>();
+Map> partitionsPerTopic = new HashMap<>();
+for (int i = 0; i < topicCount; i++) {
+String topicName = getTopicName(i, topicCount);
+topics.add(topicName);
+partitionsPerTopic.put(topicName, partitionInfos(topicName, 
partitionCount));
+}
+for (int i = 0; i < consumerCount; i++) {
+if (i % 4 == 0) {
+subscriptions.put(getConsumerName(i, consumerCount),
+subscription(topics.subList(0, topicCount / 2), i));
+} else {
+subscriptions.put(getConsumerName(i, consumerCount),
+subscription(topics.subList(topicCount / 2, 
topicCount), i));
+}
+}
+
+Map> assignment = 
assignor.assignPartitions(partitionsPerTopic, subscriptions);
+
+for (int i = 1; i < consumerCount; i++) {
+String consumer = getConsumerName(i, consumerCount);
+if (i % 4 == 0) {
+subscriptions.put(
+consumer,
+buildSubscriptionV2Above(topics.subList(0, topicCount 
/ 2),
+assignment.get(consumer), generationId, i)
+);
+} else {
+subscriptions.put(consumer,

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] flashmouse commented on a diff in pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-07-26 Thread via GitHub


flashmouse commented on code in PR #13920:
URL: https://github.com/apache/kafka/pull/13920#discussion_r1274483457


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##
@@ -724,6 +725,90 @@ public void 
testLargeAssignmentAndGroupWithNonEqualSubscription(boolean hasConsu
 assignor.assignPartitions(partitionsPerTopic, subscriptions);
 }
 
+@Timeout(90)

Review Comment:
   Oh, this 90 should be sec, I set 90 sec because in my m1 air this test may 
run about 25 sec or more time.
   
   I tried ``TestUtils.waitForCondition`` but seems it is not effective and 
couldn't throw Exception when execution use time more than expected, I'm not 
familiar with this and couldn't find the reason.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] flashmouse commented on a diff in pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-07-25 Thread via GitHub


flashmouse commented on code in PR #13920:
URL: https://github.com/apache/kafka/pull/13920#discussion_r1274457186


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##
@@ -724,6 +725,90 @@ public void 
testLargeAssignmentAndGroupWithNonEqualSubscription(boolean hasConsu
 assignor.assignPartitions(partitionsPerTopic, subscriptions);
 }
 
+@Timeout(90)
+@ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK)
+@ValueSource(booleans = {false, true})
+public void 
testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean 
hasConsumerRack) {
+initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK 
: RackConfig.NO_CONSUMER_RACK);
+int topicCount = hasConsumerRack ? 50 : 500;
+int partitionCount = 2_00;
+int consumerCount = 2_0;
+
+List topics = new ArrayList<>();
+Map> partitionsPerTopic = new HashMap<>();
+for (int i = 0; i < topicCount; i++) {
+String topicName = getTopicName(i, topicCount);
+topics.add(topicName);
+partitionsPerTopic.put(topicName, partitionInfos(topicName, 
partitionCount));
+}
+for (int i = 0; i < consumerCount; i++) {
+if (i % 4 == 0) {
+subscriptions.put(getConsumerName(i, consumerCount),
+subscription(topics.subList(0, topicCount / 2), i));
+} else {
+subscriptions.put(getConsumerName(i, consumerCount),
+subscription(topics.subList(topicCount / 2, 
topicCount), i));
+}
+}
+
+Map> assignment = 
assignor.assignPartitions(partitionsPerTopic, subscriptions);
+
+for (int i = 1; i < consumerCount; i++) {
+String consumer = getConsumerName(i, consumerCount);
+if (i % 4 == 0) {
+subscriptions.put(
+consumer,
+buildSubscriptionV2Above(topics.subList(0, topicCount 
/ 2),
+assignment.get(consumer), generationId, i)
+);
+} else {
+subscriptions.put(consumer,
+buildSubscriptionV2Above(topics.subList(topicCount / 
2, topicCount),
+assignment.get(consumer), generationId, i)
+);
+}
+}
+
+assignor.assignPartitions(partitionsPerTopic, subscriptions);
+}
+
+@Test
+public void 
testSubscriptionNotEqualAndAssignSamePartitionWith3Generation() {

Review Comment:
   this method is to test ``allSubscriptionsEqual`` so its name is a mistake, 
thx!
   
   but this bug in ``allSubscriptionsEqual`` not lead to the specified 
partition that previously assigned to at least 3 generation consumers cannot be 
owned by the highest generation, but would lead this partition assign to more 
than one  consumers so  add to ``assignedPartitions`` more than once and also 
the function ``getUnassignedPartitions`` will get more partitions than 
actuality, as a result, many partitions already assigned to consumers will be 
added to ``unassignedPartitions``, all of these partitions would be assigned 
twice.
   
   I couldn't understand how to reuse such test style, could you provide any 
example?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] flashmouse commented on a diff in pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-07-25 Thread via GitHub


flashmouse commented on code in PR #13920:
URL: https://github.com/apache/kafka/pull/13920#discussion_r1274341109


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##
@@ -724,6 +725,90 @@ public void 
testLargeAssignmentAndGroupWithNonEqualSubscription(boolean hasConsu
 assignor.assignPartitions(partitionsPerTopic, subscriptions);
 }
 
+@Timeout(90)
+@ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK)
+@ValueSource(booleans = {false, true})
+public void 
testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean 
hasConsumerRack) {
+initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK 
: RackConfig.NO_CONSUMER_RACK);
+int topicCount = hasConsumerRack ? 50 : 500;
+int partitionCount = 2_00;
+int consumerCount = 2_0;
+
+List topics = new ArrayList<>();
+Map> partitionsPerTopic = new HashMap<>();
+for (int i = 0; i < topicCount; i++) {
+String topicName = getTopicName(i, topicCount);
+topics.add(topicName);
+partitionsPerTopic.put(topicName, partitionInfos(topicName, 
partitionCount));
+}
+for (int i = 0; i < consumerCount; i++) {
+if (i % 4 == 0) {
+subscriptions.put(getConsumerName(i, consumerCount),
+subscription(topics.subList(0, topicCount / 2), i));
+} else {
+subscriptions.put(getConsumerName(i, consumerCount),
+subscription(topics.subList(topicCount / 2, 
topicCount), i));
+}
+}
+
+Map> assignment = 
assignor.assignPartitions(partitionsPerTopic, subscriptions);
+
+for (int i = 1; i < consumerCount; i++) {
+String consumer = getConsumerName(i, consumerCount);
+if (i % 4 == 0) {
+subscriptions.put(
+consumer,
+buildSubscriptionV2Above(topics.subList(0, topicCount 
/ 2),
+assignment.get(consumer), generationId, i)
+);
+} else {
+subscriptions.put(consumer,
+buildSubscriptionV2Above(topics.subList(topicCount / 
2, topicCount),
+assignment.get(consumer), generationId, i)
+);
+}
+}
+
+assignor.assignPartitions(partitionsPerTopic, subscriptions);

Review Comment:
   for now ``GeneralAssignmentBuilder`` is the inner private class of 
``AbstractStickyAssignor``, test it directly need modify it to protected access 
level. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] flashmouse commented on a diff in pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-07-17 Thread via GitHub


flashmouse commented on code in PR #13920:
URL: https://github.com/apache/kafka/pull/13920#discussion_r1266138582


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##
@@ -724,6 +724,70 @@ public void 
testLargeAssignmentAndGroupWithNonEqualSubscription(boolean hasConsu
 assignor.assignPartitions(partitionsPerTopic, subscriptions);
 }
 
+@Timeout(90)
+@ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK)
+@ValueSource(booleans = {false, true})
+public void 
testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean 
hasConsumerRack) {
+initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK 
: RackConfig.NO_CONSUMER_RACK);
+int topicCount = hasConsumerRack ? 50 : 500;
+int partitionCount = 2_00;
+int consumerCount = 2_0;
+
+List topics = new ArrayList<>();
+Map> partitionsPerTopic = new HashMap<>();
+for (int i = 0; i < topicCount; i++) {
+String topicName = getTopicName(i, topicCount);
+topics.add(topicName);
+partitionsPerTopic.put(topicName, partitionInfos(topicName, 
partitionCount));
+}
+for (int i = 0; i < consumerCount; i++) {
+if (i % 4 == 0) {
+subscriptions.put(getConsumerName(i, consumerCount), 
subscription(topics.subList(0, topicCount / 2),
+i));
+} else {
+subscriptions.put(getConsumerName(i, consumerCount), 
subscription(topics.subList(topicCount / 2,
+topicCount), i));
+}
+}
+
+Map> assignment = 
assignor.assignPartitions(partitionsPerTopic, subscriptions);
+
+for (int i = 1; i < consumerCount; i++) {
+String consumer = getConsumerName(i, consumerCount);
+if (i % 4 == 0) {
+subscriptions.put(consumer, 
buildSubscriptionV2Above(topics.subList(0, topicCount / 2),
+assignment.get(consumer), generationId, i));
+} else {
+subscriptions.put(consumer, 
buildSubscriptionV2Above(topics.subList(topicCount / 2, topicCount),
+assignment.get(consumer), generationId, i));
+}
+}
+
+assignor.assignPartitions(partitionsPerTopic, subscriptions);
+}
+
+@Test
+public void 
testSubscriptionNotEqualAndAssignSamePartitionWith3Generation() {
+Map> partitionsPerTopic = new HashMap<>();
+partitionsPerTopic.put(topic, partitionInfos(topic, 6));
+partitionsPerTopic.put(topic1, partitionInfos(topic1, 1));
+int[][] sequence = new int[][]{{1, 2, 3}, {1, 3, 2}, {2, 1, 3}, {2, 3, 
1}, {3, 1, 2}, {3, 2, 1}};
+for (int[] ints : sequence) {
+subscriptions.put(consumer1, 
buildSubscriptionV2Above(topics(topic), partitions(tp(topic, 0),

Review Comment:
   done, but seems this is enough?
   ```
   subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic),
   partitions(tp(topic, 0), tp(topic, 2)), ints[0], 0));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] flashmouse commented on a diff in pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-07-16 Thread via GitHub


flashmouse commented on code in PR #13920:
URL: https://github.com/apache/kafka/pull/13920#discussion_r1264792983


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -158,7 +158,7 @@ private boolean allSubscriptionsEqual(Set allTopics,
 // generation amongst
 for (final TopicPartition tp : memberData.partitions) {
 if (allTopics.contains(tp.topic())) {
-String otherConsumer = 
allPreviousPartitionsToOwner.put(tp, consumer);
+String otherConsumer = 
allPreviousPartitionsToOwner.get(tp);
 if (otherConsumer == null) {
 // this partition is not owned by other consumer in 
the same generation

Review Comment:
   fixed, plz take a look



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] flashmouse commented on a diff in pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-07-13 Thread via GitHub


flashmouse commented on code in PR #13920:
URL: https://github.com/apache/kafka/pull/13920#discussion_r1262171529


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -194,6 +193,7 @@ private boolean allSubscriptionsEqual(Set allTopics,
 otherConsumer, otherMemberGeneration,
 tp,
 otherMemberGeneration);
+allPreviousPartitionsToOwner.put(tp, 
otherConsumer);

Review Comment:
   updated, plz check again



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org