[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-29 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r679589984



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,304 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the "__" (double underscore) string is not allowed for topology names, 
so it's safe to use to indicate
+// that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap builders; // Keep 
sorted by topology name for readability

Review comment:
   Yeah I think it would fine (better even) to swap in the topologies 
rather than the topology builders, the only reason for using the builders now 
is that a huge amount of topology-related functionality currently resides in 
the InternalTopologyBuilder, including pretty much all the metadata. I 100% 
would [support cleaning this 
up](https://github.com/apache/kafka/pull/10683#discussion_r679581108) and 
separating things out from this class and made sure it would be easy to do so 
here, the builders are really only kept around after the topology is built 
because they still contain most of the metadata we need. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-29 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r679589984



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,304 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the "__" (double underscore) string is not allowed for topology names, 
so it's safe to use to indicate
+// that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap builders; // Keep 
sorted by topology name for readability

Review comment:
   Yeah I think it would fine (better even) to swap in the topologies 
rather than the topology builders, the only reason for using the builders now 
is that a huge amount of topology-related functionality currently resides in 
the InternalTopologyBuilder, including pretty much all the metadata. I 100% 
would support cleaning this up and separating things out from this class and 
made sure it would be easy to do so here, the builders are really only kept 
around after the topology is built because they still contain most of the 
metadata we need




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-29 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r679581108



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##
@@ -16,17 +16,240 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStreamsBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+
 public class NamedTopologyIntegrationTest {
-//TODO KAFKA-12648
-/**
- * Things to test in Pt. 2 -  Introduce TopologyMetadata to wrap 
InternalTopologyBuilders of named topologies:
- * 1. Verify changelog & repartition topics decorated with named topology
- * 2. Make sure app run and works with
- * -multiple subtopologies
- * -persistent state
- * -multi-partition input & output topics
- * -standbys
- * -piped input and verified output records
- * 3. Is the task assignment balanced? Does KIP-441/warmup replica 
placement work as intended?
- */
+
+private static final int NUM_BROKERS = 1;
+
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Rule
+public final TestName testName = new TestName();
+private String appId;
+
+private String inputStream1;
+private String inputStream2;
+private String inputStream3;
+private String outputStream1;
+private String outputStream2;
+private String outputStream3;
+private String storeChangelog1;
+private String storeChangelog2;
+private String storeChangelog3;
+
+final List> standardInputData = 
asList(KeyValue.pair("A", 100L), KeyValue.pair("B", 200L), KeyValue.pair("A", 
300L), KeyValue.pair("C", 400L));
+final List> standardOutputData = 
asList(KeyValue.pair("B", 1L), KeyValue.pair("A", 2L), KeyValue.pair("C", 1L)); 
// output of basic count topology with caching
+
+final KafkaClientSupplier clientSupplier = new 
DefaultKafkaClientSupplier();
+final Properties producerConfig = 
TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, 
LongSerializer.class);
+final Properties consumerConfig = 
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, 
LongDeserializer.class);
+
+final NamedTopologyStreamsBuilder builder1 = new 
NamedTopologyStreamsBuilder("topology-1");
+final NamedTopologyStreamsBuilder builder2 = new 
NamedTopologyStreamsBuilder("topology-2");
+final NamedTopologyStreamsBuilder builder3 = new 
NamedTopologyStreamsBuilder("topology-3");
+
+Properties props;
+KafkaStreamsNamedTopologyWrapper streams;
+
+private Properties configProps() {
+final Properties streamsConfiguration = new Properties();
+

[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-29 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r679572946



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##
@@ -16,17 +16,240 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStreamsBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+
 public class NamedTopologyIntegrationTest {
-//TODO KAFKA-12648
-/**
- * Things to test in Pt. 2 -  Introduce TopologyMetadata to wrap 
InternalTopologyBuilders of named topologies:
- * 1. Verify changelog & repartition topics decorated with named topology
- * 2. Make sure app run and works with
- * -multiple subtopologies
- * -persistent state
- * -multi-partition input & output topics
- * -standbys
- * -piped input and verified output records
- * 3. Is the task assignment balanced? Does KIP-441/warmup replica 
placement work as intended?
- */
+
+private static final int NUM_BROKERS = 1;
+
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Rule
+public final TestName testName = new TestName();
+private String appId;
+
+private String inputStream1;
+private String inputStream2;
+private String inputStream3;
+private String outputStream1;
+private String outputStream2;
+private String outputStream3;
+private String storeChangelog1;
+private String storeChangelog2;
+private String storeChangelog3;
+
+final List> standardInputData = 
asList(KeyValue.pair("A", 100L), KeyValue.pair("B", 200L), KeyValue.pair("A", 
300L), KeyValue.pair("C", 400L));
+final List> standardOutputData = 
asList(KeyValue.pair("B", 1L), KeyValue.pair("A", 2L), KeyValue.pair("C", 1L)); 
// output of basic count topology with caching
+
+final KafkaClientSupplier clientSupplier = new 
DefaultKafkaClientSupplier();
+final Properties producerConfig = 
TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, 
LongSerializer.class);
+final Properties consumerConfig = 
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, 
LongDeserializer.class);
+
+final NamedTopologyStreamsBuilder builder1 = new 
NamedTopologyStreamsBuilder("topology-1");
+final NamedTopologyStreamsBuilder builder2 = new 
NamedTopologyStreamsBuilder("topology-2");
+final NamedTopologyStreamsBuilder builder3 = new 
NamedTopologyStreamsBuilder("topology-3");
+
+Properties props;
+KafkaStreamsNamedTopologyWrapper streams;
+
+private Properties configProps() {
+final Properties streamsConfiguration = new Properties();
+

[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-29 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r679572637



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##
@@ -16,17 +16,240 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStreamsBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+
 public class NamedTopologyIntegrationTest {
-//TODO KAFKA-12648
-/**
- * Things to test in Pt. 2 -  Introduce TopologyMetadata to wrap 
InternalTopologyBuilders of named topologies:
- * 1. Verify changelog & repartition topics decorated with named topology
- * 2. Make sure app run and works with
- * -multiple subtopologies
- * -persistent state
- * -multi-partition input & output topics
- * -standbys
- * -piped input and verified output records
- * 3. Is the task assignment balanced? Does KIP-441/warmup replica 
placement work as intended?
- */
+
+private static final int NUM_BROKERS = 1;
+
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Rule
+public final TestName testName = new TestName();
+private String appId;
+
+private String inputStream1;
+private String inputStream2;
+private String inputStream3;
+private String outputStream1;
+private String outputStream2;
+private String outputStream3;
+private String storeChangelog1;
+private String storeChangelog2;
+private String storeChangelog3;
+
+final List> standardInputData = 
asList(KeyValue.pair("A", 100L), KeyValue.pair("B", 200L), KeyValue.pair("A", 
300L), KeyValue.pair("C", 400L));
+final List> standardOutputData = 
asList(KeyValue.pair("B", 1L), KeyValue.pair("A", 2L), KeyValue.pair("C", 1L)); 
// output of basic count topology with caching
+
+final KafkaClientSupplier clientSupplier = new 
DefaultKafkaClientSupplier();
+final Properties producerConfig = 
TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, 
LongSerializer.class);
+final Properties consumerConfig = 
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, 
LongDeserializer.class);
+
+final NamedTopologyStreamsBuilder builder1 = new 
NamedTopologyStreamsBuilder("topology-1");
+final NamedTopologyStreamsBuilder builder2 = new 
NamedTopologyStreamsBuilder("topology-2");
+final NamedTopologyStreamsBuilder builder3 = new 
NamedTopologyStreamsBuilder("topology-3");
+
+Properties props;
+KafkaStreamsNamedTopologyWrapper streams;
+
+private Properties configProps() {
+final Properties streamsConfiguration = new Properties();
+

[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-29 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r679571374



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,304 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the "__" (double underscore) string is not allowed for topology names, 
so it's safe to use to indicate
+// that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap builders; // Keep 
sorted by topology name for readability
+
+private ProcessorTopology globalTopology;
+private Map globalStateStores = new HashMap<>();
+final Set allInputTopics = new HashSet<>();
+
+public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+this.config = config;
+builders = new TreeMap<>();
+if (builder.hasNamedTopology()) {
+builders.put(builder.topologyName(), builder);
+} else {
+builders.put(UNNAMED_TOPOLOGY, builder);
+}
+}
+
+public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+this.config = config;
+this.builders = builders;
+if (builders.isEmpty()) {
+log.debug("Building KafkaStreams app with no empty topology");
+}
+}
+
+public int getNumStreamThreads(final StreamsConfig config) {
+final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+if (builders.isEmpty()) {
+if (configuredNumStreamThreads != 0) {
+log.info("Overriding number of StreamThreads to zero for empty 
topology");
+}
+return 0;
+}
+
+// If there are named topologies but some are empty, this indicates a 
bug in user code
+if (hasNamedTopologies()) {
+if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {

Review comment:
   Ahh yeah I was even confusing myself with this after a while. "Local 
topology" sounds much better -- will do




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669252416



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -1216,39 +1245,26 @@ private void setRegexMatchedTopicToStateStore() {
 }
 }
 
-public synchronized Pattern earliestResetTopicsPattern() {
-return resetTopicsPattern(earliestResetTopics, earliestResetPatterns);
+public boolean hasOffsetResetOverrides() {
+return !(earliestResetTopics.isEmpty() && 
earliestResetPatterns.isEmpty()
+&& latestResetTopics.isEmpty() && latestResetPatterns.isEmpty());
 }
 
-public synchronized Pattern latestResetTopicsPattern() {
-return resetTopicsPattern(latestResetTopics, latestResetPatterns);
-}
-
-private Pattern resetTopicsPattern(final Set resetTopics,
-   final Set resetPatterns) {
-final List topics = 
maybeDecorateInternalSourceTopics(resetTopics);
-
-return buildPattern(topics, resetPatterns);
-}
-
-private static Pattern buildPattern(final Collection sourceTopics,
-final Collection 
sourcePatterns) {
-final StringBuilder builder = new StringBuilder();
-
-for (final String topic : sourceTopics) {
-builder.append(topic).append("|");
-}
-
-for (final Pattern sourcePattern : sourcePatterns) {
-builder.append(sourcePattern.pattern()).append("|");
-}
-
-if (builder.length() > 0) {
-builder.setLength(builder.length() - 1);
-return Pattern.compile(builder.toString());
+public OffsetResetStrategy offsetResetStrategy(final String topic) {
+if 
(maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) ||
+earliestResetPatterns.stream().anyMatch(p -> 
p.matcher(topic).matches())) {
+return EARLIEST;
+} else if 
(maybeDecorateInternalSourceTopics(latestResetTopics).contains(topic) ||
+latestResetPatterns.stream().anyMatch(p -> 
p.matcher(topic).matches())) {
+return LATEST;
+} else if 
(maybeDecorateInternalSourceTopics(sourceTopicNames).contains(topic)

Review comment:
   The `NONE` case means we do have this topic in this 
InternalTopologyBuilder (as opposed to that of a different NamedTopology) but 
it hasn't set the offset reset strategy to EARLIEST or LATEST. If we fail the 
first two `if` conditions above, then all that's left is to verify whether or 
not we have this topic at all -- which is going to be true if we find it in 
either the source topic set or pattern.
   
   Maybe you were wondering about the `|| !hasNamedTopology()` part? Basically 
if we don't have any NamedTopologies then there is only one 
InternalTopologyBuilder, so all topics should belong to it




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669250045



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -1065,14 +1086,22 @@ private void buildProcessorNode(final Map> pro
 return Collections.unmodifiableMap(globalStateStores);
 }
 
-public Set allStateStoreName() {
+public Set allStateStoreNames() {
 Objects.requireNonNull(applicationId, "topology has not completed 
optimization");
 
 final Set allNames = new HashSet<>(stateFactories.keySet());
 allNames.addAll(globalStateStores.keySet());
 return Collections.unmodifiableSet(allNames);
 }
 
+public boolean hasStore(final String name) {
+return stateFactories.containsKey(name) || 
globalStateStores.containsKey(name);
+}
+
+public boolean hasPersistentStores() {

Review comment:
   Previously we would get a handle on the actual topology and then it 
would have to iterate through all the stores to check each one for persistence. 
But while you can now add and remove individual named topologies, you still 
can't change a topology or the stores in it while the app is running, so we may 
as well just keep track of whether we found any persistent stores or not as we 
go along, rather than iterate over all of them later. Also, this way we can 
keep and access this metadata easily through the 
TopologyMetadata/InternalTopologyBuilder, rather than ever having to go access 
the ProcessorTopology directly at all
   
   That said, I'm not _too_ attached to this way of doing things, so if you 
have concerns I can go back to something like how it was before. Just lmk what 
you think




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669233192



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -345,8 +343,17 @@ private SinkNodeFactory(final String name,
 }
 }
 
+public void setTopologyName(final String namedTopology) {

Review comment:
   I tried to, but just couldn't make it work. It has to do with Java and 
subclassing quirks like constructing the parent before the child. It seems to 
be pretty much impossible to set things up so that everything is `final` -- if 
we set the `topologyName` in the NamedTopology constructor, then it's not 
accessible (ie always null) when we call the `InternalTopologyBuilder`'s 
constructor since that occurs during the parent `Topology`'s construction.
   
   It's definitely annoying, but at least we should be able to clean things up 
once we go through a KIP and don't need to subclass like this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669211161



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the '_' character is not allowed for topology names, thus it's safe to 
use to indicate that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap builders; // Keep 
sorted by topology name for readability
+
+private ProcessorTopology globalTopology;
+private Map globalStateStores = new HashMap<>();
+final Set allInputTopics = new HashSet<>();
+
+public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+this.config = config;
+builders = new TreeMap<>();
+if (builder.hasNamedTopology()) {
+builders.put(builder.namedTopology(), builder);
+} else {
+builders.put(UNNAMED_TOPOLOGY, builder);
+}
+}
+
+public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+this.config = config;
+this.builders = builders;
+if (builders.isEmpty()) {
+log.debug("Building KafkaStreams app with no empty topology");
+}
+}
+
+public int getNumStreamThreads(final StreamsConfig config) {
+final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+if (builders.isEmpty()) {
+if (configuredNumStreamThreads != 0) {
+log.info("Overriding number of StreamThreads to zero for empty 
topology");
+}
+return 0;
+}
+
+// If there are topologies but they are all empty, this indicates a 
bug in user code
+if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {

Review comment:
   @guozhangwang  WDYT? If the user has started up Streams with several 
named topologies, but a subset of them are completely empty, should this be 
considered user error and cause Streams to shutdown or should we just roll with 
it as long as at least one topology is non-empty?
   
   Take a look at the current state and lmk what you think




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669211161



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the '_' character is not allowed for topology names, thus it's safe to 
use to indicate that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap builders; // Keep 
sorted by topology name for readability
+
+private ProcessorTopology globalTopology;
+private Map globalStateStores = new HashMap<>();
+final Set allInputTopics = new HashSet<>();
+
+public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+this.config = config;
+builders = new TreeMap<>();
+if (builder.hasNamedTopology()) {
+builders.put(builder.namedTopology(), builder);
+} else {
+builders.put(UNNAMED_TOPOLOGY, builder);
+}
+}
+
+public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+this.config = config;
+this.builders = builders;
+if (builders.isEmpty()) {
+log.debug("Building KafkaStreams app with no empty topology");
+}
+}
+
+public int getNumStreamThreads(final StreamsConfig config) {
+final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+if (builders.isEmpty()) {
+if (configuredNumStreamThreads != 0) {
+log.info("Overriding number of StreamThreads to zero for empty 
topology");
+}
+return 0;
+}
+
+// If there are topologies but they are all empty, this indicates a 
bug in user code
+if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {

Review comment:
   @guozhangwang  WDYT? If the user has started up Streams with several 
named topologies, but a subset of them are completely empty, should this be 
considered user error and cause Streams to shutdown or should we just roll with 
it as long as at least one topology is non-empty?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669210507



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the '_' character is not allowed for topology names, thus it's safe to 
use to indicate that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap builders; // Keep 
sorted by topology name for readability
+
+private ProcessorTopology globalTopology;
+private Map globalStateStores = new HashMap<>();
+final Set allInputTopics = new HashSet<>();
+
+public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+this.config = config;
+builders = new TreeMap<>();
+if (builder.hasNamedTopology()) {
+builders.put(builder.namedTopology(), builder);
+} else {
+builders.put(UNNAMED_TOPOLOGY, builder);
+}
+}
+
+public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+this.config = config;
+this.builders = builders;
+if (builders.isEmpty()) {
+log.debug("Building KafkaStreams app with no empty topology");
+}
+}
+
+public int getNumStreamThreads(final StreamsConfig config) {
+final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+if (builders.isEmpty()) {
+if (configuredNumStreamThreads != 0) {
+log.info("Overriding number of StreamThreads to zero for empty 
topology");
+}
+return 0;
+}
+
+// If there are topologies but they are all empty, this indicates a 
bug in user code
+if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {

Review comment:
   Sure, but if `builders.isEmpty` then we would enter the `if` block above 
and return before reaching this section of the code. But I think maybe you 
meant that in `hasNoNonGlobalTopology`, we should actually return true only if 
_all_ builders have no non-global topology, not if that's true for _any one_ of 
them? There's some argument to be made for how to handle the case where some 
named topologies are legit, while others are empty, but I would still advocate 
for throwing an exception when _any_ topology is empty since this is not a 
valid configuration. In which case, the current code is correct, but the 
comment is not. I'll fix the misleading comment




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669197330



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the '_' character is not allowed for topology names, thus it's safe to 
use to indicate that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap builders; // Keep 
sorted by topology name for readability
+
+private ProcessorTopology globalTopology;
+private Map globalStateStores = new HashMap<>();
+final Set allInputTopics = new HashSet<>();
+
+public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+this.config = config;
+builders = new TreeMap<>();
+if (builder.hasNamedTopology()) {
+builders.put(builder.namedTopology(), builder);
+} else {
+builders.put(UNNAMED_TOPOLOGY, builder);
+}
+}
+
+public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+this.config = config;
+this.builders = builders;
+if (builders.isEmpty()) {
+log.debug("Building KafkaStreams app with no empty topology");
+}
+}
+
+public int getNumStreamThreads(final StreamsConfig config) {
+final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+if (builders.isEmpty()) {
+if (configuredNumStreamThreads != 0) {

Review comment:
   Also, if a user has a global-only topology, then it's absolutely valid 
for them to eplicitly configure the app to have no StreamThreads. In fact if we 
detect that case then we actually override the configured num.stream.threads to 
0 for them, anyways




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669196718



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the '_' character is not allowed for topology names, thus it's safe to 
use to indicate that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap builders; // Keep 
sorted by topology name for readability
+
+private ProcessorTopology globalTopology;
+private Map globalStateStores = new HashMap<>();
+final Set allInputTopics = new HashSet<>();
+
+public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+this.config = config;
+builders = new TreeMap<>();
+if (builder.hasNamedTopology()) {
+builders.put(builder.namedTopology(), builder);
+} else {
+builders.put(UNNAMED_TOPOLOGY, builder);
+}
+}
+
+public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+this.config = config;
+this.builders = builders;
+if (builders.isEmpty()) {
+log.debug("Building KafkaStreams app with no empty topology");
+}
+}
+
+public int getNumStreamThreads(final StreamsConfig config) {
+final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+if (builders.isEmpty()) {
+if (configuredNumStreamThreads != 0) {
+log.info("Overriding number of StreamThreads to zero for empty 
topology");
+}
+return 0;
+}
+
+// If there are topologies but they are all empty, this indicates a 
bug in user code
+if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {
+log.error("Topology with no input topics will create no stream 
threads and no global thread.");
+throw new TopologyException("Topology has no stream threads and no 
global threads, " +
+"must subscribe to at least one 
source topic or global table.");
+}
+
+// Lastly we check for an empty non-global topology and override the 
threads to zero if set otherwise
+if (configuredNumStreamThreads != 0 && hasNoNonGlobalTopology()) {
+log.info("Overriding number of StreamThreads to zero for 
global-only topology");
+return 0;
+}
+
+return configuredNumStreamThreads;
+}
+
+public boolean hasNamedTopologies() {
+// This includes the case of starting up with no named topologies at 
all
+return !builders.containsKey(UNNAMED_TOPOLOGY);
+}
+
+public boolean hasGlobalTopology() {
+return 
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasGlobalStores);
+}
+
+public boolean hasNoNonGlobalTopology() {
+return 
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasNoNonGlobalTopology);
+}
+
+public boolean hasPersistentStores() {
+// If the app is using named topologies, there may not be any 
persistent state when it first starts up
+// but a new NamedTopology may introduce it later, so we must return 
true
+if (hasNamedTopologies()) {
+return true;
+}
+return 

[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669196246



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+// the '_' character is not allowed for topology names, thus it's safe to 
use to indicate that it's not a named topology
+private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+private final StreamsConfig config;
+private final SortedMap builders; // Keep 
sorted by topology name for readability
+
+private ProcessorTopology globalTopology;
+private Map globalStateStores = new HashMap<>();
+final Set allInputTopics = new HashSet<>();
+
+public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+this.config = config;
+builders = new TreeMap<>();
+if (builder.hasNamedTopology()) {
+builders.put(builder.namedTopology(), builder);
+} else {
+builders.put(UNNAMED_TOPOLOGY, builder);
+}
+}
+
+public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+this.config = config;
+this.builders = builders;
+if (builders.isEmpty()) {
+log.debug("Building KafkaStreams app with no empty topology");
+}
+}
+
+public int getNumStreamThreads(final StreamsConfig config) {
+final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+if (builders.isEmpty()) {
+if (configuredNumStreamThreads != 0) {
+log.info("Overriding number of StreamThreads to zero for empty 
topology");
+}
+return 0;
+}
+
+// If there are topologies but they are all empty, this indicates a 
bug in user code
+if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {
+log.error("Topology with no input topics will create no stream 
threads and no global thread.");
+throw new TopologyException("Topology has no stream threads and no 
global threads, " +
+"must subscribe to at least one 
source topic or global table.");
+}
+
+// Lastly we check for an empty non-global topology and override the 
threads to zero if set otherwise
+if (configuredNumStreamThreads != 0 && hasNoNonGlobalTopology()) {
+log.info("Overriding number of StreamThreads to zero for 
global-only topology");
+return 0;
+}
+
+return configuredNumStreamThreads;
+}
+
+public boolean hasNamedTopologies() {
+// This includes the case of starting up with no named topologies at 
all
+return !builders.containsKey(UNNAMED_TOPOLOGY);
+}
+
+public boolean hasGlobalTopology() {
+return 
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasGlobalStores);
+}
+
+public boolean hasNoNonGlobalTopology() {
+return 
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasNoNonGlobalTopology);
+}
+
+public boolean hasPersistentStores() {
+// If the app is using named topologies, there may not be any 
persistent state when it first starts up
+// but a new NamedTopology may introduce it later, so we must return 
true
+if (hasNamedTopologies()) {
+return true;
+}
+return 

[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669193117



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -364,6 +371,10 @@ public synchronized final StreamsConfig getStreamsConfig() 
{
 return config;
 }
 
+public String namedTopology() {

Review comment:
   You mean to rename this to `topologyName()`? Ack




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669192999



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -345,8 +343,17 @@ private SinkNodeFactory(final String name,
 }
 }
 
+public void setTopologyName(final String namedTopology) {
+Objects.requireNonNull(namedTopology, "named topology can't be null");
+if (this.namedTopology != null) {
+log.error("Tried to reset the namedTopology to {} but it was 
already set to {}", namedTopology, this.namedTopology);
+throw new IllegalStateException("NamedTopology has already been 
set to " + this.namedTopology);
+}
+this.namedTopology = namedTopology;
+}
+
 // public for testing only
-public synchronized final InternalTopologyBuilder setApplicationId(final 
String applicationId) {

Review comment:
   I looked around and can't imagine why we would ever need it. 
`setApplicationId` should only ever be called once, from a single 
location/thread




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669191369



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1200,7 +1179,7 @@ private long getCacheSizePerThread(final int 
numStreamThreads) {
 if (numStreamThreads == 0) {
 return totalCacheSize;
 }
-return totalCacheSize / (numStreamThreads + ((globalTaskTopology != 
null) ? 1 : 0));
+return totalCacheSize / (numStreamThreads + 
(topologyMetadata.hasGlobalTopology() ? 1 : 0));

Review comment:
   We're just trying to get the basic NamedTopology feature up and running 
for the time being, so we'll loop back around on any "missing" features 
eventually. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669190141



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1200,7 +1179,7 @@ private long getCacheSizePerThread(final int 
numStreamThreads) {
 if (numStreamThreads == 0) {
 return totalCacheSize;
 }
-return totalCacheSize / (numStreamThreads + ((globalTaskTopology != 
null) ? 1 : 0));
+return totalCacheSize / (numStreamThreads + 
(topologyMetadata.hasGlobalTopology() ? 1 : 0));

Review comment:
   At the moment we just don't allow global stores with named topologies. 
There is a list of not-yet-supported features that are currently incompatible 
with them in the javadocs of KafkaStreamsNamedTopologyWrapper




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669188996



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -799,41 +795,41 @@ public KafkaStreams(final Topology topology,
 public KafkaStreams(final Topology topology,
 final StreamsConfig config,
 final Time time) {
-this(topology.internalTopologyBuilder, config, new 
DefaultKafkaClientSupplier(), time);
+this(new TopologyMetadata(topology.internalTopologyBuilder, config), 
config, new DefaultKafkaClientSupplier(), time);
 }
 
-private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
- final StreamsConfig config,
- final KafkaClientSupplier clientSupplier) throws 
StreamsException {
-this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM);
-}
-
-private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
+private KafkaStreams(final Topology topology,
  final StreamsConfig config,
  final KafkaClientSupplier clientSupplier,
  final Time time) throws StreamsException {
+this(new TopologyMetadata(topology.internalTopologyBuilder, config), 
config, clientSupplier, time);
+}
+
+protected KafkaStreams(final TopologyMetadata topologyMetadata,

Review comment:
   Just this one does, as it's called from the child class 
KafkaStreamsNamedTopologyWrapper. I'll change the one below back to private 
though




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org