ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682233727



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -53,44 +56,162 @@
     private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
     private final StreamsConfig config;
-    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep 
sorted by topology name for readability
+    private final TopologyVersion version;
+
+    private final ConcurrentNavigableMap<String, InternalTopologyBuilder> 
builders; // Keep sorted by topology name for readability
 
     private ProcessorTopology globalTopology;
-    private Map<String, StateStore> globalStateStores = new HashMap<>();
-    final Set<String> allInputTopics = new HashSet<>();
+    private final Map<String, StateStore> globalStateStores = new HashMap<>();
+    private final Set<String> allInputTopics = new HashSet<>();
+
+    public static class TopologyVersion {
+        public AtomicLong topologyVersion = new AtomicLong(0L); // the local 
topology version
+        public Set<String> assignedNamedTopologies = new HashSet<>(); // the 
named topologies whose tasks are actively assigned
+        public ReentrantLock topologyLock = new ReentrantLock();
+        public Condition topologyCV = topologyLock.newCondition();
+    }
 
-    public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+    public TopologyMetadata(final InternalTopologyBuilder builder,
+                            final StreamsConfig config) {
+        version = new TopologyVersion();
         this.config = config;
-        builders = new TreeMap<>();
+        builders = new ConcurrentSkipListMap<>();
         if (builder.hasNamedTopology()) {
             builders.put(builder.topologyName(), builder);
         } else {
             builders.put(UNNAMED_TOPOLOGY, builder);
         }
     }
 
-    public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> 
builders, final StreamsConfig config) {
+    public TopologyMetadata(final ConcurrentNavigableMap<String, 
InternalTopologyBuilder> builders,
+                            final StreamsConfig config) {
+        version = new TopologyVersion();
         this.config = config;
+
         this.builders = builders;
         if (builders.isEmpty()) {
-            log.debug("Building KafkaStreams app with no empty topology");
+            log.debug("Starting up empty KafkaStreams app with no topology");
         }
     }
 
-    public int getNumStreamThreads(final StreamsConfig config) {
-        final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+    public void updateCurrentAssignmentTopology(final Set<String> 
assignedNamedTopologies) {
+        version.assignedNamedTopologies = assignedNamedTopologies;
+    }
 
-        // If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
-        if (builders.isEmpty()) {
-            if (configuredNumStreamThreads != 0) {
-                log.info("Overriding number of StreamThreads to zero for empty 
topology");
+    /**
+     * @return the set of named topologies that the assignor distributed tasks 
for during the last rebalance
+     */
+    public Set<String> assignmentNamedTopologies() {
+        return version.assignedNamedTopologies;
+    }
+
+    public long topologyVersion() {
+        return version.topologyVersion.get();
+    }
+
+    public void lock() {
+        version.topologyLock.lock();
+    }
+
+    public void unlock() {
+        version.topologyLock.unlock();
+    }
+
+    public InternalTopologyBuilder getBuilderForTopologyName(final String 
name) {
+        return builders.get(name);
+    }
+
+    /**
+     * @throws IllegalStateException if the thread is not already holding the 
lock via TopologyMetadata#lock
+     */
+    public void maybeWaitForNonEmptyTopology() {
+        if (!version.topologyLock.isHeldByCurrentThread()) {
+            throw new IllegalStateException("Must call lock() before 
attempting to wait on non-empty topology");
+        }
+        while (isEmpty()) {
+            try {
+                log.debug("Detected that the topology is currently empty, 
going to wait for something to be added");
+                version.topologyCV.await();
+            } catch (final InterruptedException e) {
+                log.debug("StreamThread was interrupted while waiting on empty 
topology", e);
+            }
+        }
+    }
+
+    public void registerAndBuildNewTopology(final InternalTopologyBuilder 
newTopologyBuilder) {
+        try {
+            lock();
+            version.topologyVersion.incrementAndGet();
+            log.info("Adding NamedTopology {}, latest topology version is {}", 
newTopologyBuilder.topologyName(), version.topologyVersion.get());
+            builders.put(newTopologyBuilder.topologyName(), 
newTopologyBuilder);
+            buildAndVerifyTopology(newTopologyBuilder);
+            version.topologyCV.signalAll();
+        } finally {
+            unlock();
+        }
+    }
+
+    /**
+     * Removes the topology and blocks until all threads on the older version 
have ack'ed this removal.
+     * It is guaranteed that no more tasks from this removed topology will be 
processed
+     */
+    public void unregisterTopology(final String topologyName) {
+        try {
+            lock();
+            version.topologyVersion.incrementAndGet();
+            log.info("Removing NamedTopology {}, latest topology version is 
{}", topologyName, version.topologyVersion.get());
+            final InternalTopologyBuilder removedBuilder = 
builders.remove(topologyName);
+            
removedBuilder.fullSourceTopicNames().forEach(allInputTopics::remove);
+            
removedBuilder.allSourcePatternStrings().forEach(allInputTopics::remove);
+            version.topologyCV.signalAll();
+        } finally {
+            unlock();
+        }
+    }
+
+    public void buildAndRewriteTopology() {
+        applyToEachBuilder(this::buildAndVerifyTopology);
+    }
+
+    private void buildAndVerifyTopology(final InternalTopologyBuilder builder) 
{
+        builder.rewriteTopology(config);
+        builder.buildTopology();
+
+        // As we go, check each topology for overlap in the set of input 
topics/patterns
+        final int numInputTopics = allInputTopics.size();
+        final List<String> inputTopics = builder.fullSourceTopicNames();
+        final Collection<String> inputPatterns = 
builder.allSourcePatternStrings();
+
+        final int numNewInputTopics = inputTopics.size() + 
inputPatterns.size();
+        allInputTopics.addAll(inputTopics);
+        allInputTopics.addAll(inputPatterns);
+        if (allInputTopics.size() != numInputTopics + numNewInputTopics) {
+            inputTopics.retainAll(allInputTopics);
+            inputPatterns.retainAll(allInputTopics);
+            inputTopics.addAll(inputPatterns);
+            log.error("Tried to add the NamedTopology {} but it had overlap 
with other input topics: {}", builder.topologyName(), inputTopics);

Review comment:
       You mean print them separately? Sure




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to