dajac commented on code in PR #20061: URL: https://github.com/apache/kafka/pull/20061#discussion_r2174556027
########## coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorMetadataImage.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.common.runtime; + +import org.apache.kafka.common.Uuid; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Provides metadata to Coordinators (GroupCoordinator, ShareCoordinator, etc) such as topics, partitions, and their configurations. + * Implementations should be thread-safe and immutable. + */ +public interface CoordinatorMetadataImage { + CoordinatorMetadataImage EMPTY = emptyImage(); + + Optional<String> topicName(Uuid id); + + Optional<Uuid> topicId(String topicName); + + default Optional<Integer> partitionCount(Uuid topicId) { + var topicName = topicName(topicId); + return topicName.isEmpty() ? 
Optional.empty() : partitionCount(topicName.get()); + } + + Optional<Integer> partitionCount(String topicName); + + Set<Uuid> topicIds(); + + Set<String> topicNames(); + + default Optional<TopicMetadata> topicMetadata(String topicName) { + var topicId = topicId(topicName); + return topicId.isEmpty() ? Optional.empty() : topicMetadata(topicId.get()); + } + + Optional<TopicMetadata> topicMetadata(Uuid topicId); + + boolean shareGroupsEnabled(); + + CoordinatorMetadataDelta emptyDelta(); + + Long version(); + + boolean isEmpty(); + + /** + * Metadata about a particular topic + */ + interface TopicMetadata { + String name(); + + Uuid id(); + + int partitionCount(); + + default Set<Integer> partitionSet() { + return IntStream.range(0, partitionCount()).boxed().collect(Collectors.toSet()); + } Review Comment: This is annoying. Let me check where we still use this. ########## coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorMetadataImage.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.common.runtime; + +import org.apache.kafka.common.Uuid; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Provides metadata to Coordinators (GroupCoordinator, ShareCoordinator, etc) such as topics, partitions, and their configurations. + * Implementations should be thread-safe and immutable. + */ +public interface CoordinatorMetadataImage { + CoordinatorMetadataImage EMPTY = emptyImage(); + + Optional<String> topicName(Uuid id); + + Optional<Uuid> topicId(String topicName); + + default Optional<Integer> partitionCount(Uuid topicId) { + var topicName = topicName(topicId); + return topicName.isEmpty() ? Optional.empty() : partitionCount(topicName.get()); + } + + Optional<Integer> partitionCount(String topicName); + + Set<Uuid> topicIds(); + + Set<String> topicNames(); + + default Optional<TopicMetadata> topicMetadata(String topicName) { + var topicId = topicId(topicName); + return topicId.isEmpty() ? Optional.empty() : topicMetadata(topicId.get()); + } + + Optional<TopicMetadata> topicMetadata(Uuid topicId); + + boolean shareGroupsEnabled(); + + CoordinatorMetadataDelta emptyDelta(); + + Long version(); Review Comment: nit: Could we use `long` here? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -1921,14 +1921,14 @@ private CompletableFuture<DeleteShareGroupOffsetsResponseData> handleDeleteShare errorTopicResponses.add( new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() .setTopicId(topicData.topicId()) - .setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId())) + .setTopicName(metadataImage.topicName(topicData.topicId()).orElse("<UNKNOWN>")) Review Comment: We cannot return `<UNKNOWN>`. What was returned with the previous implementation? `null`? 
########## coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorMetadataImage.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.common.runtime; + +import org.apache.kafka.common.Uuid; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Provides metadata to Coordinators (GroupCoordinator, ShareCoordinator, etc) such as topics, partitions, and their configurations. + * Implementations should be thread-safe and immutable. + */ +public interface CoordinatorMetadataImage { + CoordinatorMetadataImage EMPTY = emptyImage(); + + Optional<String> topicName(Uuid id); + + Optional<Uuid> topicId(String topicName); + + default Optional<Integer> partitionCount(Uuid topicId) { + var topicName = topicName(topicId); + return topicName.isEmpty() ? 
Optional.empty() : partitionCount(topicName.get()); + } + + Optional<Integer> partitionCount(String topicName); + + Set<Uuid> topicIds(); + + Set<String> topicNames(); + + default Optional<TopicMetadata> topicMetadata(String topicName) { + var topicId = topicId(topicName); + return topicId.isEmpty() ? Optional.empty() : topicMetadata(topicId.get()); + } Review Comment: nit: Should we provide the default implementation for the topic id version in order to be consistent with `partitionCount`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java: ########## @@ -66,22 +64,18 @@ public int numPartitions(Uuid topicId) { */ @Override public Set<String> racksForPartition(Uuid topicId, int partition) { - TopicImage topic = metadataImage.topics().getTopic(topicId); - if (topic != null) { - PartitionRegistration partitionRegistration = topic.partitions().get(partition); - if (partitionRegistration != null) { - Set<String> racks = new HashSet<>(); - for (int replica : partitionRegistration.replicas) { - // Only add the rack if it is available for the broker/replica. - BrokerRegistration brokerRegistration = metadataImage.cluster().broker(replica); - if (brokerRegistration != null) { - brokerRegistration.rack().ifPresent(racks::add); - } - } - return Collections.unmodifiableSet(racks); - } + Optional<CoordinatorMetadataImage.TopicMetadata> topicMetadataOp = metadataImage.topicMetadata(topicId); + if (topicMetadataOp.isEmpty()) { + return Collections.emptySet(); + } + + CoordinatorMetadataImage.TopicMetadata topicMetadata = topicMetadataOp.get(); + List<String> racks = topicMetadata.partitionRacks().get(partition); + if (racks == null) { + return Collections.emptySet(); Review Comment: nit: Set.of(). 
########## coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorMetadataImage.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.common.runtime; + +import org.apache.kafka.common.Uuid; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Provides metadata to Coordinators (GroupCoordinator, ShareCoordinator, etc) such as topics, partitions, and their configurations. + * Implementations should be thread-safe and immutable. + */ +public interface CoordinatorMetadataImage { + CoordinatorMetadataImage EMPTY = emptyImage(); + + Optional<String> topicName(Uuid id); + + Optional<Uuid> topicId(String topicName); + + default Optional<Integer> partitionCount(Uuid topicId) { + var topicName = topicName(topicId); + return topicName.isEmpty() ? 
Optional.empty() : partitionCount(topicName.get()); + } + + Optional<Integer> partitionCount(String topicName); + + Set<Uuid> topicIds(); + + Set<String> topicNames(); + + default Optional<TopicMetadata> topicMetadata(String topicName) { + var topicId = topicId(topicName); + return topicId.isEmpty() ? Optional.empty() : topicMetadata(topicId.get()); + } + + Optional<TopicMetadata> topicMetadata(Uuid topicId); + + boolean shareGroupsEnabled(); Review Comment: This feels weird in the interface. I wonder whether we could expose the features instead. It seems useful for other needs too. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -1921,14 +1921,14 @@ private CompletableFuture<DeleteShareGroupOffsetsResponseData> handleDeleteShare errorTopicResponses.add( new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() .setTopicId(topicData.topicId()) - .setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId())) + .setTopicName(metadataImage.topicName(topicData.topicId()).orElse("<UNKNOWN>")) .setErrorMessage(Errors.forCode(errItem.get().errorCode()).message()) .setErrorCode(errItem.get().errorCode()) ); } else { successTopics.put( topicData.topicId(), - metadataImage.topics().topicIdToNameView().get(topicData.topicId()) + metadataImage.topicName(topicData.topicId()).orElse("<UNKNOWN>") Review Comment: ditto. In this case, we can return ZERO_UUID. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -2139,7 +2139,7 @@ public void onPartitionsDeleted( // At this point the metadata will not have been updated // with the deleted topics. Set<Uuid> topicIds = topicPartitions.stream() - .map(tp -> metadataImage.topics().getTopic(tp.topic()).id()) + .map(tp -> metadataImage.topicId(tp.topic()).orElse(null)) Review Comment: ZERO_UUID? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java: ########## @@ -66,22 +64,18 @@ public int numPartitions(Uuid topicId) { */ @Override public Set<String> racksForPartition(Uuid topicId, int partition) { - TopicImage topic = metadataImage.topics().getTopic(topicId); - if (topic != null) { - PartitionRegistration partitionRegistration = topic.partitions().get(partition); - if (partitionRegistration != null) { - Set<String> racks = new HashSet<>(); - for (int replica : partitionRegistration.replicas) { - // Only add the rack if it is available for the broker/replica. - BrokerRegistration brokerRegistration = metadataImage.cluster().broker(replica); - if (brokerRegistration != null) { - brokerRegistration.rack().ifPresent(racks::add); - } - } - return Collections.unmodifiableSet(racks); - } + Optional<CoordinatorMetadataImage.TopicMetadata> topicMetadataOp = metadataImage.topicMetadata(topicId); + if (topicMetadataOp.isEmpty()) { + return Collections.emptySet(); + } + + CoordinatorMetadataImage.TopicMetadata topicMetadata = topicMetadataOp.get(); + List<String> racks = topicMetadata.partitionRacks().get(partition); Review Comment: This call is pretty nasty because it generates the entire map just to return the racks of a single partition and it does so every time racksForPartition is called. We should definitely consider my suggestion for the interface. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -5725,24 +5717,23 @@ public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { // the subscription metadata (and the assignment) via the above mechanism. The // resolved regular expressions are cleaned up on the next refresh. 
if (!topicsDelta.createdTopicIds().isEmpty()) { - lastMetadataImageWithNewTopics = metadataImage.provenance().lastContainedOffset(); + lastMetadataImageWithNewTopics = metadataImage.version(); } // Notify all the groups subscribed to the created, updated or // deleted topics. Set<String> allGroupIds = new HashSet<>(); - topicsDelta.changedTopics().forEach((topicId, topicDelta) -> { - String topicName = topicDelta.name(); - // Remove topic hash from the cache to recalculate it. - topicHashCache.remove(topicName); - allGroupIds.addAll(groupsSubscribedToTopic(topicName)); - }); - topicsDelta.deletedTopicIds().forEach(topicId -> { - TopicImage topicImage = delta.image().topics().getTopic(topicId); - String topicName = topicImage.name(); - topicHashCache.remove(topicName); - allGroupIds.addAll(groupsSubscribedToTopic(topicName)); - }); + topicsDelta.changedTopicIds().forEach(topicId -> + metadataImage.topicName(topicId).ifPresent(topicName -> { + // Remove topic hash from the cache to recalculate it. + topicHashCache.remove(topicName); + allGroupIds.addAll(groupsSubscribedToTopic(topicName)); + })); + topicsDelta.deletedTopicIds().forEach(topicId -> + delta.image().topicName(topicId).ifPresent(topicName -> { Review Comment: Why do you use `delta.image()` here and `metadataImage` above? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: ########## @@ -398,38 +390,35 @@ public static long computeGroupHash(Map<String, Long> topicHashes) { * 5. For each partition, write the partition ID and a sorted list of rack identifiers. * - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators. * - * @param topicName The topic image. - * @param metadataImage The cluster image. + * @param topicName The topic name. + * @param metadataImage The topic metadata. * @return The hash of the topic. 
*/ - public static long computeTopicHash(String topicName, MetadataImage metadataImage) { - TopicImage topicImage = metadataImage.topics().getTopic(topicName); - if (topicImage == null) { + public static long computeTopicHash(String topicName, CoordinatorMetadataImage metadataImage) { + Optional<CoordinatorMetadataImage.TopicMetadata> topicImage = metadataImage.topicMetadata(topicName); + if (topicImage.isEmpty()) { return 0; } + CoordinatorMetadataImage.TopicMetadata topicMetadata = topicImage.get(); + HashStream64 hasher = Hashing.xxh3_64().hashStream(); hasher = hasher .putByte(TOPIC_HASH_MAGIC_BYTE) - .putLong(topicImage.id().getMostSignificantBits()) - .putLong(topicImage.id().getLeastSignificantBits()) - .putString(topicImage.name()) - .putInt(topicImage.partitions().size()); - - ClusterImage clusterImage = metadataImage.cluster(); - List<String> racks = new ArrayList<>(); - for (int i = 0; i < topicImage.partitions().size(); i++) { + .putLong(topicMetadata.id().getMostSignificantBits()) + .putLong(topicMetadata.id().getLeastSignificantBits()) + .putString(topicMetadata.name()) + .putInt(topicMetadata.partitionCount()); + + Map<Integer, List<String>> racks = topicMetadata.partitionRacks(); + for (int i = 0; i < topicMetadata.partitionCount(); i++) { hasher = hasher.putInt(i); - racks.clear(); // Clear the list for reuse - for (int replicaId : topicImage.partitions().get(i).replicas) { - BrokerRegistration broker = clusterImage.broker(replicaId); - if (broker != null) { - broker.rack().ifPresent(racks::add); - } - } + List<String> partitionRacks = new ArrayList<>(racks.get(i)); + // topicMetadata returns an unmodifiable list + Collections.copy(partitionRacks, racks.get(i)); Review Comment: This is a tad annoying too because we already allocate a new ArrayList when partitionRacks is called. For performance reasons, I wonder if we should not return an unmodifiable list so we can reuse it here. This part is performance sensitive. 
We have a jmh benchmark that we may want to run to ensure that we don't regress here. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -477,7 +482,7 @@ public void testMemberJoinsEmptyConsumerGroup() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(metadataImage) + .withMetadataImage(new KRaftCoordinatorMetadataImage(metadataImage)) Review Comment: I am debating whether we should update MetadataImageBuilder to return a CoordinatorMetadataImage instead of wrapping it everywhere. Have you considered it? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: ########## @@ -398,38 +390,35 @@ public static long computeGroupHash(Map<String, Long> topicHashes) { * 5. For each partition, write the partition ID and a sorted list of rack identifiers. * - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators. * - * @param topicName The topic image. - * @param metadataImage The cluster image. + * @param topicName The topic name. + * @param metadataImage The topic metadata. * @return The hash of the topic. 
*/ - public static long computeTopicHash(String topicName, MetadataImage metadataImage) { - TopicImage topicImage = metadataImage.topics().getTopic(topicName); - if (topicImage == null) { + public static long computeTopicHash(String topicName, CoordinatorMetadataImage metadataImage) { + Optional<CoordinatorMetadataImage.TopicMetadata> topicImage = metadataImage.topicMetadata(topicName); + if (topicImage.isEmpty()) { return 0; } + CoordinatorMetadataImage.TopicMetadata topicMetadata = topicImage.get(); + HashStream64 hasher = Hashing.xxh3_64().hashStream(); hasher = hasher .putByte(TOPIC_HASH_MAGIC_BYTE) - .putLong(topicImage.id().getMostSignificantBits()) - .putLong(topicImage.id().getLeastSignificantBits()) - .putString(topicImage.name()) - .putInt(topicImage.partitions().size()); - - ClusterImage clusterImage = metadataImage.cluster(); - List<String> racks = new ArrayList<>(); - for (int i = 0; i < topicImage.partitions().size(); i++) { + .putLong(topicMetadata.id().getMostSignificantBits()) + .putLong(topicMetadata.id().getLeastSignificantBits()) + .putString(topicMetadata.name()) + .putInt(topicMetadata.partitionCount()); + + Map<Integer, List<String>> racks = topicMetadata.partitionRacks(); Review Comment: I don't like this because generating `Map<Integer, List<String>>` is not necessary. Could we change the interface to take the partition index as an argument and return the racks for that partition? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org