[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276742165 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -5567,7 +5600,7 @@ public void testJoinGroupAppendErrorConversion() { assertEquals(Errors.LEADER_NOT_AVAILABLE, Errors.LEADER_NOT_AVAILABLE); } -private void assertUnorderedListEquals( +public static void assertUnorderedListEquals( Review Comment: I see. I wonder if we should actually move `recordEquals` to `RecordHelpersTest`. The dependency would make more sense this way, no? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276739854 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ## @@ -764,4 +813,39 @@ public void testNewOffsetCommitTombstoneRecord() { Record record = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1); assertEquals(expectedRecord, record); } + +// For the purpose of testing let: +// a) Number of replicas for each partition = 2 +// b) Number of racks available in the cluster = 4 Review Comment: The javadoc is definitely a better place than this.
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276677238 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ## @@ -440,12 +442,13 @@ public void testUpdateSubscriptionMetadata() { // It should return foo now. assertEquals( mkMap( -mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)) +mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, Collections.emptyMap())) Review Comment: Should we update `testUpdateSubscriptionMetadata` to include racks for some topics? Otherwise, it seems that we don't really test the logic newly added to `computeSubscriptionMetadata`.
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276671418 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java: ## @@ -84,14 +85,15 @@ public void addGroupMember( public Uuid addTopicMetadata( String topicName, -int numPartitions +int numPartitions, +Map> partitionRacks ) { Uuid topicId = Uuid.randomUuid(); subscriptionMetadata.put(topicName, new TopicMetadata( topicId, topicName, -numPartitions -)); +numPartitions, +partitionRacks)); Review Comment: nit: Let's bring back the closing parentheses as they were.
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276670928 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadataTest.java: ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class SubscribedTopicMetadataTest { + +@Test +public void testAttribute() { +Map topicMetadataMap = new HashMap<>(); +for (int i = 0; i < 5; i++) { +Uuid topicId = Uuid.randomUuid(); +String topicName = "topic" + i; +Map> partitionRacks = mkMapOfPartitionRacks(5); +topicMetadataMap.put( +topicId, +new TopicMetadata(topicId, topicName, 5, partitionRacks) +); +} +assertEquals(topicMetadataMap, new SubscribedTopicMetadata(topicMetadataMap).topicMetadata()); +} + +@Test +public void testTopicMetadataCannotBeNull() { +assertThrows(NullPointerException.class, () -> new SubscribedTopicMetadata(null)); +} + +@Test +public void testEquals() { +Map topicMetadataMap = new HashMap<>(); +for (int i = 0; i < 5; i++) { +Uuid topicId = Uuid.randomUuid(); +String topicName = "topic" + i; +Map> partitionRacks = mkMapOfPartitionRacks(5); +topicMetadataMap.put( +topicId, +new TopicMetadata(topicId, topicName, 5, partitionRacks) +); +} +SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadataMap); +assertEquals(new SubscribedTopicMetadata(topicMetadataMap), subscribedTopicMetadata); + +Map topicMetadataMap2 = new HashMap<>(); +Uuid topicId = Uuid.randomUuid(); +topicMetadataMap2.put(topicId, new TopicMetadata(topicId, "newTopic", 5, Collections.emptyMap())); +assertNotEquals(new SubscribedTopicMetadata(topicMetadataMap2), subscribedTopicMetadata); +} +} Review Comment: nit: Let's add a new line at the end. 
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276670286 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -571,4 +716,14 @@ private void assertAssignment(Map>> expectedAssig assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember); } } + +// When rack awareness is enabled for this assignor, rack information can be updated in this method. +private Map> createPartitionMetadata(int numPartitions) { Review Comment: nit: static?
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276669652 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ## @@ -764,4 +813,39 @@ public void testNewOffsetCommitTombstoneRecord() { Record record = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1); assertEquals(expectedRecord, record); } + +// For the purpose of testing let: +// a) Number of replicas for each partition = 2 +// b) Number of racks available in the cluster = 4 Review Comment: Should we remove this? It does not make sense here. Or was it supposed to be somewhere else?
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276668936 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ## @@ -612,15 +616,14 @@ public void testNewGroupMetadataRecordThrowsWhenEmptyAssignment() { MetadataVersion.IBP_3_5_IV2 )); } - Review Comment: It is not really about the empty line but more about the unnecessary spaces on it.
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r127851 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -5692,12 +5725,60 @@ private void assertApiMessageAndVersionEquals( fromTopicPartitions(actualValue.partitionsPendingRevocation())); assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()), fromTopicPartitions(actualValue.partitionsPendingAssignment())); -} else { -assertEquals(expected.message(), actual.message()); +} else if (actual.message() instanceof ConsumerGroupPartitionMetadataValue) { +// The order of the racks stored in PartitionMetadata of ConsumerGroupPartitionMetadataValue is not +// always guaranteed. Therefore, we need a special comparator. +ConsumerGroupPartitionMetadataValue expectedValue = +(ConsumerGroupPartitionMetadataValue) expected.message(); +ConsumerGroupPartitionMetadataValue actualValue = +(ConsumerGroupPartitionMetadataValue) actual.message(); + +List expectedTopicMetadataList = +expectedValue.topics(); +List actualTopicMetadataList = +actualValue.topics(); + +if (expectedTopicMetadataList.size() != actualTopicMetadataList.size()) { +fail("Topic metadata lists have different sizes"); +} + +for (int i = 0; i < expectedValue.topics().size(); i++) { +ConsumerGroupPartitionMetadataValue.TopicMetadata expectedTopicMetadata = +expectedTopicMetadataList.get(i); +ConsumerGroupPartitionMetadataValue.TopicMetadata actualTopicMetadata = +actualTopicMetadataList.get(i); + +assertEquals(expectedTopicMetadata.topicId(), actualTopicMetadata.topicId()); +assertEquals(expectedTopicMetadata.topicName(), actualTopicMetadata.topicName()); +assertEquals(expectedTopicMetadata.numPartitions(), actualTopicMetadata.numPartitions()); + +List expectedPartitionMetadataList = +expectedTopicMetadata.partitionMetadata(); +List actualPartitionMetadataList = +actualTopicMetadata.partitionMetadata(); + +// If the list is empty, rack 
information wasn't available for any replica of +// the partition and hence, the entry wasn't added to the record. +if (expectedPartitionMetadataList.size() != actualPartitionMetadataList.size()) { +fail("Partition metadata lists have different sizes"); +} else if (!expectedPartitionMetadataList.isEmpty() && !actualPartitionMetadataList.isEmpty()) { +for (int j = 0; j < expectedTopicMetadata.numPartitions(); j++) { +ConsumerGroupPartitionMetadataValue.PartitionMetadata expectedPartitionMetadata = +expectedPartitionMetadataList.get(j); +ConsumerGroupPartitionMetadataValue.PartitionMetadata actualPartitionMetadata = +actualPartitionMetadataList.get(j); + +assertEquals(expectedPartitionMetadata.partition(), actualPartitionMetadata.partition()); + assertUnorderedListEquals(expectedPartitionMetadata.racks(), actualPartitionMetadata.racks()); +} +} else { +assertEquals(expected.message(), actual.message()); Review Comment: Why are we doing this? If the partition metadata are empty, we could just run the previous branch as well. It would be a no-op.
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276663083 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -5692,12 +5725,60 @@ private void assertApiMessageAndVersionEquals( fromTopicPartitions(actualValue.partitionsPendingRevocation())); assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()), fromTopicPartitions(actualValue.partitionsPendingAssignment())); -} else { -assertEquals(expected.message(), actual.message()); +} else if (actual.message() instanceof ConsumerGroupPartitionMetadataValue) { +// The order of the racks stored in PartitionMetadata of ConsumerGroupPartitionMetadataValue is not +// always guaranteed. Therefore, we need a special comparator. +ConsumerGroupPartitionMetadataValue expectedValue = +(ConsumerGroupPartitionMetadataValue) expected.message(); +ConsumerGroupPartitionMetadataValue actualValue = +(ConsumerGroupPartitionMetadataValue) actual.message(); + +List expectedTopicMetadataList = +expectedValue.topics(); +List actualTopicMetadataList = +actualValue.topics(); + +if (expectedTopicMetadataList.size() != actualTopicMetadataList.size()) { +fail("Topic metadata lists have different sizes"); +} + +for (int i = 0; i < expectedValue.topics().size(); i++) { +ConsumerGroupPartitionMetadataValue.TopicMetadata expectedTopicMetadata = +expectedTopicMetadataList.get(i); +ConsumerGroupPartitionMetadataValue.TopicMetadata actualTopicMetadata = +actualTopicMetadataList.get(i); + +assertEquals(expectedTopicMetadata.topicId(), actualTopicMetadata.topicId()); +assertEquals(expectedTopicMetadata.topicName(), actualTopicMetadata.topicName()); +assertEquals(expectedTopicMetadata.numPartitions(), actualTopicMetadata.numPartitions()); + +List expectedPartitionMetadataList = +expectedTopicMetadata.partitionMetadata(); +List actualPartitionMetadataList = +actualTopicMetadata.partitionMetadata(); + +// If the list is empty, rack 
information wasn't available for any replica of +// the partition and hence, the entry wasn't added to the record. +if (expectedPartitionMetadataList.size() != actualPartitionMetadataList.size()) { +fail("Partition metadata lists have different sizes"); +} else if (!expectedPartitionMetadataList.isEmpty() && !actualPartitionMetadataList.isEmpty()) { +for (int j = 0; j < expectedTopicMetadata.numPartitions(); j++) { Review Comment: Using numPartitions seems incorrect here as it could be larger than the number of partition metadata entries.
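To illustrate the loop-bound concern: the diff's own comment says a partition without rack information gets no entry in the record, so the metadata list can be shorter than numPartitions. The sketch below is a minimal, self-contained illustration and not the PR's code; `LoopBoundSketch` and both method names are hypothetical, and a plain `List<Integer>` stands in for the list of `PartitionMetadata` entries.

```java
import java.util.List;

final class LoopBoundSketch {
    // Hypothetical: walks the list using numPartitions as the bound.
    // Throws IndexOutOfBoundsException when the list has fewer entries.
    static int visitByNumPartitions(List<Integer> partitionMetadata, int numPartitions) {
        int visited = 0;
        for (int j = 0; j < numPartitions; j++) {
            partitionMetadata.get(j);
            visited++;
        }
        return visited;
    }

    // Hypothetical: walks the list using its own size as the bound.
    // An empty list simply results in zero iterations.
    static int visitByListSize(List<Integer> partitionMetadata) {
        int visited = 0;
        for (int j = 0; j < partitionMetadata.size(); j++) {
            partitionMetadata.get(j);
            visited++;
        }
        return visited;
    }

    public static void main(String[] args) {
        // Topic with 5 partitions, but only partitions 0 and 2 carry rack information.
        List<Integer> entries = List.of(0, 2);
        System.out.println(visitByListSize(entries));
        try {
            visitByNumPartitions(entries, 5);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("numPartitions bound overran the list");
        }
    }
}
```

Bounding the loop by the list size also covers the empty case without a dedicated branch, since the loop body never runs.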
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276659754 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -5692,12 +5725,60 @@ private void assertApiMessageAndVersionEquals( fromTopicPartitions(actualValue.partitionsPendingRevocation())); assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()), fromTopicPartitions(actualValue.partitionsPendingAssignment())); -} else { -assertEquals(expected.message(), actual.message()); +} else if (actual.message() instanceof ConsumerGroupPartitionMetadataValue) { +// The order of the racks stored in PartitionMetadata of ConsumerGroupPartitionMetadataValue is not +// always guaranteed. Therefore, we need a special comparator. +ConsumerGroupPartitionMetadataValue expectedValue = +(ConsumerGroupPartitionMetadataValue) expected.message(); +ConsumerGroupPartitionMetadataValue actualValue = +(ConsumerGroupPartitionMetadataValue) actual.message(); + Review Comment: Should we assert numPartitions as well?
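The code comment in the diff above says the order of the racks is not guaranteed, which is why a plain `assertEquals` on the lists is not enough. The body of `assertUnorderedListEquals` is not shown in this thread, so the following is only a sketch of one way an order-insensitive comparison could work; `UnorderedListSketch` and `unorderedEquals` are hypothetical names. It sorts copies of both lists, so duplicates still count and the inputs are left untouched.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class UnorderedListSketch {
    // Hypothetical helper: true when both lists hold the same elements with the
    // same multiplicities, regardless of order. Works on sorted copies only.
    static <T extends Comparable<T>> boolean unorderedEquals(List<T> expected, List<T> actual) {
        List<T> sortedExpected = new ArrayList<>(expected);
        List<T> sortedActual = new ArrayList<>(actual);
        Collections.sort(sortedExpected);
        Collections.sort(sortedActual);
        return sortedExpected.equals(sortedActual);
    }

    public static void main(String[] args) {
        System.out.println(unorderedEquals(List.of("rack1", "rack2"), List.of("rack2", "rack1")));
        System.out.println(unorderedEquals(List.of("rack1", "rack1"), List.of("rack1", "rack2")));
    }
}
```

Comparing via sets would be shorter but would treat `[rack1, rack1]` and `[rack1]` as equal, which may or may not be acceptable for a test helper.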
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276653838 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -5567,7 +5600,7 @@ public void testJoinGroupAppendErrorConversion() { assertEquals(Errors.LEADER_NOT_AVAILABLE, Errors.LEADER_NOT_AVAILABLE); } -private void assertUnorderedListEquals( +public static void assertUnorderedListEquals( Review Comment: Is there a reason for making it public? I have the same question for the following two methods.
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1275243699 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -131,13 +131,24 @@ public static Record newGroupSubscriptionMetadataRecord( Map newSubscriptionMetadata ) { ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue(); -newSubscriptionMetadata.forEach((topicName, topicMetadata) -> +newSubscriptionMetadata.forEach((topicName, topicMetadata) -> { +List partitionMetadata = new ArrayList<>(); +if (!topicMetadata.partitionRacks().isEmpty()) { +topicMetadata.partitionRacks().forEach((partition, racks) -> { +partitionMetadata.add(new ConsumerGroupPartitionMetadataValue.PartitionMetadata() +.setPartition(partition) +.setRacks(new ArrayList<>(racks)) +); +}); +} +// If partition rack information is empty, store an empty list in the record. value.topics().add(new ConsumerGroupPartitionMetadataValue.TopicMetadata() .setTopicId(topicMetadata.id()) .setTopicName(topicMetadata.name()) .setNumPartitions(topicMetadata.numPartitions()) -) -); +.setPartitionMetadata(partitionMetadata.isEmpty() ? Collections.emptyList() : partitionMetadata) Review Comment: Using `Collections.emptyList` makes sense when you can avoid allocating an empty `ArrayList`. In this case, the list is already allocated, so we could just use it.
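A sketch of the simplification this comment points at, under stated assumptions: `PartitionMetadataSketch`, `PartitionRacks`, and `toPartitionMetadata` are hypothetical stand-ins rather than Kafka classes, and the `Map<Integer, Set<String>>` parameter type is assumed for illustration only. The point is that once the `ArrayList` has been allocated, returning it directly already yields an empty list when there is no rack information, so neither the `isEmpty()` guard nor the `Collections.emptyList()` fallback is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class PartitionMetadataSketch {
    // Hypothetical stand-in for ConsumerGroupPartitionMetadataValue.PartitionMetadata.
    static final class PartitionRacks {
        final int partition;
        final List<String> racks;

        PartitionRacks(int partition, List<String> racks) {
            this.partition = partition;
            this.racks = racks;
        }
    }

    // With an empty input map, forEach never runs and the freshly allocated
    // (still empty) list is returned as-is. No ternary is required.
    static List<PartitionRacks> toPartitionMetadata(Map<Integer, Set<String>> partitionRacks) {
        List<PartitionRacks> partitionMetadata = new ArrayList<>();
        partitionRacks.forEach((partition, racks) ->
            partitionMetadata.add(new PartitionRacks(partition, new ArrayList<>(racks))));
        return partitionMetadata;
    }

    public static void main(String[] args) {
        System.out.println(toPartitionMetadata(Map.of()).size());
        System.out.println(toPartitionMetadata(Map.of(0, Set.of("rack-a", "rack-b"))).size());
    }
}
```

`Collections.emptyList()` remains the better choice in code paths where it lets you skip the allocation entirely, which is the distinction the comment draws.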
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1275242714 ## checkstyle/suppressions.xml: ## @@ -321,13 +321,13 @@ + - Review Comment: Hum.. I asked in the other PR if the change was necessary: https://github.com/apache/kafka/pull/13998/files#r1272466045. You said that you removed it but it is still here. I am confused.
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1274915017 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ## @@ -756,4 +759,24 @@ public void testNewOffsetCommitTombstoneRecord() { Record record = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1); assertEquals(expectedRecord, record); } + Review Comment: Should we add a test which verifies that partitions with no racks are not put into the record? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -131,13 +131,24 @@ public static Record newGroupSubscriptionMetadataRecord( Map newSubscriptionMetadata ) { ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue(); -newSubscriptionMetadata.forEach((topicName, topicMetadata) -> +newSubscriptionMetadata.forEach((topicName, topicMetadata) -> { +List partitionMetadata = new ArrayList<>(); +if (!topicMetadata.partitionRacks().isEmpty()) { +topicMetadata.partitionRacks().forEach((partition, racks) -> { +partitionMetadata.add(new ConsumerGroupPartitionMetadataValue.PartitionMetadata() +.setPartition(partition) +.setRacks(new ArrayList<>(racks)) +); +}); +} +// If partition rack information is empty, store an empty list in the record. Review Comment: nit: Would it make sense to put this comment before the `if` statement? ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ## @@ -612,15 +616,14 @@ public void testNewGroupMetadataRecordThrowsWhenEmptyAssignment() { MetadataVersion.IBP_3_5_IV2 )); } - Review Comment: nit: Let's revert this. 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -131,13 +131,24 @@ public static Record newGroupSubscriptionMetadataRecord( Map newSubscriptionMetadata ) { ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue(); -newSubscriptionMetadata.forEach((topicName, topicMetadata) -> +newSubscriptionMetadata.forEach((topicName, topicMetadata) -> { +List partitionMetadata = new ArrayList<>(); +if (!topicMetadata.partitionRacks().isEmpty()) { +topicMetadata.partitionRacks().forEach((partition, racks) -> { +partitionMetadata.add(new ConsumerGroupPartitionMetadataValue.PartitionMetadata() +.setPartition(partition) +.setRacks(new ArrayList<>(racks)) +); +}); +} +// If partition rack information is empty, store an empty list in the record. value.topics().add(new ConsumerGroupPartitionMetadataValue.TopicMetadata() .setTopicId(topicMetadata.id()) .setTopicName(topicMetadata.name()) .setNumPartitions(topicMetadata.numPartitions()) -) -); +.setPartitionMetadata(partitionMetadata.isEmpty() ? Collections.emptyList() : partitionMetadata) Review Comment: nit: Why can't we use `partitionMetadata` if it is empty? That seems just fine. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java: ## @@ -26,18 +26,17 @@ */ @InterfaceStability.Unstable public interface PartitionAssignor { - /** * Unique name for this assignor. */ String name(); /** - * Perform the group assignment given the current members and - * topic metadata. + * Assigns partitions to group members based on the given assignment specification and topic metadata. * - * @param assignmentSpec The assignment spec. + * @param assignmentSpec The assignment spec which included member metadata. Review Comment: nit: `includes`? 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java: ## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1274909957 ## checkstyle/suppressions.xml: ## @@ -321,13 +321,13 @@ + - Review Comment: Why are we moving this one?