guozhangwang commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r677787551



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -859,27 +855,27 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         ClientMetrics.addVersionMetric(streamsMetrics);
         ClientMetrics.addCommitIdMetric(streamsMetrics);
         ClientMetrics.addApplicationIdMetric(streamsMetrics, config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
-        ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, internalTopologyBuilder.describe().toString());
+        ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, this.topologyMetadata.topologyDescriptionString());
         ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state);
         ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> getNumLiveStreamThreads());

         streamsMetadataState = new StreamsMetadataState(
-            internalTopologyBuilder,
+                this.topologyMetadata,

Review comment:
       nit: seems misaligned.
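       For reference, the aligned call might look like the following sketch (the remaining constructor arguments are elided in the hunk, so they are left as a comment):

```java
        streamsMetadataState = new StreamsMetadataState(
            this.topologyMetadata,
            /* ... remaining constructor args unchanged ... */);
```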

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -16,10 +16,304 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-    //TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR #10683)
+    private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+    // the "__" (double underscore) string is not allowed for topology names, so it's safe to use to indicate
+    // that it's not a named topology
+    private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
+
+    private final StreamsConfig config;
+    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+
+    private ProcessorTopology globalTopology;
+    private Map<String, StateStore> globalStateStores = new HashMap<>();
+    final Set<String> allInputTopics = new HashSet<>();
+
+    public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) {
+        this.config = config;
+        builders = new TreeMap<>();
+        if (builder.hasNamedTopology()) {
+            builders.put(builder.topologyName(), builder);
+        } else {
+            builders.put(UNNAMED_TOPOLOGY, builder);
+        }
+    }
+
+    public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> builders, final StreamsConfig config) {
+        this.config = config;
+        this.builders = builders;
+        if (builders.isEmpty()) {
+            log.debug("Building KafkaStreams app with no topology");
+        }
+    }
+
+    public int getNumStreamThreads(final StreamsConfig config) {
+        final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+        // If the application uses named topologies, it's possible to start up with no topologies at all and only add them later
+        if (builders.isEmpty()) {
+            if (configuredNumStreamThreads != 0) {
+                log.info("Overriding number of StreamThreads to zero for empty topology");
+            }
+            return 0;
+        }
+
+        // If there are named topologies but some are empty, this indicates a bug in user code
+        if (hasNamedTopologies()) {
+            if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {

Review comment:
       I kept getting confused by the `NoNonGlobal` haha (it got me the first time reviewing this, and then again on the third pass) :P In hindsight, maybe we should have coined the term "local topology" in the first place.
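       For illustration, a sketch with a hypothetical "local topology" helper (the helper and the exception message below are mine, not from the PR):

```java
    // Hypothetical helper: a "local" topology is any non-global topology,
    // i.e. one executed by StreamThreads; this removes the double negative.
    private boolean hasLocalTopology() {
        return !hasNoNonGlobalTopology();
    }

    // The check `hasNoNonGlobalTopology() && !hasGlobalTopology()` then reads:
    if (!hasLocalTopology() && !hasGlobalTopology()) {
        throw new TopologyException("Topology has no local or global processing logic"); // sketch message
    }
```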

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -93,24 +95,25 @@ public void init(final ProcessorContext context,
     public void init(final StateStoreContext context,
                      final StateStore root) {
         this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
+        taskId = context.taskId();
         initStoreSerde(context);
-        taskId = context.taskId().toString();
         streamsMetrics = (StreamsMetricsImpl) context.metrics();
 
         registerMetrics();
         final Sensor restoreSensor =
-            StateStoreMetrics.restoreSensor(taskId, metricsScope, name(), streamsMetrics);
+            StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
 
         // register and possibly restore the state from the logs
         maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
     }
 
     private void registerMetrics() {
-        putSensor = StateStoreMetrics.putSensor(taskId, metricsScope, name(), streamsMetrics);
-        fetchSensor = StateStoreMetrics.fetchSensor(taskId, metricsScope, name(), streamsMetrics);
-        flushSensor = StateStoreMetrics.flushSensor(taskId, metricsScope, name(), streamsMetrics);
-        removeSensor = StateStoreMetrics.removeSensor(taskId, metricsScope, name(), streamsMetrics);
-        e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
+

Review comment:
       empty line?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##########
@@ -16,17 +16,240 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStreamsBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+
 public class NamedTopologyIntegrationTest {
-    //TODO KAFKA-12648
-    /**
-     * Things to test in Pt. 2 -  Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies:
-     * 1. Verify changelog & repartition topics decorated with named topology
-     * 2. Make sure app run and works with
-     *         -multiple subtopologies
-     *         -persistent state
-     *         -multi-partition input & output topics
-     *         -standbys
-     *         -piped input and verified output records
-     * 3. Is the task assignment balanced? Does KIP-441/warmup replica placement work as intended?
-     */
+
+    private static final int NUM_BROKERS = 1;
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Rule
+    public final TestName testName = new TestName();
+    private String appId;
+
+    private String inputStream1;
+    private String inputStream2;
+    private String inputStream3;
+    private String outputStream1;
+    private String outputStream2;
+    private String outputStream3;
+    private String storeChangelog1;
+    private String storeChangelog2;
+    private String storeChangelog3;
+
+    final List<KeyValue<String, Long>> standardInputData = asList(KeyValue.pair("A", 100L), KeyValue.pair("B", 200L), KeyValue.pair("A", 300L), KeyValue.pair("C", 400L));
+    final List<KeyValue<String, Long>> standardOutputData = asList(KeyValue.pair("B", 1L), KeyValue.pair("A", 2L), KeyValue.pair("C", 1L)); // output of basic count topology with caching
+
+    final KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier();
+    final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class);
+    final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class);
+
+    final NamedTopologyStreamsBuilder builder1 = new NamedTopologyStreamsBuilder("topology-1");
+    final NamedTopologyStreamsBuilder builder2 = new NamedTopologyStreamsBuilder("topology-2");
+    final NamedTopologyStreamsBuilder builder3 = new NamedTopologyStreamsBuilder("topology-3");
+
+    Properties props;
+    KafkaStreamsNamedTopologyWrapper streams;
+
+    private Properties configProps() {
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return streamsConfiguration;
+    }
+
+    @Before
+    public void setup() throws InterruptedException {
+        appId = safeUniqueTestName(NamedTopologyIntegrationTest.class, testName);
+        inputStream1 = appId + "-input-stream-1";
+        inputStream2 = appId + "-input-stream-2";
+        inputStream3 = appId + "-input-stream-3";
+        outputStream1 = appId + "-output-stream-1";
+        outputStream2 = appId + "-output-stream-2";
+        outputStream3 = appId + "-output-stream-3";
+        storeChangelog1 = appId + "-topology-1-store-changelog";
+        storeChangelog2 = appId + "-topology-2-store-changelog";
+        storeChangelog3 = appId + "-topology-3-store-changelog";
+        props = configProps();
+        CLUSTER.createTopic(inputStream1, 2, 1);
+        CLUSTER.createTopic(inputStream2, 2, 1);
+        CLUSTER.createTopic(inputStream3, 2, 1);
+        CLUSTER.createTopic(outputStream1, 2, 1);
+        CLUSTER.createTopic(outputStream2, 2, 1);
+        CLUSTER.createTopic(outputStream3, 2, 1);
+    }
+
+    @After
+    public void shutdown() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+        CLUSTER.deleteTopics(inputStream1, inputStream2, inputStream3, outputStream1, outputStream2, outputStream3);
+    }
+
+    @Test
+    public void shouldProcessSingleNamedTopologyAndPrefixInternalTopics() throws Exception {
+        produceToInputTopics(inputStream1, standardInputData);
+        builder1.stream(inputStream1)
+            .selectKey((k, v) -> k)
+            .groupByKey()
+            .count(Materialized.as(Stores.persistentKeyValueStore("store")))
+            .toStream().to(outputStream1);
+        streams = new KafkaStreamsNamedTopologyWrapper(builder1.buildNamedTopology(props), props, clientSupplier);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(15));
+        final List<KeyValue<String, Long>> results = waitUntilMinKeyValueRecordsReceived(consumerConfig, outputStream1, 3);
+        assertThat(results, equalTo(standardOutputData));
+
+        final Set<String> allTopics = CLUSTER.getAllTopicsInCluster();
+        assertThat(allTopics.contains(appId + "-" + "topology-1" + "-store-changelog"), is(true));
+        assertThat(allTopics.contains(appId + "-" + "topology-1" + "-store-repartition"), is(true));
+    }
+
+    @Test
+    public void shouldProcessMultipleIdenticalNamedTopologiesWithPersistentStateStores() throws Exception {
+        produceToInputTopics(inputStream1, standardInputData);
+        produceToInputTopics(inputStream2, standardInputData);
+        produceToInputTopics(inputStream3, standardInputData);
+
+        builder1.stream(inputStream1).selectKey((k, v) -> k).groupByKey().count(Materialized.as(Stores.persistentKeyValueStore("store"))).toStream().to(outputStream1);
+        builder2.stream(inputStream2).selectKey((k, v) -> k).groupByKey().count(Materialized.as(Stores.persistentKeyValueStore("store"))).toStream().to(outputStream2);
+        builder3.stream(inputStream3).selectKey((k, v) -> k).groupByKey().count(Materialized.as(Stores.persistentKeyValueStore("store"))).toStream().to(outputStream3);
+        streams = new KafkaStreamsNamedTopologyWrapper(buildNamedTopologies(builder1, builder2, builder3), props, clientSupplier);

Review comment:
       Just curious, what if we just pass in `buildNamedTopologies(builder1, 
builder1, builder1)` here?
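       For illustration, if duplicate names should be rejected, such a test might look like the sketch below (the expected `TopologyException` and the JUnit `assertThrows` import are assumptions, not behavior asserted by the PR):

```java
    @Test
    public void shouldThrowOnDuplicateNamedTopologies() {
        builder1.stream(inputStream1).to(outputStream1);
        // The same builder passed three times yields three topologies all named
        // "topology-1"; presumably this should fail fast rather than silently
        // collapse in the name-keyed map (an assumption, not verified here).
        assertThrows(
            TopologyException.class,
            () -> new KafkaStreamsNamedTopologyWrapper(buildNamedTopologies(builder1, builder1, builder1), props, clientSupplier));
    }
```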

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##########
@@ -16,17 +16,240 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStreamsBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+
 public class NamedTopologyIntegrationTest {
-    //TODO KAFKA-12648
-    /**
-     * Things to test in Pt. 2 -  Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies:
-     * 1. Verify changelog & repartition topics decorated with named topology
-     * 2. Make sure app run and works with
-     *         -multiple subtopologies
-     *         -persistent state
-     *         -multi-partition input & output topics
-     *         -standbys
-     *         -piped input and verified output records
-     * 3. Is the task assignment balanced? Does KIP-441/warmup replica placement work as intended?
-     */
+
+    private static final int NUM_BROKERS = 1;
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Rule
+    public final TestName testName = new TestName();
+    private String appId;
+
+    private String inputStream1;
+    private String inputStream2;
+    private String inputStream3;
+    private String outputStream1;
+    private String outputStream2;
+    private String outputStream3;
+    private String storeChangelog1;
+    private String storeChangelog2;
+    private String storeChangelog3;
+
+    final List<KeyValue<String, Long>> standardInputData = asList(KeyValue.pair("A", 100L), KeyValue.pair("B", 200L), KeyValue.pair("A", 300L), KeyValue.pair("C", 400L));
+    final List<KeyValue<String, Long>> standardOutputData = asList(KeyValue.pair("B", 1L), KeyValue.pair("A", 2L), KeyValue.pair("C", 1L)); // output of basic count topology with caching
+
+    final KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier();
+    final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class);
+    final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class);
+
+    final NamedTopologyStreamsBuilder builder1 = new NamedTopologyStreamsBuilder("topology-1");
+    final NamedTopologyStreamsBuilder builder2 = new NamedTopologyStreamsBuilder("topology-2");
+    final NamedTopologyStreamsBuilder builder3 = new NamedTopologyStreamsBuilder("topology-3");
+
+    Properties props;
+    KafkaStreamsNamedTopologyWrapper streams;
+
+    private Properties configProps() {
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return streamsConfiguration;
+    }
+
+    @Before

Review comment:
       nit: setting up a cluster and creating topics is time-consuming for ITs; could we consider making this Before/AfterClass so that we only do it once? I feel all these tests can share the same topics anyway.
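       For illustration, a sketch of the one-time setup being suggested (the shared topic names below are placeholders, and keeping tests isolated via distinct keys or output topics is an assumption):

```java
    @BeforeClass
    public static void startClusterAndCreateTopics() throws Exception {
        CLUSTER.start();
        // Create the class-scoped shared topics once, instead of per test
        CLUSTER.createTopic("shared-input-stream-1", 2, 1);
        CLUSTER.createTopic("shared-output-stream-1", 2, 1);
    }

    @AfterClass
    public static void deleteTopicsAndStopCluster() throws Exception {
        CLUSTER.deleteTopics("shared-input-stream-1", "shared-output-stream-1");
        CLUSTER.stop();
    }
```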

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -16,10 +16,304 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-    //TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR #10683)
+    private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+    // the "__" (double underscore) string is not allowed for topology names, so it's safe to use to indicate
+    // that it's not a named topology
+    private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
+
+    private final StreamsConfig config;
+    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability

Review comment:
       Very nit: maybe we can leave a comment on why we keep the map of topology builders here instead of the built topologies (and hence move many of the checks ahead of the actual generation of the topology).
   
   Also, as a meta thought just for the future roadmap: one caveat of moving the checks ahead of time is that it may restrict what kinds of optimizations we can do during topology generation, e.g. we cannot, say, generate a topology with an in-memory store if the builder indicates persistent stores. So just looking ahead to pt. 3 here: do you think in the future (beyond V1), in `registerAndBuildNewTopology`, we could still just rebuild the topology immediately and track based on the topologies rather than the topology builders? @ableegoldman
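       Looking ahead, the eager-build alternative floated above might look roughly like this sketch (the `topologies` map and the method body are guesses at a pt. 3 shape, not code from this PR):

```java
    // Hypothetical pt. 3 shape: track built ProcessorTopology objects, not builders
    private final SortedMap<String, ProcessorTopology> topologies = new TreeMap<>();

    public void registerAndBuildNewTopology(final InternalTopologyBuilder builder) {
        // Run the pre-build checks first, then build immediately so that any
        // build-time optimizations still see the full builder state.
        validateNewTopology(builder); // hypothetical validation hook
        topologies.put(builder.topologyName(), builder.buildTopology());
    }
```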




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

