guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682090255



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
##########
@@ -66,14 +72,26 @@
         );
     }
 
+    Map<TaskId, Set<TopicPartition>> uncreatedTasksForTopologies(final Set<String> currentTopologies) {
+        return unknownTasksToBeCreated.entrySet().stream().filter(t -> currentTopologies.contains(t.getKey().namedTopology())).collect(Collectors.toMap(Entry::getKey, Entry::getValue));

Review comment:
       Ditto here: consider using the newly added `Utils.filterMap`.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -53,44 +56,162 @@
     private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
     private final StreamsConfig config;
-    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep 
sorted by topology name for readability
+    private final TopologyVersion version;
+
+    private final ConcurrentNavigableMap<String, InternalTopologyBuilder> 
builders; // Keep sorted by topology name for readability
 
     private ProcessorTopology globalTopology;
-    private Map<String, StateStore> globalStateStores = new HashMap<>();
-    final Set<String> allInputTopics = new HashSet<>();
+    private final Map<String, StateStore> globalStateStores = new HashMap<>();
+    private final Set<String> allInputTopics = new HashSet<>();
+
+    public static class TopologyVersion {
+        public AtomicLong topologyVersion = new AtomicLong(0L); // the local 
topology version
+        public Set<String> assignedNamedTopologies = new HashSet<>(); // the 
named topologies whose tasks are actively assigned
+        public ReentrantLock topologyLock = new ReentrantLock();
+        public Condition topologyCV = topologyLock.newCondition();
+    }
 
-    public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+    public TopologyMetadata(final InternalTopologyBuilder builder,
+                            final StreamsConfig config) {
+        version = new TopologyVersion();
         this.config = config;
-        builders = new TreeMap<>();
+        builders = new ConcurrentSkipListMap<>();
         if (builder.hasNamedTopology()) {
             builders.put(builder.topologyName(), builder);
         } else {
             builders.put(UNNAMED_TOPOLOGY, builder);
         }
     }
 
-    public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> 
builders, final StreamsConfig config) {
+    public TopologyMetadata(final ConcurrentNavigableMap<String, 
InternalTopologyBuilder> builders,
+                            final StreamsConfig config) {
+        version = new TopologyVersion();
         this.config = config;
+
         this.builders = builders;
         if (builders.isEmpty()) {
-            log.debug("Building KafkaStreams app with no empty topology");
+            log.debug("Starting up empty KafkaStreams app with no topology");
         }
     }
 
-    public int getNumStreamThreads(final StreamsConfig config) {
-        final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+    public void updateCurrentAssignmentTopology(final Set<String> 
assignedNamedTopologies) {
+        version.assignedNamedTopologies = assignedNamedTopologies;
+    }
 
-        // If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
-        if (builders.isEmpty()) {
-            if (configuredNumStreamThreads != 0) {
-                log.info("Overriding number of StreamThreads to zero for empty 
topology");
+    /**
+     * @return the set of named topologies that the assignor distributed tasks 
for during the last rebalance
+     */
+    public Set<String> assignmentNamedTopologies() {
+        return version.assignedNamedTopologies;
+    }
+
+    public long topologyVersion() {
+        return version.topologyVersion.get();
+    }
+
+    public void lock() {
+        version.topologyLock.lock();
+    }
+
+    public void unlock() {
+        version.topologyLock.unlock();
+    }
+
+    public InternalTopologyBuilder getBuilderForTopologyName(final String 
name) {
+        return builders.get(name);
+    }
+
+    /**
+     * @throws IllegalStateException if the thread is not already holding the 
lock via TopologyMetadata#lock
+     */
+    public void maybeWaitForNonEmptyTopology() {
+        if (!version.topologyLock.isHeldByCurrentThread()) {

Review comment:
       I'm a bit concerned about the "asymmetry" of this function: every other 
function acquires the lock internally, while this one requires its caller 
(i.e. `handleTopologyUpdatesPhase`) to already hold it. That makes it quite 
vulnerable to bugs in future edits.
   
   I'm wondering if we can move this logic out of `handleTopologyUpdatesPhase` 
instead: i.e. we first update the named topology, and then based on the new 
version we can either wait or re-subscribe and trigger rebalance. WDYT?
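
   A minimal sketch of that symmetric shape, in which every method takes the 
lock itself. The names `TopologyRegistry` and `awaitNonEmpty` are hypothetical 
stand-ins for this example, with strings in place of `InternalTopologyBuilder`; 
it is not the actual Streams code.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for TopologyMetadata: no caller ever calls lock()/unlock() directly.
class TopologyRegistry {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition nonEmpty = lock.newCondition();
    private final Map<String, String> builders = new TreeMap<>();
    private long version = 0L;

    // Adds a topology, bumps the version and wakes up any waiting thread.
    long register(final String name, final String builder) {
        lock.lock();
        try {
            builders.put(name, builder);
            version++;
            nonEmpty.signalAll();
            return version;
        } finally {
            lock.unlock();
        }
    }

    // Waits up to timeoutMs for at least one topology; returns false on timeout or interrupt.
    boolean awaitNonEmpty(final long timeoutMs) {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            while (builders.isEmpty()) {
                if (remainingNanos <= 0L) {
                    return false;
                }
                try {
                    remainingNanos = nonEmpty.awaitNanos(remainingNanos);
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    return false;
                }
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

   With this shape the caller only decides what to do after the wait returns 
(wait again, or re-subscribe and trigger a rebalance); it never has to remember 
to take the lock first.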
   
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -53,44 +56,162 @@
     private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
     private final StreamsConfig config;
-    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep 
sorted by topology name for readability
+    private final TopologyVersion version;
+
+    private final ConcurrentNavigableMap<String, InternalTopologyBuilder> 
builders; // Keep sorted by topology name for readability
 
     private ProcessorTopology globalTopology;
-    private Map<String, StateStore> globalStateStores = new HashMap<>();
-    final Set<String> allInputTopics = new HashSet<>();
+    private final Map<String, StateStore> globalStateStores = new HashMap<>();
+    private final Set<String> allInputTopics = new HashSet<>();
+
+    public static class TopologyVersion {
+        public AtomicLong topologyVersion = new AtomicLong(0L); // the local 
topology version
+        public Set<String> assignedNamedTopologies = new HashSet<>(); // the 
named topologies whose tasks are actively assigned
+        public ReentrantLock topologyLock = new ReentrantLock();
+        public Condition topologyCV = topologyLock.newCondition();
+    }
 
-    public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+    public TopologyMetadata(final InternalTopologyBuilder builder,
+                            final StreamsConfig config) {
+        version = new TopologyVersion();
         this.config = config;
-        builders = new TreeMap<>();
+        builders = new ConcurrentSkipListMap<>();
         if (builder.hasNamedTopology()) {
             builders.put(builder.topologyName(), builder);
         } else {
             builders.put(UNNAMED_TOPOLOGY, builder);
         }
     }
 
-    public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> 
builders, final StreamsConfig config) {
+    public TopologyMetadata(final ConcurrentNavigableMap<String, 
InternalTopologyBuilder> builders,
+                            final StreamsConfig config) {
+        version = new TopologyVersion();
         this.config = config;
+
         this.builders = builders;
         if (builders.isEmpty()) {
-            log.debug("Building KafkaStreams app with no empty topology");
+            log.debug("Starting up empty KafkaStreams app with no topology");
         }
     }
 
-    public int getNumStreamThreads(final StreamsConfig config) {
-        final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+    public void updateCurrentAssignmentTopology(final Set<String> 
assignedNamedTopologies) {
+        version.assignedNamedTopologies = assignedNamedTopologies;
+    }
 
-        // If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
-        if (builders.isEmpty()) {
-            if (configuredNumStreamThreads != 0) {
-                log.info("Overriding number of StreamThreads to zero for empty 
topology");
+    /**
+     * @return the set of named topologies that the assignor distributed tasks 
for during the last rebalance
+     */
+    public Set<String> assignmentNamedTopologies() {
+        return version.assignedNamedTopologies;
+    }
+
+    public long topologyVersion() {
+        return version.topologyVersion.get();
+    }
+
+    public void lock() {
+        version.topologyLock.lock();
+    }
+
+    public void unlock() {
+        version.topologyLock.unlock();
+    }
+
+    public InternalTopologyBuilder getBuilderForTopologyName(final String 
name) {
+        return builders.get(name);
+    }
+
+    /**
+     * @throws IllegalStateException if the thread is not already holding the 
lock via TopologyMetadata#lock
+     */
+    public void maybeWaitForNonEmptyTopology() {
+        if (!version.topologyLock.isHeldByCurrentThread()) {
+            throw new IllegalStateException("Must call lock() before 
attempting to wait on non-empty topology");
+        }
+        while (isEmpty()) {
+            try {
+                log.debug("Detected that the topology is currently empty, 
going to wait for something to be added");
+                version.topologyCV.await();
+            } catch (final InterruptedException e) {
+                log.debug("StreamThread was interrupted while waiting on empty 
topology", e);
+            }
+        }
+    }
+
+    public void registerAndBuildNewTopology(final InternalTopologyBuilder 
newTopologyBuilder) {
+        try {
+            lock();
+            version.topologyVersion.incrementAndGet();
+            log.info("Adding NamedTopology {}, latest topology version is {}", 
newTopologyBuilder.topologyName(), version.topologyVersion.get());
+            builders.put(newTopologyBuilder.topologyName(), 
newTopologyBuilder);
+            buildAndVerifyTopology(newTopologyBuilder);
+            version.topologyCV.signalAll();
+        } finally {
+            unlock();
+        }
+    }
+
+    /**
+     * Removes the topology and blocks until all threads on the older version 
have ack'ed this removal.
+     * IT is guaranteed that no more tasks from this removed topology will be 
processed
+     */
+    public void unregisterTopology(final String topologyName) {
+        try {
+            lock();
+            version.topologyVersion.incrementAndGet();
+            log.info("Removing NamedTopology {}, latest topology version is 
{}", topologyName, version.topologyVersion.get());
+            final InternalTopologyBuilder removedBuilder = 
builders.remove(topologyName);
+            
removedBuilder.fullSourceTopicNames().forEach(allInputTopics::remove);
+            
removedBuilder.allSourcePatternStrings().forEach(allInputTopics::remove);
+            version.topologyCV.signalAll();
+        } finally {
+            unlock();
+        }
+    }
+
+    public void buildAndRewriteTopology() {
+        applyToEachBuilder(this::buildAndVerifyTopology);
+    }
+
+    private void buildAndVerifyTopology(final InternalTopologyBuilder builder) 
{
+        builder.rewriteTopology(config);
+        builder.buildTopology();
+
+        // As we go, check each topology for overlap in the set of input 
topics/patterns
+        final int numInputTopics = allInputTopics.size();
+        final List<String> inputTopics = builder.fullSourceTopicNames();
+        final Collection<String> inputPatterns = 
builder.allSourcePatternStrings();
+
+        final int numNewInputTopics = inputTopics.size() + 
inputPatterns.size();
+        allInputTopics.addAll(inputTopics);
+        allInputTopics.addAll(inputPatterns);
+        if (allInputTopics.size() != numInputTopics + numNewInputTopics) {
+            inputTopics.retainAll(allInputTopics);
+            inputPatterns.retainAll(allInputTopics);
+            inputTopics.addAll(inputPatterns);
+            log.error("Tried to add the NamedTopology {} but it had overlap 
with other input topics: {}", builder.topologyName(), inputTopics);

Review comment:
       Could we print the `inputTopics` and `inputPatterns` as well in the 
message?
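
   A rough sketch of such a message, assuming the overlap is computed on a 
copy so that the original collections can still be printed unchanged. The 
class and method names are hypothetical, `existingInputTopics` is assumed to 
be the set as it was before this topology's topics were added, and 
`String.format` stands in for the logger.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

class OverlapMessageSketch {
    // Builds the error text without mutating inputTopics or inputPatterns.
    static String overlapMessage(final String topologyName,
                                 final List<String> inputTopics,
                                 final Collection<String> inputPatterns,
                                 final Set<String> existingInputTopics) {
        // Work on a copy: retainAll would otherwise shrink the caller's list
        final List<String> overlap = new ArrayList<>(inputTopics);
        overlap.addAll(inputPatterns);
        overlap.retainAll(existingInputTopics);
        return String.format(
            "Tried to add the NamedTopology %s but it had overlap with other input topics: %s"
                + " (inputTopics=%s, inputPatterns=%s)",
            topologyName, overlap, inputTopics, inputPatterns);
    }
}
```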

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -105,31 +105,76 @@ private KafkaStreamsNamedTopologyWrapper(final 
Collection<NamedTopology> topolog
                     (v1, v2) -> {
                         throw new IllegalArgumentException("Topology names 
must be unique");
                     },
-                    () -> new TreeMap<>())),
+                    () -> new ConcurrentSkipListMap<>())),
                 config),
             config,
             clientSupplier
         );
-        for (final NamedTopology topology : topologies) {
-            nameToTopology.put(topology.name(), topology);
-        }
     }
 
-    public NamedTopology getTopologyByName(final String name) {
-        if (nameToTopology.containsKey(name)) {
-            return nameToTopology.get(name);
-        } else {
-            throw new IllegalArgumentException("Unable to locate a 
NamedTopology called " + name);
+    /**
+     * @return the NamedTopology for the specific name, or Optional.empty() if 
the application has no NamedTopology of that name
+     */
+    public Optional<NamedTopology> getTopologyByName(final String name) {
+        return 
Optional.ofNullable(topologyMetadata.getBuilderForTopologyName(name)).map(InternalTopologyBuilder::namedTopology);
+    }
+
+    /**
+     * Add a new NamedTopology to a running Kafka Streams app. If multiple 
instances of the application are running,
+     * you should inform all of them by calling {@link 
#addNamedTopology(NamedTopology)} on each client in order for
+     * it to begin processing the new topology.
+     *
+     * @throws IllegalArgumentException if this topology name is already in use
+     * @throws IllegalStateException    if streams has not been started or has 
already shut down
+     * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
+     */
+    public void addNamedTopology(final NamedTopology newTopology) {
+        if (hasStartedOrFinishedShuttingDown()) {
+            throw new IllegalStateException("Cannot add a NamedTopology while 
the state is " + super.state);
+        } else if (getTopologyByName(newTopology.name()).isPresent()) {
+            throw new IllegalArgumentException("Unable to add the new 
NamedTopology " + newTopology.name() +
+                                                   " as another of the same 
name already exists");
         }
+        
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
     }
 
-    public void addNamedTopology(final NamedTopology topology) {
-        nameToTopology.put(topology.name(), topology);
-        throw new UnsupportedOperationException();
+    /**
+     * Remove an existing NamedTopology from a running Kafka Streams app. If 
multiple instances of the application are
+     * running, you should inform all of them by calling {@link 
#removeNamedTopology(String)} on each client to ensure
+     * it stops processing the old topology.
+     *
+     * @throws IllegalArgumentException if this topology name cannot be found
+     * @throws IllegalStateException    if streams has not been started or has 
already shut down
+     * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
+     */
+    public void removeNamedTopology(final String topologyToRemove) {
+        if (!isRunningOrRebalancing()) {
+            throw new IllegalStateException("Cannot remove a NamedTopology 
while the state is " + super.state);
+        } else if (!getTopologyByName(topologyToRemove).isPresent()) {
+            throw new IllegalArgumentException("Unable to locate for removal a 
NamedTopology called " + topologyToRemove);
+        }
+
+        topologyMetadata.unregisterTopology(topologyToRemove);
     }
 
-    public void removeNamedTopology(final String namedTopology) {
-        throw new UnsupportedOperationException();
+    /**
+     * Do a clean up of the local state directory for this NamedTopology by 
deleting all data with regard to the
+     * @link StreamsConfig#APPLICATION_ID_CONFIG application ID} in the 
({@link StreamsConfig#STATE_DIR_CONFIG})
+     * <p>
+     * May be called while the Streams is in any state, but only on a {@link 
NamedTopology} that has already been
+     * removed via {@link #removeNamedTopology(String)}.
+     * <p>
+     * Calling this method triggers a restore of local {@link StateStore}s for 
this {@link NamedTopology} if it is
+     * ever re-added via {@link #addNamedTopology(NamedTopology)}.
+     *
+     * @throws IllegalStateException if this {@code NamedTopology} hasn't been 
removed
+     * @throws StreamsException if cleanup failed
+     */
+    public void cleanUpNamedTopology(final String name) {
+        if (getTopologyByName(name).isPresent()) {
+            throw new IllegalStateException("Can't clean up local state for an 
active NamedTopology");

Review comment:
       Also, I checked the ksql side and found that we first call 
`metadata.remove(queryId.toString());` and then this function. In that case, 
would `getTopologyByName` always return nothing? cc @wcarlson5 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -132,18 +136,36 @@ StreamsProducer threadProducer() {
         return threadProducer;
     }
 
+    Map<TaskId, Set<TopicPartition>> uncreatedTasksForTopologies(final Set<String> currentTopologies) {
+        return unknownTasksToBeCreated.entrySet().stream().filter(t -> currentTopologies.contains(t.getKey().namedTopology())).collect(Collectors.toMap(Entry::getKey, Entry::getValue));

Review comment:
       Use the newly added `Utils.filterMap`?
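
   Roughly the shape this would take. The `filterMap` helper below is a local 
stand-in written for this example (its signature is an assumption, not a quote 
of the real utility), and plain strings stand in for `TaskId` and 
`TopicPartition`.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class FilterMapSketch {
    // Local stand-in helper (assumed shape): keep only the entries matching the predicate.
    static <K, V> Map<K, V> filterMap(final Map<K, V> map,
                                      final Predicate<Map.Entry<K, V>> predicate) {
        return map.entrySet().stream()
            .filter(predicate)
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    // Keys are "<topologyName>__<subtopology>_<partition>", a made-up encoding for this example.
    static Map<String, Set<String>> uncreatedTasksForTopologies(final Map<String, Set<String>> unknownTasks,
                                                                final Set<String> currentTopologies) {
        return filterMap(unknownTasks, t -> currentTopologies.contains(t.getKey().split("__")[0]));
    }
}
```

   The call site then reads as a single predicate instead of a hand-written 
stream pipeline, and the same helper serves both task creators.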

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -53,44 +56,162 @@
     private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
     private final StreamsConfig config;
-    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep 
sorted by topology name for readability
+    private final TopologyVersion version;
+
+    private final ConcurrentNavigableMap<String, InternalTopologyBuilder> 
builders; // Keep sorted by topology name for readability

Review comment:
       How about moving this into `TopologyVersion` so that it is guarded by 
the same lock as well, instead of making it a concurrent data structure itself?
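
   A minimal sketch of that layout, assuming a plain `TreeMap` that is only 
ever touched while the lock is held. `GuardedTopologyVersion`, `put` and 
`snapshot` are hypothetical names, and strings stand in for 
`InternalTopologyBuilder`.

```java
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical variant of TopologyVersion that also owns the builders map.
class GuardedTopologyVersion {
    private final ReentrantLock topologyLock = new ReentrantLock();
    // Plain sorted map: safe only because every access happens under topologyLock
    private final SortedMap<String, String> builders = new TreeMap<>();
    private long topologyVersion = 0L;

    // The map update and the version bump become one atomic step for readers.
    long put(final String name, final String builder) {
        topologyLock.lock();
        try {
            builders.put(name, builder);
            return ++topologyVersion;
        } finally {
            topologyLock.unlock();
        }
    }

    // Returns a copy so callers never iterate the live map outside the lock.
    SortedMap<String, String> snapshot() {
        topologyLock.lock();
        try {
            return new TreeMap<>(builders);
        } finally {
            topologyLock.unlock();
        }
    }
}
```

   The trade-off is that readers now contend on the lock, but the version and 
the set of builders can no longer be observed out of sync with each other.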

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -53,44 +56,162 @@
     private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
     private final StreamsConfig config;
-    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep 
sorted by topology name for readability
+    private final TopologyVersion version;
+
+    private final ConcurrentNavigableMap<String, InternalTopologyBuilder> 
builders; // Keep sorted by topology name for readability
 
     private ProcessorTopology globalTopology;
-    private Map<String, StateStore> globalStateStores = new HashMap<>();
-    final Set<String> allInputTopics = new HashSet<>();
+    private final Map<String, StateStore> globalStateStores = new HashMap<>();
+    private final Set<String> allInputTopics = new HashSet<>();
+
+    public static class TopologyVersion {
+        public AtomicLong topologyVersion = new AtomicLong(0L); // the local 
topology version
+        public Set<String> assignedNamedTopologies = new HashSet<>(); // the 
named topologies whose tasks are actively assigned
+        public ReentrantLock topologyLock = new ReentrantLock();
+        public Condition topologyCV = topologyLock.newCondition();
+    }
 
-    public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+    public TopologyMetadata(final InternalTopologyBuilder builder,
+                            final StreamsConfig config) {
+        version = new TopologyVersion();
         this.config = config;
-        builders = new TreeMap<>();
+        builders = new ConcurrentSkipListMap<>();
         if (builder.hasNamedTopology()) {
             builders.put(builder.topologyName(), builder);
         } else {
             builders.put(UNNAMED_TOPOLOGY, builder);
         }
     }
 
-    public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> 
builders, final StreamsConfig config) {
+    public TopologyMetadata(final ConcurrentNavigableMap<String, 
InternalTopologyBuilder> builders,
+                            final StreamsConfig config) {
+        version = new TopologyVersion();
         this.config = config;
+
         this.builders = builders;
         if (builders.isEmpty()) {
-            log.debug("Building KafkaStreams app with no empty topology");
+            log.debug("Starting up empty KafkaStreams app with no topology");
         }
     }
 
-    public int getNumStreamThreads(final StreamsConfig config) {
-        final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+    public void updateCurrentAssignmentTopology(final Set<String> 
assignedNamedTopologies) {
+        version.assignedNamedTopologies = assignedNamedTopologies;
+    }
 
-        // If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
-        if (builders.isEmpty()) {
-            if (configuredNumStreamThreads != 0) {
-                log.info("Overriding number of StreamThreads to zero for empty 
topology");
+    /**
+     * @return the set of named topologies that the assignor distributed tasks 
for during the last rebalance
+     */
+    public Set<String> assignmentNamedTopologies() {
+        return version.assignedNamedTopologies;
+    }
+
+    public long topologyVersion() {
+        return version.topologyVersion.get();
+    }
+
+    public void lock() {

Review comment:
       Outside the class itself, this seems to be used only for the 
`updatePhase`. I think it would be better to keep it used only within a single 
function of `TopologyMetadata`. See my other related comments.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -105,31 +105,76 @@ private KafkaStreamsNamedTopologyWrapper(final 
Collection<NamedTopology> topolog
                     (v1, v2) -> {
                         throw new IllegalArgumentException("Topology names 
must be unique");
                     },
-                    () -> new TreeMap<>())),
+                    () -> new ConcurrentSkipListMap<>())),
                 config),
             config,
             clientSupplier
         );
-        for (final NamedTopology topology : topologies) {
-            nameToTopology.put(topology.name(), topology);
-        }
     }
 
-    public NamedTopology getTopologyByName(final String name) {
-        if (nameToTopology.containsKey(name)) {
-            return nameToTopology.get(name);
-        } else {
-            throw new IllegalArgumentException("Unable to locate a 
NamedTopology called " + name);
+    /**
+     * @return the NamedTopology for the specific name, or Optional.empty() if 
the application has no NamedTopology of that name
+     */
+    public Optional<NamedTopology> getTopologyByName(final String name) {
+        return 
Optional.ofNullable(topologyMetadata.getBuilderForTopologyName(name)).map(InternalTopologyBuilder::namedTopology);
+    }
+
+    /**
+     * Add a new NamedTopology to a running Kafka Streams app. If multiple 
instances of the application are running,
+     * you should inform all of them by calling {@link 
#addNamedTopology(NamedTopology)} on each client in order for
+     * it to begin processing the new topology.
+     *
+     * @throws IllegalArgumentException if this topology name is already in use
+     * @throws IllegalStateException    if streams has not been started or has 
already shut down
+     * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
+     */
+    public void addNamedTopology(final NamedTopology newTopology) {
+        if (hasStartedOrFinishedShuttingDown()) {
+            throw new IllegalStateException("Cannot add a NamedTopology while 
the state is " + super.state);
+        } else if (getTopologyByName(newTopology.name()).isPresent()) {
+            throw new IllegalArgumentException("Unable to add the new 
NamedTopology " + newTopology.name() +
+                                                   " as another of the same 
name already exists");
         }
+        
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
     }
 
-    public void addNamedTopology(final NamedTopology topology) {
-        nameToTopology.put(topology.name(), topology);
-        throw new UnsupportedOperationException();
+    /**
+     * Remove an existing NamedTopology from a running Kafka Streams app. If 
multiple instances of the application are
+     * running, you should inform all of them by calling {@link 
#removeNamedTopology(String)} on each client to ensure
+     * it stops processing the old topology.
+     *
+     * @throws IllegalArgumentException if this topology name cannot be found
+     * @throws IllegalStateException    if streams has not been started or has 
already shut down
+     * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
+     */
+    public void removeNamedTopology(final String topologyToRemove) {
+        if (!isRunningOrRebalancing()) {
+            throw new IllegalStateException("Cannot remove a NamedTopology 
while the state is " + super.state);
+        } else if (!getTopologyByName(topologyToRemove).isPresent()) {
+            throw new IllegalArgumentException("Unable to locate for removal a 
NamedTopology called " + topologyToRemove);
+        }
+
+        topologyMetadata.unregisterTopology(topologyToRemove);
     }
 
-    public void removeNamedTopology(final String namedTopology) {
-        throw new UnsupportedOperationException();
+    /**
+     * Do a clean up of the local state directory for this NamedTopology by 
deleting all data with regard to the
+     * @link StreamsConfig#APPLICATION_ID_CONFIG application ID} in the 
({@link StreamsConfig#STATE_DIR_CONFIG})
+     * <p>
+     * May be called while the Streams is in any state, but only on a {@link 
NamedTopology} that has already been
+     * removed via {@link #removeNamedTopology(String)}.
+     * <p>
+     * Calling this method triggers a restore of local {@link StateStore}s for 
this {@link NamedTopology} if it is
+     * ever re-added via {@link #addNamedTopology(NamedTopology)}.
+     *
+     * @throws IllegalStateException if this {@code NamedTopology} hasn't been 
removed
+     * @throws StreamsException if cleanup failed
+     */
+    public void cleanUpNamedTopology(final String name) {
+        if (getTopologyByName(name).isPresent()) {
+            throw new IllegalStateException("Can't clean up local state for an 
active NamedTopology");

Review comment:
       ".. since the topology name {} cannot be recognized"

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -327,6 +330,21 @@ public long lagFor(final TaskId task) {
         return totalLag;
     }
 
+    /**
+     * @return the previous tasks assigned to this consumer ordered by lag, 
filtered for any tasks that don't exist in this assignment
+     */
+    public SortedSet<TaskId> prevTasksByLag(final String consumer) {
+        final SortedSet<TaskId> prevTasksByLag = new TreeSet<>(comparingLong(this::lagFor).thenComparing(TaskId::compareTo));
+        for (final TaskId task : prevOwnedStatefulTasksByConsumer(consumer)) {
+            if (taskLagTotals.containsKey(task)) {
+                prevTasksByLag.add(task);
+            } else {
+                LOG.debug("Skipping previous task{} since it's not part of the current assignment", task);

Review comment:
       `task {}`.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -64,6 +65,9 @@
     private final Map<TaskId, StreamsProducer> taskProducers;
     private final StreamThread.ProcessingMode processingMode;
 
+    // tasks may be assigned for a NamedTopology that is not yet known by this host, and saved for later creation
+    private final Map<TaskId, Set<TopicPartition>>  unknownTasksToBeCreated = new HashMap<>();

Review comment:
       nit: extra space.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -53,44 +56,162 @@
     private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
     private final StreamsConfig config;
-    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep 
sorted by topology name for readability
+    private final TopologyVersion version;
+
+    private final ConcurrentNavigableMap<String, InternalTopologyBuilder> 
builders; // Keep sorted by topology name for readability
 
     private ProcessorTopology globalTopology;
-    private Map<String, StateStore> globalStateStores = new HashMap<>();
-    final Set<String> allInputTopics = new HashSet<>();
+    private final Map<String, StateStore> globalStateStores = new HashMap<>();
+    private final Set<String> allInputTopics = new HashSet<>();
+
+    public static class TopologyVersion {
+        public AtomicLong topologyVersion = new AtomicLong(0L); // the local 
topology version
+        public Set<String> assignedNamedTopologies = new HashSet<>(); // the 
named topologies whose tasks are actively assigned
+        public ReentrantLock topologyLock = new ReentrantLock();
+        public Condition topologyCV = topologyLock.newCondition();
+    }
 
-    public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+    public TopologyMetadata(final InternalTopologyBuilder builder,
+                            final StreamsConfig config) {
+        version = new TopologyVersion();
         this.config = config;
-        builders = new TreeMap<>();
+        builders = new ConcurrentSkipListMap<>();
         if (builder.hasNamedTopology()) {
             builders.put(builder.topologyName(), builder);
         } else {
             builders.put(UNNAMED_TOPOLOGY, builder);
         }
     }
 
-    public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> 
builders, final StreamsConfig config) {
+    public TopologyMetadata(final ConcurrentNavigableMap<String, 
InternalTopologyBuilder> builders,
+                            final StreamsConfig config) {
+        version = new TopologyVersion();
         this.config = config;
+
         this.builders = builders;
         if (builders.isEmpty()) {
-            log.debug("Building KafkaStreams app with no empty topology");
+            log.debug("Starting up empty KafkaStreams app with no topology");
         }
     }
 
-    public int getNumStreamThreads(final StreamsConfig config) {
-        final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+    public void updateCurrentAssignmentTopology(final Set<String> 
assignedNamedTopologies) {
+        version.assignedNamedTopologies = assignedNamedTopologies;
+    }
 
-        // If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
-        if (builders.isEmpty()) {
-            if (configuredNumStreamThreads != 0) {
-                log.info("Overriding number of StreamThreads to zero for empty 
topology");
+    /**
+     * @return the set of named topologies that the assignor distributed tasks 
for during the last rebalance
+     */
+    public Set<String> assignmentNamedTopologies() {
+        return version.assignedNamedTopologies;
+    }
+
+    public long topologyVersion() {
+        return version.topologyVersion.get();
+    }
+
+    public void lock() {
+        version.topologyLock.lock();
+    }
+
+    public void unlock() {
+        version.topologyLock.unlock();
+    }
+
+    public InternalTopologyBuilder getBuilderForTopologyName(final String 
name) {
+        return builders.get(name);
+    }
+
+    /**
+     * @throws IllegalStateException if the thread is not already holding the 
lock via TopologyMetadata#lock
+     */
+    public void maybeWaitForNonEmptyTopology() {
+        if (!version.topologyLock.isHeldByCurrentThread()) {
+            throw new IllegalStateException("Must call lock() before 
attempting to wait on non-empty topology");
+        }
+        while (isEmpty()) {
+            try {
+                log.debug("Detected that the topology is currently empty, 
going to wait for something to be added");
+                version.topologyCV.await();
+            } catch (final InterruptedException e) {
+                log.debug("StreamThread was interrupted while waiting on empty 
topology", e);
+            }
+        }
+    }
+
+    public void registerAndBuildNewTopology(final InternalTopologyBuilder 
newTopologyBuilder) {
+        try {
+            lock();
+            version.topologyVersion.incrementAndGet();
+            log.info("Adding NamedTopology {}, latest topology version is {}", 
newTopologyBuilder.topologyName(), version.topologyVersion.get());
+            builders.put(newTopologyBuilder.topologyName(), 
newTopologyBuilder);
+            buildAndVerifyTopology(newTopologyBuilder);
+            version.topologyCV.signalAll();
+        } finally {
+            unlock();
+        }
+    }
+
+    /**
+     * Removes the topology and blocks until all threads on the older version 
have ack'ed this removal.
+     * IT is guaranteed that no more tasks from this removed topology will be 
processed
+     */
+    public void unregisterTopology(final String topologyName) {
+        try {
+            lock();
+            version.topologyVersion.incrementAndGet();
+            log.info("Removing NamedTopology {}, latest topology version is 
{}", topologyName, version.topologyVersion.get());
+            final InternalTopologyBuilder removedBuilder = 
builders.remove(topologyName);
+            
removedBuilder.fullSourceTopicNames().forEach(allInputTopics::remove);
+            
removedBuilder.allSourcePatternStrings().forEach(allInputTopics::remove);
+            version.topologyCV.signalAll();
+        } finally {
+            unlock();
+        }
+    }
+
+    public void buildAndRewriteTopology() {
+        applyToEachBuilder(this::buildAndVerifyTopology);
+    }
+
+    private void buildAndVerifyTopology(final InternalTopologyBuilder builder) 
{
+        builder.rewriteTopology(config);
+        builder.buildTopology();
+
+        // As we go, check each topology for overlap in the set of input 
topics/patterns
+        final int numInputTopics = allInputTopics.size();
+        final List<String> inputTopics = builder.fullSourceTopicNames();
+        final Collection<String> inputPatterns = 
builder.allSourcePatternStrings();
+
+        final int numNewInputTopics = inputTopics.size() + 
inputPatterns.size();
+        allInputTopics.addAll(inputTopics);
+        allInputTopics.addAll(inputPatterns);
+        if (allInputTopics.size() != numInputTopics + numNewInputTopics) {
+            inputTopics.retainAll(allInputTopics);
+            inputPatterns.retainAll(allInputTopics);
+            inputTopics.addAll(inputPatterns);
+            log.error("Tried to add the NamedTopology {} but it had overlap 
with other input topics: {}", builder.topologyName(), inputTopics);
+            throw new TopologyException("Named Topologies may not subscribe to 
the same input topics or patterns");
+        }
+
+        final ProcessorTopology globalTopology = 
builder.buildGlobalStateTopology();
+        if (globalTopology != null) {
+            if (builder.topologyName() != null) {
+                throw new IllegalStateException("Global state stores are not 
supported with Named Topologies");
+            } else if (this.globalTopology == null) {
+                this.globalTopology = globalTopology;

Review comment:
       Just to clarify: we either support N topologies where none of them have 
global topologies, or we support just one topology with only global topology? 
What's the rationale for supporting the second scenario?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -64,6 +65,9 @@
     private final Map<TaskId, StreamsProducer> taskProducers;
     private final StreamThread.ProcessingMode processingMode;
 
+    // tasks may be assigned for a NamedTopology that is not yet known by this 
host, and saved for later creation
+    private final Map<TaskId, Set<TopicPartition>>  unknownTasksToBeCreated = 
new HashMap<>();

Review comment:
       Also I'm wondering how that would be possible: if the thread/consumer 
has not updated its subscription, the leader should not assign it any tasks 
for the newly added topics, right?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -53,44 +56,162 @@
     private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
     private final StreamsConfig config;
-    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep 
sorted by topology name for readability
+    private final TopologyVersion version;
+
+    private final ConcurrentNavigableMap<String, InternalTopologyBuilder> 
builders; // Keep sorted by topology name for readability

Review comment:
       Also, we can optimize the per-thread condition in `StreamThread` for 
whether to enforce a new rebalance as: 
`builders.keySet().equals(assignedNamedTopologies)`. The former is the set of 
topologies this instance knows; the latter is the set of topologies the leader 
knows. When they are the same there's no need to trigger a rebalance.
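
       A minimal sketch of this check, under stated assumptions: the class 
`RebalanceCheckSketch` and the helper `needsRebalance` are hypothetical names 
used only for illustration, and `Object` stands in for 
`InternalTopologyBuilder` so the snippet compiles on its own. It is not the 
PR's implementation.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

final class RebalanceCheckSketch {

    // Hypothetical helper: a rebalance is only worth enforcing when the set of
    // topology names known locally differs from the set the leader assigned for.
    static boolean needsRebalance(final Set<String> locallyKnownTopologies,
                                  final Set<String> assignedNamedTopologies) {
        return !locallyKnownTopologies.equals(assignedNamedTopologies);
    }

    public static void main(final String[] args) {
        // Object is a stand-in for InternalTopologyBuilder (assumption).
        final ConcurrentNavigableMap<String, Object> builders = new ConcurrentSkipListMap<>();
        builders.put("topology-A", new Object());
        builders.put("topology-B", new Object());

        // The leader only knew about topology-A during the last rebalance.
        final Set<String> assignedNamedTopologies = Collections.singleton("topology-A");

        System.out.println(needsRebalance(builders.keySet(), assignedNamedTopologies));
    }
}
```

       Set equality ignores ordering, so comparing the sorted key set of the 
map against a `HashSet` from the assignment works as expected.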

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -248,12 +249,29 @@ public ByteBuffer subscriptionUserData(final Set<String> 
topics) {
         handleRebalanceStart(topics);
         uniqueField++;
 
+        final Set<String> currentNamedTopologies;
+        final Map<TaskId, Long> taskOffsetSums;
+        try {
+            taskManager.topologyMetadata().lock();

Review comment:
       Similar to my other comment: I think we can just add a single function 
on `topologyMetadata` that synchronizes on the lock internally and returns the 
result, instead of relying on the caller to grab the lock? More specifically, 
`topologyMetadata().hasNamedTopologies()` can be replaced with 
`!currentNamedTopologies.isEmpty()`, and as long as `namedTopologiesView()` is 
synchronized that should be sufficient?
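
       A rough sketch of what such a self-locking getter could look like. The 
class `TopologyMetadataSketch`, the `register` method, and the return type of 
`namedTopologiesView()` are assumptions for illustration only; `Object` 
stands in for `InternalTopologyBuilder`.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantLock;

final class TopologyMetadataSketch {
    private final ReentrantLock topologyLock = new ReentrantLock();
    // Object is a stand-in for InternalTopologyBuilder (assumption).
    private final ConcurrentNavigableMap<String, Object> builders = new ConcurrentSkipListMap<>();

    // Hypothetical mutator that takes the same lock as the getter below.
    void register(final String topologyName) {
        topologyLock.lock();
        try {
            builders.put(topologyName, new Object());
        } finally {
            topologyLock.unlock();
        }
    }

    // Acquires the lock itself and returns an immutable copy, so callers never
    // need to call lock()/unlock() around the read.
    Set<String> namedTopologiesView() {
        topologyLock.lock();
        try {
            return Collections.unmodifiableSet(new TreeSet<>(builders.keySet()));
        } finally {
            topologyLock.unlock();
        }
    }

    public static void main(final String[] args) {
        final TopologyMetadataSketch metadata = new TopologyMetadataSketch();
        metadata.register("topology-A");

        // Caller side: one call, no explicit locking.
        final Set<String> currentNamedTopologies = metadata.namedTopologiesView();
        final boolean hasNamedTopologies = !currentNamedTopologies.isEmpty();
        System.out.println(hasNamedTopologies);
    }
}
```

       Returning a copy means the caller works on a consistent snapshot even 
if a topology is added or removed right after the call returns.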

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##########
@@ -67,13 +72,31 @@
 public class NamedTopologyIntegrationTest {
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
 
+    // TODO KAFKA-12648:

Review comment:
       This is a meta question: do we have coverage for scenarios where the 
leader's and a member's bookkept named-topology sets are different? I.e. 1) the 
leader would not try to create any tasks that its own topology metadata is not 
aware of, even if other subscriptions contain more topics; 2) vice versa, the 
other members would not try to create tasks from an assignment that their 
topology metadata does not recognize, but would create those tasks later once 
the topologies get added?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -867,6 +873,36 @@ private void initializeAndRestorePhase() {
         log.debug("Idempotent restore call done. Thread state has not 
changed.");
     }
 
+    private void handleTopologyUpdatesPhase() {
+        // Check if the topology has been updated since we last checked, ie 
via #addNamedTopology or #removeNamedTopology
+        // or if this is the very first topology in which case we may need to 
wait for it to be non-empty
+        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || 
lastSeenTopologyVersion == 0) {
+            try {

Review comment:
       See my other comment: I'm wondering, if we move 
`topologyMetadata.maybeWaitForNonEmptyTopology();` out of this function, do we 
still need to grab the lock of topologyMetadata? Seems we can wrap the 
remaining getters into a single call. I.e.:
   
   1. Update the topology version if possible (only needs a getter on 
topologyMetadata). Returns a boolean indicating if the topology has changed.
   2. Check the current topology; if it is empty, first `handleEmptyTopology` 
and then `maybeWaitForNonEmptyTopology`.
   3. Otherwise (the topology in 2 is not empty), check whether 1) returned 
true; if yes, call `taskManager.maybeCreateTasksFromNewTopologies` (btw I'm not 
sure: if the topology has changed but the consumer subscription has not been 
updated, could this thread/consumer ever get tasks of those not-yet-known 
topologies? I think the leader would not assign tasks to consumers that have 
not subscribed yet), and also subscribe the consumer and enforce a rebalance.
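
       The three steps above could be sketched roughly as follows. This is an 
illustration under assumptions, not the PR's code: the interfaces `Metadata` 
and `Actions`, and the names `updateTopologyVersion` and 
`subscribeConsumerAndEnforceRebalance`, are hypothetical. Only 
`handleEmptyTopology`, `maybeWaitForNonEmptyTopology`, and 
`maybeCreateTasksFromNewTopologies` are taken from the comment. Locking is 
left out.

```java
final class TopologyUpdateFlowSketch {

    // Hypothetical view of the getters the thread needs from TopologyMetadata.
    interface Metadata {
        long topologyVersion();
        boolean isEmpty();
        void maybeWaitForNonEmptyTopology();
    }

    // Hypothetical grouping of the side effects performed by the thread.
    interface Actions {
        void handleEmptyTopology();
        void maybeCreateTasksFromNewTopologies();
        void subscribeConsumerAndEnforceRebalance();
    }

    private long lastSeenTopologyVersion = 0L;

    // Step 1: record the latest version and report whether it moved forward.
    boolean updateTopologyVersion(final Metadata metadata) {
        final long latest = metadata.topologyVersion();
        final boolean changed = latest > lastSeenTopologyVersion;
        lastSeenTopologyVersion = latest;
        return changed;
    }

    void handleTopologyUpdatesPhase(final Metadata metadata, final Actions actions) {
        final boolean changed = updateTopologyVersion(metadata);

        if (metadata.isEmpty()) {
            // Step 2: nothing to process, clean up and wait for a topology.
            actions.handleEmptyTopology();
            metadata.maybeWaitForNonEmptyTopology();
        } else if (changed) {
            // Step 3: pick up new topologies and let the group rebalance.
            actions.maybeCreateTasksFromNewTopologies();
            actions.subscribeConsumerAndEnforceRebalance();
        }
    }
}
```

       In this sketch a non-empty topology whose version has not changed 
results in no action at all, which is the common case on each loop iteration.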

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -935,6 +944,14 @@ void shutdown(final boolean clean) {
         return tasksToCloseDirty;
     }
 
+    public void updateCurrentAssignmentTopology(final Set<String> 
assignedNamedTopologies) {
+        
topologyMetadata.updateCurrentAssignmentTopology(assignedNamedTopologies);

Review comment:
       The propagated `assignedNamedTopologies` is not used anywhere in the 
KS code yet. Could you elaborate a bit on how it would be used in the future?



