ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669197330
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -16,10 +16,294 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+// 1) synchronize on these methods instead of individual
InternalTopologyBuilder methods, where applicable
public class TopologyMetadata {
- //TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR
#10683)
+ private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+ // the '_' character is not allowed for topology names, thus it's safe to
use to indicate that it's not a named topology
+ private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+ private static final Pattern EMPTY_ZERO_LENGTH_PATTERN =
Pattern.compile("");
+
+ private final StreamsConfig config;
+ private final SortedMap<String, InternalTopologyBuilder> builders; // Keep
sorted by topology name for readability
+
+ private ProcessorTopology globalTopology;
+ private Map<String, StateStore> globalStateStores = new HashMap<>();
+ final Set<String> allInputTopics = new HashSet<>();
+
+ public TopologyMetadata(final InternalTopologyBuilder builder, final
StreamsConfig config) {
+ this.config = config;
+ builders = new TreeMap<>();
+ if (builder.hasNamedTopology()) {
+ builders.put(builder.namedTopology(), builder);
+ } else {
+ builders.put(UNNAMED_TOPOLOGY, builder);
+ }
+ }
+
+ public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder>
builders, final StreamsConfig config) {
+ this.config = config;
+ this.builders = builders;
+ if (builders.isEmpty()) {
+ log.debug("Building KafkaStreams app with no empty topology");
+ }
+ }
+
+ public int getNumStreamThreads(final StreamsConfig config) {
+ final int configuredNumStreamThreads =
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+ // If the application uses named topologies, it's possible to start up
with no topologies at all and only add them later
+ if (builders.isEmpty()) {
+ if (configuredNumStreamThreads != 0) {
Review comment:
Also, if a user has a global-only topology, then it's absolutely valid
for them to explicitly configure the app to have no StreamThreads. In fact, if we
detect that case, we actually override the configured num.stream.threads to
0 for them anyway.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]