cadonna commented on code in PR #18676:
URL: https://github.com/apache/kafka/pull/18676#discussion_r1937549560


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.coordinator.group.streams.assignor.TopologyDescriber;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.stream.Stream;
+
+/**
+ * The topology metadata class is used by the {@link 
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor} to get topic 
and
+ * partition metadata for the topology that the streams group using.
+ *
+ * @param topicMetadata The topic Ids mapped to their corresponding {@link 
TopicMetadata} object, which contains topic and partition
+ *                      metadata.
+ * @param topology      The configured topology, containing subtopologies and 
internal topics.
+ */
+public record TopologyMetadata(Map<String, TopicMetadata> topicMetadata, 
ConfiguredTopology topology) implements TopologyDescriber {
+
+    public TopologyMetadata {
+        topicMetadata = 
Objects.requireNonNull(Collections.unmodifiableMap(topicMetadata));
+        Objects.requireNonNull(topology);
+    }
+
+    /**
+     * Map of topic names to topic metadata.
+     *
+     * @return The map of topic Ids to topic metadata.
+     */
+    @Override
+    public Map<String, TopicMetadata> topicMetadata() {
+        return this.topicMetadata;
+    }
+
+    /**
+     * Checks whether the given subtopology is associated with a changelog 
topic.
+     *
+     * @param subtopologyId String identifying the subtopology.
+     * @throws IllegalStateException if the subtopology ID does not exist, or 
the topology is not configured.
+     * @return true if the subtopology is associated with a changelog topic, 
false otherwise.
+     */
+    @Override
+    public boolean isStateful(String subtopologyId) {
+        final ConfiguredSubtopology subtopology = 
getSubtopologyOrFail(subtopologyId);
+        return !subtopology.stateChangelogTopics().isEmpty();
+    }
+
+    /**
+     * The list of subtopologies in the topology.
+     *
+     * @throws IllegalStateException if the topology is not configured.
+     * @return a list of subtopology IDs.
+     */
+    @Override
+    public List<String> subtopologies() {
+        return getSubtopologiesOrFail().keySet().stream().toList();
+    }
+
+    /**
+     * The maximal number of input partitions among all source topics for the 
given subtopology.
+     *
+     * @param subtopologyId String identifying the subtopology.
+     *
+     * @throws IllegalStateException if the subtopology ID does not exist, or 
the topology is not configured, or the subtopology
+     *                               contains no source topics.
+     * @return The maximal number of input partitions among all source topics 
for the given subtopology.
+     */
+    @Override
+    public int maxNumInputPartitions(String subtopologyId) {
+        final ConfiguredSubtopology subtopology = 
getSubtopologyOrFail(subtopologyId);
+        return Stream.concat(
+            subtopology.sourceTopics().stream(),
+            subtopology.repartitionSourceTopics().keySet().stream()
+        ).map(topic -> 
this.topicMetadata.get(topic).numPartitions()).max(Integer::compareTo).orElseThrow(
+            () -> new IllegalStateException("Subtopology does not contain any 
source topics")
+        );
+    }
+
+    private ConfiguredSubtopology getSubtopologyOrFail(String subtopologyId) {
+        final Map<String, ConfiguredSubtopology> subtopologies = 
getSubtopologiesOrFail();
+        if (!subtopologies.containsKey(subtopologyId)) {
+            throw new IllegalStateException(String.format("Topology does not 
contain subtopology %s", subtopologyId));
+        }
+        return subtopologies.get(subtopologyId);
+    }
+
+    private Map<String, ConfiguredSubtopology> getSubtopologiesOrFail() {
+        final Optional<SortedMap<String, ConfiguredSubtopology>> subtopologies 
= topology.subtopologies();
+        if (subtopologies.isEmpty()) {
+            throw new IllegalStateException("Topology is not configured");
+        }

Review Comment:
   Is there a reason you cannot make this check at construction time?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to