ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669196246



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-    //TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+    private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+    // the '_' character is not allowed for topology names, thus it's safe to 
use to indicate that it's not a named topology
+    private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+    private final StreamsConfig config;
+    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep 
sorted by topology name for readability
+
+    private ProcessorTopology globalTopology;
+    private Map<String, StateStore> globalStateStores = new HashMap<>();
+    final Set<String> allInputTopics = new HashSet<>();
+
+    public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+        this.config = config;
+        builders = new TreeMap<>();
+        if (builder.hasNamedTopology()) {
+            builders.put(builder.namedTopology(), builder);
+        } else {
+            builders.put(UNNAMED_TOPOLOGY, builder);
+        }
+    }
+
+    public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> 
builders, final StreamsConfig config) {
+        this.config = config;
+        this.builders = builders;
+        if (builders.isEmpty()) {
+            log.debug("Building KafkaStreams app with no empty topology");
+        }
+    }
+
+    public int getNumStreamThreads(final StreamsConfig config) {
+        final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+        // If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+        if (builders.isEmpty()) {
+            if (configuredNumStreamThreads != 0) {
+                log.info("Overriding number of StreamThreads to zero for empty 
topology");
+            }
+            return 0;
+        }
+
+        // If there are topologies but they are all empty, this indicates a 
bug in user code
+        if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {
+            log.error("Topology with no input topics will create no stream 
threads and no global thread.");
+            throw new TopologyException("Topology has no stream threads and no 
global threads, " +
+                                            "must subscribe to at least one 
source topic or global table.");
+        }
+
+        // Lastly we check for an empty non-global topology and override the 
threads to zero if set otherwise
+        if (configuredNumStreamThreads != 0 && hasNoNonGlobalTopology()) {
+            log.info("Overriding number of StreamThreads to zero for 
global-only topology");
+            return 0;
+        }
+
+        return configuredNumStreamThreads;
+    }
+
+    public boolean hasNamedTopologies() {
+        // This includes the case of starting up with no named topologies at 
all
+        return !builders.containsKey(UNNAMED_TOPOLOGY);
+    }
+
+    public boolean hasGlobalTopology() {
+        return 
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasGlobalStores);
+    }
+
+    public boolean hasNoNonGlobalTopology() {
+        return 
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasNoNonGlobalTopology);
+    }
+
+    public boolean hasPersistentStores() {
+        // If the app is using named topologies, there may not be any 
persistent state when it first starts up
+        // but a new NamedTopology may introduce it later, so we must return 
true
+        if (hasNamedTopologies()) {
+            return true;
+        }
+        return 
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasPersistentStores);
+    }
+
+    public boolean hasStore(final String name) {
+        return evaluateConditionIsTrueForAnyBuilders(b -> b.hasStore(name));
+    }
+
+    public boolean hasOffsetResetOverrides() {
+        return 
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasOffsetResetOverrides);
+    }
+
+    public OffsetResetStrategy offsetResetStrategy(final String topic) {
+        for (final InternalTopologyBuilder builder : builders.values()) {
+            final OffsetResetStrategy resetStrategy = 
builder.offsetResetStrategy(topic);
+            if (resetStrategy != null) {
+                return resetStrategy;
+            }
+        }
+        return null;
+    }
+
+    Collection<String> sourceTopicCollection() {
+        final List<String> sourceTopics = new ArrayList<>();
+        applyToEachBuilder(b -> 
sourceTopics.addAll(b.sourceTopicCollection()));
+        return sourceTopics;
+    }
+
+    Pattern sourceTopicPattern() {
+        final StringBuilder patternBuilder = new StringBuilder();
+
+        applyToEachBuilder(b -> {
+            final String patternString = b.sourceTopicsPatternString();
+            if (patternString.length() > 0) {
+                patternBuilder.append(patternString).append("|");
+            }
+        });
+
+        if (patternBuilder.length() > 0) {
+            patternBuilder.setLength(patternBuilder.length() - 1);
+            return Pattern.compile(patternBuilder.toString());
+        } else {
+            return EMPTY_ZERO_LENGTH_PATTERN;
+        }
+    }
+
+    public boolean usesPatternSubscription() {
+        return 
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::usesPatternSubscription);
+    }
+
+    // Can be empty if app is started up with no Named Topologies, in order to 
add them on later
+    public boolean isEmpty() {
+        return builders.isEmpty();
+    }
+
+    public String topologyDescription() {
+        if (isEmpty()) {
+            return "";
+        }
+        final StringBuilder sb = new StringBuilder();
+
+        applyToEachBuilder(b -> {
+            sb.append(b.describe().toString());
+        });
+
+        return sb.toString();
+    }
+
+    public final void buildAndRewriteTopology() {
+        applyToEachBuilder(builder -> {
+            builder.rewriteTopology(config);
+            builder.buildTopology();
+
+            // As we go, check each topology for overlap in the set of input 
topics/patterns
+            final int numInputTopics = allInputTopics.size();
+            final List<String> inputTopics = builder.fullSourceTopicNames();
+            final Collection<String> inputPatterns = 
builder.allSourcePatternStrings();
+
+            final int numNewInputTopics = inputTopics.size() + 
inputPatterns.size();
+            allInputTopics.addAll(inputTopics);
+            allInputTopics.addAll(inputPatterns);
+            if (allInputTopics.size() != numInputTopics + numNewInputTopics) {
+                inputTopics.retainAll(allInputTopics);
+                inputPatterns.retainAll(allInputTopics);
+                inputTopics.addAll(inputPatterns);
+                log.error("Tried to add the NamedTopology {} but it had 
overlap with other input topics: {}", builder.namedTopology(), inputTopics);
+                throw new TopologyException("Named Topologies may not 
subscribe to the same input topics or patterns");
+            }
+
+            final ProcessorTopology globalTopology = 
builder.buildGlobalStateTopology();
+            if (globalTopology != null) {
+                if (builder.namedTopology() != null) {
+                    throw new IllegalStateException("Global state stores are 
not supported with Named Topologies");
+                } else if (this.globalTopology == null) {
+                    this.globalTopology = globalTopology;
+                } else {
+                    throw new IllegalStateException("Topology builder had 
global state, but global topology has already been set");
+                }
+            }
+            globalStateStores.putAll(builder.globalStateStores());
+        });
+    }
+
+    public ProcessorTopology buildSubtopology(final TaskId task) {
+        return lookupBuilderForTask(task).buildSubtopology(task.subtopology());
+    }
+
+    public ProcessorTopology globalTaskTopology() {
+        if (hasNamedTopologies()) {
+            throw new IllegalStateException("Global state stores are not 
supported with Named Topologies");
+        }
+        return globalTopology;
+    }
+
+    public Map<String, StateStore> globalStateStores() {
+        return globalStateStores;
+    }
+
+    public Map<String, List<String>> stateStoreNameToSourceTopics() {
+        final Map<String, List<String>> stateStoreNameToSourceTopics = new 
HashMap<>();
+        applyToEachBuilder(b -> 
stateStoreNameToSourceTopics.putAll(b.stateStoreNameToSourceTopics()));
+        return stateStoreNameToSourceTopics;
+    }
+
+    public String getStoreForChangelogTopic(final String topicName) {
+        for (final InternalTopologyBuilder builder : builders.values()) {
+            final String store = builder.getStoreForChangelogTopic(topicName);
+            if (store != null) {
+                return store;
+            }
+        }
+        log.warn("Unable to locate any store for topic {}", topicName);
+        return "";

Review comment:
       Just being consistent with what was returned in that case before. If we 
return `null` then an NPE would be thrown atm. 
   
   It's used by `KafkaStreams#allLocalStorePartitionLags` ie KIP-535. Could 
definitely be cleaned up, but I don't want to pull that into scope of this PR




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to