dajac commented on code in PR #20061:
URL: https://github.com/apache/kafka/pull/20061#discussion_r2174556027


##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorMetadataImage.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Provides metadata to Coordinators (GroupCoordinator, ShareCoordinator, etc) 
such as topics, partitions, and their configurations.
+ * Implementations should be thread-safe and immutable.
+ */
+public interface CoordinatorMetadataImage {
+    CoordinatorMetadataImage EMPTY = emptyImage();
+
+    Optional<String> topicName(Uuid id);
+
+    Optional<Uuid> topicId(String topicName);
+
+    default Optional<Integer> partitionCount(Uuid topicId) {
+        var topicName = topicName(topicId);
+        return topicName.isEmpty() ? Optional.empty() : 
partitionCount(topicName.get());
+    }
+
+    Optional<Integer> partitionCount(String topicName);
+
+    Set<Uuid> topicIds();
+
+    Set<String> topicNames();
+
+    default Optional<TopicMetadata> topicMetadata(String topicName) {
+        var topicId = topicId(topicName);
+        return topicId.isEmpty() ? Optional.empty() : 
topicMetadata(topicId.get());
+    }
+
+    Optional<TopicMetadata> topicMetadata(Uuid topicId);
+
+    boolean shareGroupsEnabled();
+
+    CoordinatorMetadataDelta emptyDelta();
+
+    Long version();
+
+    boolean isEmpty();
+
+    /**
+     * Metadata about a particular topic
+     */
+    interface TopicMetadata {
+        String name();
+
+        Uuid id();
+
+        int partitionCount();
+
+        default Set<Integer> partitionSet() {
+            return IntStream.range(0, 
partitionCount()).boxed().collect(Collectors.toSet());
+        }

Review Comment:
   This is annoying. Let me check where we still use this.



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorMetadataImage.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Provides metadata to Coordinators (GroupCoordinator, ShareCoordinator, etc) 
such as topics, partitions, and their configurations.
+ * Implementations should be thread-safe and immutable.
+ */
+public interface CoordinatorMetadataImage {
+    CoordinatorMetadataImage EMPTY = emptyImage();
+
+    Optional<String> topicName(Uuid id);
+
+    Optional<Uuid> topicId(String topicName);
+
+    default Optional<Integer> partitionCount(Uuid topicId) {
+        var topicName = topicName(topicId);
+        return topicName.isEmpty() ? Optional.empty() : 
partitionCount(topicName.get());
+    }
+
+    Optional<Integer> partitionCount(String topicName);
+
+    Set<Uuid> topicIds();
+
+    Set<String> topicNames();
+
+    default Optional<TopicMetadata> topicMetadata(String topicName) {
+        var topicId = topicId(topicName);
+        return topicId.isEmpty() ? Optional.empty() : 
topicMetadata(topicId.get());
+    }
+
+    Optional<TopicMetadata> topicMetadata(Uuid topicId);
+
+    boolean shareGroupsEnabled();
+
+    CoordinatorMetadataDelta emptyDelta();
+
+    Long version();

Review Comment:
   nit: Could we use `long` here?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1921,14 +1921,14 @@ private 
CompletableFuture<DeleteShareGroupOffsetsResponseData> handleDeleteShare
                 errorTopicResponses.add(
                     new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
                         .setTopicId(topicData.topicId())
-                        
.setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId()))
+                        
.setTopicName(metadataImage.topicName(topicData.topicId()).orElse("<UNKNOWN>"))

Review Comment:
   We cannot return `<UNKNOWN>`. What was returned with the previous 
implementation? `null`?



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorMetadataImage.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Provides metadata to Coordinators (GroupCoordinator, ShareCoordinator, etc) 
such as topics, partitions, and their configurations.
+ * Implementations should be thread-safe and immutable.
+ */
+public interface CoordinatorMetadataImage {
+    CoordinatorMetadataImage EMPTY = emptyImage();
+
+    Optional<String> topicName(Uuid id);
+
+    Optional<Uuid> topicId(String topicName);
+
+    default Optional<Integer> partitionCount(Uuid topicId) {
+        var topicName = topicName(topicId);
+        return topicName.isEmpty() ? Optional.empty() : 
partitionCount(topicName.get());
+    }
+
+    Optional<Integer> partitionCount(String topicName);
+
+    Set<Uuid> topicIds();
+
+    Set<String> topicNames();
+
+    default Optional<TopicMetadata> topicMetadata(String topicName) {
+        var topicId = topicId(topicName);
+        return topicId.isEmpty() ? Optional.empty() : 
topicMetadata(topicId.get());
+    }

Review Comment:
   nit: Should we provide the default implementation for the topic id version 
in order to be consistent with `partitionCount`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java:
##########
@@ -66,22 +64,18 @@ public int numPartitions(Uuid topicId) {
      */
     @Override
     public Set<String> racksForPartition(Uuid topicId, int partition) {
-        TopicImage topic = metadataImage.topics().getTopic(topicId);
-        if (topic != null) {
-            PartitionRegistration partitionRegistration = 
topic.partitions().get(partition);
-            if (partitionRegistration != null) {
-                Set<String> racks = new HashSet<>();
-                for (int replica : partitionRegistration.replicas) {
-                    // Only add the rack if it is available for the 
broker/replica.
-                    BrokerRegistration brokerRegistration = 
metadataImage.cluster().broker(replica);
-                    if (brokerRegistration != null) {
-                        brokerRegistration.rack().ifPresent(racks::add);
-                    }
-                }
-                return Collections.unmodifiableSet(racks);
-            }
+        Optional<CoordinatorMetadataImage.TopicMetadata> topicMetadataOp = 
metadataImage.topicMetadata(topicId);
+        if (topicMetadataOp.isEmpty()) {
+            return Collections.emptySet();
+        }
+
+        CoordinatorMetadataImage.TopicMetadata topicMetadata = 
topicMetadataOp.get();
+        List<String> racks = topicMetadata.partitionRacks().get(partition);
+        if (racks == null) {
+            return Collections.emptySet();

Review Comment:
   nit: Set.of().



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorMetadataImage.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Provides metadata to Coordinators (GroupCoordinator, ShareCoordinator, etc) 
such as topics, partitions, and their configurations.
+ * Implementations should be thread-safe and immutable.
+ */
+public interface CoordinatorMetadataImage {
+    CoordinatorMetadataImage EMPTY = emptyImage();
+
+    Optional<String> topicName(Uuid id);
+
+    Optional<Uuid> topicId(String topicName);
+
+    default Optional<Integer> partitionCount(Uuid topicId) {
+        var topicName = topicName(topicId);
+        return topicName.isEmpty() ? Optional.empty() : 
partitionCount(topicName.get());
+    }
+
+    Optional<Integer> partitionCount(String topicName);
+
+    Set<Uuid> topicIds();
+
+    Set<String> topicNames();
+
+    default Optional<TopicMetadata> topicMetadata(String topicName) {
+        var topicId = topicId(topicName);
+        return topicId.isEmpty() ? Optional.empty() : 
topicMetadata(topicId.get());
+    }
+
+    Optional<TopicMetadata> topicMetadata(Uuid topicId);
+
+    boolean shareGroupsEnabled();

Review Comment:
   This feels weird in the interface. I wonder whether we could expose the 
features instead. It seems useful for other needs too.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1921,14 +1921,14 @@ private 
CompletableFuture<DeleteShareGroupOffsetsResponseData> handleDeleteShare
                 errorTopicResponses.add(
                     new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
                         .setTopicId(topicData.topicId())
-                        
.setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId()))
+                        
.setTopicName(metadataImage.topicName(topicData.topicId()).orElse("<UNKNOWN>"))
                         
.setErrorMessage(Errors.forCode(errItem.get().errorCode()).message())
                         .setErrorCode(errItem.get().errorCode())
                 );
             } else {
                 successTopics.put(
                     topicData.topicId(),
-                    
metadataImage.topics().topicIdToNameView().get(topicData.topicId())
+                    
metadataImage.topicName(topicData.topicId()).orElse("<UNKNOWN>")

Review Comment:
   ditto. In this case, we can return ZERO_UUID.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -2139,7 +2139,7 @@ public void onPartitionsDeleted(
         // At this point the metadata will not have been updated
         // with the deleted topics.
         Set<Uuid> topicIds = topicPartitions.stream()
-            .map(tp -> metadataImage.topics().getTopic(tp.topic()).id())
+            .map(tp -> metadataImage.topicId(tp.topic()).orElse(null))

Review Comment:
   ZERO_UUID?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java:
##########
@@ -66,22 +64,18 @@ public int numPartitions(Uuid topicId) {
      */
     @Override
     public Set<String> racksForPartition(Uuid topicId, int partition) {
-        TopicImage topic = metadataImage.topics().getTopic(topicId);
-        if (topic != null) {
-            PartitionRegistration partitionRegistration = 
topic.partitions().get(partition);
-            if (partitionRegistration != null) {
-                Set<String> racks = new HashSet<>();
-                for (int replica : partitionRegistration.replicas) {
-                    // Only add the rack if it is available for the 
broker/replica.
-                    BrokerRegistration brokerRegistration = 
metadataImage.cluster().broker(replica);
-                    if (brokerRegistration != null) {
-                        brokerRegistration.rack().ifPresent(racks::add);
-                    }
-                }
-                return Collections.unmodifiableSet(racks);
-            }
+        Optional<CoordinatorMetadataImage.TopicMetadata> topicMetadataOp = 
metadataImage.topicMetadata(topicId);
+        if (topicMetadataOp.isEmpty()) {
+            return Collections.emptySet();
+        }
+
+        CoordinatorMetadataImage.TopicMetadata topicMetadata = 
topicMetadataOp.get();
+        List<String> racks = topicMetadata.partitionRacks().get(partition);

Review Comment:
   This call is pretty nasty because it generates the entire map just to return 
the racks of a single partition, and it does so every time racksForPartition is 
called. We should definitely consider my suggestion for the interface.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -5725,24 +5717,23 @@ public void onNewMetadataImage(MetadataImage newImage, 
MetadataDelta delta) {
         // the subscription metadata (and the assignment) via the above 
mechanism. The
         // resolved regular expressions are cleaned up on the next refresh.
         if (!topicsDelta.createdTopicIds().isEmpty()) {
-            lastMetadataImageWithNewTopics = 
metadataImage.provenance().lastContainedOffset();
+            lastMetadataImageWithNewTopics = metadataImage.version();
         }
 
         // Notify all the groups subscribed to the created, updated or
         // deleted topics.
         Set<String> allGroupIds = new HashSet<>();
-        topicsDelta.changedTopics().forEach((topicId, topicDelta) -> {
-            String topicName = topicDelta.name();
-            // Remove topic hash from the cache to recalculate it.
-            topicHashCache.remove(topicName);
-            allGroupIds.addAll(groupsSubscribedToTopic(topicName));
-        });
-        topicsDelta.deletedTopicIds().forEach(topicId -> {
-            TopicImage topicImage = delta.image().topics().getTopic(topicId);
-            String topicName = topicImage.name();
-            topicHashCache.remove(topicName);
-            allGroupIds.addAll(groupsSubscribedToTopic(topicName));
-        });
+        topicsDelta.changedTopicIds().forEach(topicId ->
+            metadataImage.topicName(topicId).ifPresent(topicName -> {
+                // Remove topic hash from the cache to recalculate it.
+                topicHashCache.remove(topicName);
+                allGroupIds.addAll(groupsSubscribedToTopic(topicName));
+            }));
+        topicsDelta.deletedTopicIds().forEach(topicId ->
+            delta.image().topicName(topicId).ifPresent(topicName -> {

Review Comment:
   Why do you use `delta.image()` here and `metadataImage` above?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -398,38 +390,35 @@ public static long computeGroupHash(Map<String, Long> 
topicHashes) {
      * 5. For each partition, write the partition ID and a sorted list of rack 
identifiers.
      * - Rack identifiers are formatted as 
"<length1><value1><length2><value2>" to prevent issues with simple separators.
      *
-     * @param topicName     The topic image.
-     * @param metadataImage The cluster image.
+     * @param topicName The topic name.
+     * @param metadataImage The topic metadata.
      * @return The hash of the topic.
      */
-    public static long computeTopicHash(String topicName, MetadataImage 
metadataImage) {
-        TopicImage topicImage = metadataImage.topics().getTopic(topicName);
-        if (topicImage == null) {
+    public static long computeTopicHash(String topicName, 
CoordinatorMetadataImage metadataImage) {
+        Optional<CoordinatorMetadataImage.TopicMetadata> topicImage = 
metadataImage.topicMetadata(topicName);
+        if (topicImage.isEmpty()) {
             return 0;
         }
 
+        CoordinatorMetadataImage.TopicMetadata topicMetadata = 
topicImage.get();
+
         HashStream64 hasher = Hashing.xxh3_64().hashStream();
         hasher = hasher
             .putByte(TOPIC_HASH_MAGIC_BYTE)
-            .putLong(topicImage.id().getMostSignificantBits())
-            .putLong(topicImage.id().getLeastSignificantBits())
-            .putString(topicImage.name())
-            .putInt(topicImage.partitions().size());
-
-        ClusterImage clusterImage = metadataImage.cluster();
-        List<String> racks = new ArrayList<>();
-        for (int i = 0; i < topicImage.partitions().size(); i++) {
+            .putLong(topicMetadata.id().getMostSignificantBits())
+            .putLong(topicMetadata.id().getLeastSignificantBits())
+            .putString(topicMetadata.name())
+            .putInt(topicMetadata.partitionCount());
+
+        Map<Integer, List<String>> racks = topicMetadata.partitionRacks();
+        for (int i = 0; i < topicMetadata.partitionCount(); i++) {
             hasher = hasher.putInt(i);
-            racks.clear(); // Clear the list for reuse
-            for (int replicaId : topicImage.partitions().get(i).replicas) {
-                BrokerRegistration broker = clusterImage.broker(replicaId);
-                if (broker != null) {
-                    broker.rack().ifPresent(racks::add);
-                }
-            }
+            List<String> partitionRacks = new ArrayList<>(racks.get(i));
+            // topicMetadata returns an unmodifiable list
+            Collections.copy(partitionRacks, racks.get(i));

Review Comment:
   This is a tad annoying too because we already allocate a new ArrayList when 
partitionRacks is called. For performance reasons, I wonder whether we should 
return a modifiable list instead of an unmodifiable one so we can reuse it here.
   
   This part is performance sensitive. We have a jmh benchmark that we may want 
to run to ensure that we don't regress here.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -477,7 +482,7 @@ public void testMemberJoinsEmptyConsumerGroup() {
         MockPartitionAssignor assignor = new MockPartitionAssignor("range");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
-            .withMetadataImage(metadataImage)
+            .withMetadataImage(new 
KRaftCoordinatorMetadataImage(metadataImage))

Review Comment:
   I am debating whether we should update MetadataImageBuilder to return a 
CoordinatorMetadataImage instead of wrapping it everywhere. Have you considered 
it?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -398,38 +390,35 @@ public static long computeGroupHash(Map<String, Long> 
topicHashes) {
      * 5. For each partition, write the partition ID and a sorted list of rack 
identifiers.
      * - Rack identifiers are formatted as 
"<length1><value1><length2><value2>" to prevent issues with simple separators.
      *
-     * @param topicName     The topic image.
-     * @param metadataImage The cluster image.
+     * @param topicName The topic name.
+     * @param metadataImage The topic metadata.
      * @return The hash of the topic.
      */
-    public static long computeTopicHash(String topicName, MetadataImage 
metadataImage) {
-        TopicImage topicImage = metadataImage.topics().getTopic(topicName);
-        if (topicImage == null) {
+    public static long computeTopicHash(String topicName, 
CoordinatorMetadataImage metadataImage) {
+        Optional<CoordinatorMetadataImage.TopicMetadata> topicImage = 
metadataImage.topicMetadata(topicName);
+        if (topicImage.isEmpty()) {
             return 0;
         }
 
+        CoordinatorMetadataImage.TopicMetadata topicMetadata = 
topicImage.get();
+
         HashStream64 hasher = Hashing.xxh3_64().hashStream();
         hasher = hasher
             .putByte(TOPIC_HASH_MAGIC_BYTE)
-            .putLong(topicImage.id().getMostSignificantBits())
-            .putLong(topicImage.id().getLeastSignificantBits())
-            .putString(topicImage.name())
-            .putInt(topicImage.partitions().size());
-
-        ClusterImage clusterImage = metadataImage.cluster();
-        List<String> racks = new ArrayList<>();
-        for (int i = 0; i < topicImage.partitions().size(); i++) {
+            .putLong(topicMetadata.id().getMostSignificantBits())
+            .putLong(topicMetadata.id().getLeastSignificantBits())
+            .putString(topicMetadata.name())
+            .putInt(topicMetadata.partitionCount());
+
+        Map<Integer, List<String>> racks = topicMetadata.partitionRacks();

Review Comment:
   I don't like this because generating `Map<Integer, List<String>>` is not 
necessary. Could we change the interface to take the partition index as an 
argument and return the racks for that partition?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to