This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push: new 68d6d99 KAFKA-9020: Streams sub-topologies should be sorted by sink -> source relationship (#7495) 68d6d99 is described below commit 68d6d99f62541ea77855e153258b5158afdf2dee Author: A. Sophie Blee-Goldman <sop...@confluent.io> AuthorDate: Mon Oct 14 13:15:03 2019 -0700 KAFKA-9020: Streams sub-topologies should be sorted by sink -> source relationship (#7495) Subtopologies are currently ordered alphabetically by source node, which prior to KIP-307 happened to always result in the "correct" (i.e. topological) order. Now that users may name their nodes anything they want, we must explicitly order them so that upstream node groups/subtopologies come first and the downstream ones come after. Reviewers: Guozhang Wang <wangg...@gmail.com>, Bill Bejeck <bbej...@gmail.com> --- .../internals/InternalTopologyBuilder.java | 22 +- .../apache/kafka/streams/StreamsBuilderTest.java | 8 +- .../kstream/RepartitionTopicNamingTest.java | 2 +- .../internals/RepartitionOptimizingTest.java | 347 +++++++++++---------- .../RepartitionWithMergeOptimizingTest.java | 145 +++++---- .../apache/kafka/streams/TopologyTestDriver.java | 4 +- 6 files changed, 259 insertions(+), 269 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index d34ff00..5754b4a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import 
org.apache.kafka.streams.Topology; import org.apache.kafka.streams.errors.TopologyException; @@ -85,7 +84,7 @@ public class InternalTopologyBuilder { // map from source processor names to regex subscription patterns private final Map<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>(); - // map from sink processor names to subscribed topic (without application-id prefix for internal topics) + // map from sink processor names to sink topic (without application-id prefix for internal topics) private final Map<String, String> nodeToSinkTopic = new HashMap<>(); // map from topics to their matched regex patterns, this is to ensure one topic is passed through on source node @@ -749,27 +748,18 @@ public class InternalTopologyBuilder { return nodeGroups; } + // Order node groups by their position in the actual topology, ie upstream subtopologies come before downstream private Map<Integer, Set<String>> makeNodeGroups() { final Map<Integer, Set<String>> nodeGroups = new LinkedHashMap<>(); final Map<String, Set<String>> rootToNodeGroup = new HashMap<>(); int nodeGroupId = 0; - // Go through source nodes first. 
This makes the group id assignment easy to predict in tests - final Set<String> allSourceNodes = new HashSet<>(nodeToSourceTopics.keySet()); - allSourceNodes.addAll(nodeToSourcePatterns.keySet()); - - for (final String nodeName : Utils.sorted(allSourceNodes)) { + // Traverse in topological order + for (final String nodeName : nodeFactories.keySet()) { nodeGroupId = putNodeGroupName(nodeName, nodeGroupId, nodeGroups, rootToNodeGroup); } - // Go through non-source nodes - for (final String nodeName : Utils.sorted(nodeFactories.keySet())) { - if (!nodeToSourceTopics.containsKey(nodeName)) { - nodeGroupId = putNodeGroupName(nodeName, nodeGroupId, nodeGroups, rootToNodeGroup); - } - } - return nodeGroups; } @@ -1905,11 +1895,11 @@ public class InternalTopologyBuilder { // following functions are for test only - public synchronized Set<String> getSourceTopicNames() { + public synchronized Set<String> sourceTopicNames() { return sourceTopicNames; } - public synchronized Map<String, StateStoreFactory> getStateStores() { + public synchronized Map<String, StateStoreFactory> stateStores() { return stateFactories; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index 4118297..0cce24f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -385,10 +385,10 @@ public class StreamsBuilderTest { internalTopologyBuilder.build().storeToChangelogTopic(), equalTo(Collections.singletonMap("store", "topic"))); assertThat( - internalTopologyBuilder.getStateStores().keySet(), + internalTopologyBuilder.stateStores().keySet(), equalTo(Collections.singleton("store"))); assertThat( - internalTopologyBuilder.getStateStores().get("store").loggingEnabled(), + internalTopologyBuilder.stateStores().get("store").loggingEnabled(), equalTo(false)); assertThat( 
internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.isEmpty(), @@ -407,10 +407,10 @@ public class StreamsBuilderTest { internalTopologyBuilder.build().storeToChangelogTopic(), equalTo(Collections.singletonMap("store", "appId-store-changelog"))); assertThat( - internalTopologyBuilder.getStateStores().keySet(), + internalTopologyBuilder.stateStores().keySet(), equalTo(Collections.singleton("store"))); assertThat( - internalTopologyBuilder.getStateStores().get("store").loggingEnabled(), + internalTopologyBuilder.stateStores().get("store").loggingEnabled(), equalTo(true)); assertThat( internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(), diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java index 0f3e33d..cf8d6b9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java @@ -603,7 +603,7 @@ public class RepartitionTopicNamingTest { " Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" + " --> KSTREAM-MAP-0000000001\n" + " Processor: KSTREAM-MAP-0000000001 (stores: [])\n" + - " --> KSTREAM-FILTER-0000000020, KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000009, KSTREAM-FILTER-0000000016, KSTREAM-FILTER-0000000029\n" + + " --> KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000009, KSTREAM-FILTER-0000000020, KSTREAM-FILTER-0000000016, KSTREAM-FILTER-0000000029\n" + " <-- KSTREAM-SOURCE-0000000000\n" + " Processor: KSTREAM-FILTER-0000000020 (stores: [])\n" + " --> KSTREAM-PEEK-0000000021\n" + diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java index 8425ad7..cf46af6 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java @@ -279,182 +279,183 @@ public class RepartitionOptimizingTest { } private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n" - + " Sub-topology: 0\n" - + " Source: KSTREAM-SOURCE-0000000036 (topics: [count-groupByKey-repartition])\n" - + " --> aggregate, count, join-filter, reduce-filter\n" - + " Processor: count (stores: [count-store])\n" - + " --> count-toStream\n" - + " <-- KSTREAM-SOURCE-0000000036\n" - + " Processor: count-toStream (stores: [])\n" - + " --> join-other-windowed, count-to\n" - + " <-- count\n" - + " Processor: join-filter (stores: [])\n" - + " --> join-this-windowed\n" - + " <-- KSTREAM-SOURCE-0000000036\n" - + " Processor: reduce-filter (stores: [])\n" - + " --> reduce-peek\n" - + " <-- KSTREAM-SOURCE-0000000036\n" - + " Processor: join-other-windowed (stores: [other-join-store])\n" - + " --> join-other-join\n" - + " <-- count-toStream\n" - + " Processor: join-this-windowed (stores: [join-store])\n" - + " --> join-this-join\n" - + " <-- join-filter\n" - + " Processor: reduce-peek (stores: [])\n" - + " --> reducer\n" - + " <-- reduce-filter\n" - + " Processor: aggregate (stores: [aggregate-store])\n" - + " --> aggregate-toStream\n" - + " <-- KSTREAM-SOURCE-0000000036\n" - + " Processor: join-other-join (stores: [join-store])\n" - + " --> join-merge\n" - + " <-- join-other-windowed\n" - + " Processor: join-this-join (stores: [other-join-store])\n" - + " --> join-merge\n" - + " <-- join-this-windowed\n" - + " Processor: reducer (stores: [reduce-store])\n" - + " --> reduce-toStream\n" - + " <-- reduce-peek\n" - + " Processor: aggregate-toStream (stores: [])\n" - + " --> reduce-to\n" - + " <-- aggregate\n" - + " Processor: join-merge (stores: [])\n" - + " --> join-to\n" - + " <-- join-this-join, join-other-join\n" - + " Processor: 
reduce-toStream (stores: [])\n" - + " --> KSTREAM-SINK-0000000023\n" - + " <-- reducer\n" - + " Sink: KSTREAM-SINK-0000000023 (topic: outputTopic_2)\n" - + " <-- reduce-toStream\n" - + " Sink: count-to (topic: outputTopic_0)\n" - + " <-- count-toStream\n" - + " Sink: join-to (topic: joinedOutputTopic)\n" - + " <-- join-merge\n" - + " Sink: reduce-to (topic: outputTopic_1)\n" - + " <-- aggregate-toStream\n" - + "\n" - + " Sub-topology: 1\n" - + " Source: sourceStream (topics: [input])\n" - + " --> source-map\n" - + " Processor: source-map (stores: [])\n" - + " --> process-filter, KSTREAM-FILTER-0000000035\n" - + " <-- sourceStream\n" - + " Processor: process-filter (stores: [])\n" - + " --> process-mapValues\n" - + " <-- source-map\n" - + " Processor: KSTREAM-FILTER-0000000035 (stores: [])\n" - + " --> KSTREAM-SINK-0000000034\n" - + " <-- source-map\n" - + " Processor: process-mapValues (stores: [])\n" - + " --> process\n" - + " <-- process-filter\n" - + " Sink: KSTREAM-SINK-0000000034 (topic: count-groupByKey-repartition)\n" - + " <-- KSTREAM-FILTER-0000000035\n" - + " Processor: process (stores: [])\n" - + " --> none\n" - + " <-- process-mapValues\n\n"; + + " Sub-topology: 0\n" + + " Source: sourceStream (topics: [input])\n" + + " --> source-map\n" + + " Processor: source-map (stores: [])\n" + + " --> process-filter, KSTREAM-FILTER-0000000035\n" + + " <-- sourceStream\n" + + " Processor: process-filter (stores: [])\n" + + " --> process-mapValues\n" + + " <-- source-map\n" + + " Processor: KSTREAM-FILTER-0000000035 (stores: [])\n" + + " --> KSTREAM-SINK-0000000034\n" + + " <-- source-map\n" + + " Processor: process-mapValues (stores: [])\n" + + " --> process\n" + + " <-- process-filter\n" + + " Sink: KSTREAM-SINK-0000000034 (topic: count-groupByKey-repartition)\n" + + " <-- KSTREAM-FILTER-0000000035\n" + + " Processor: process (stores: [])\n" + + " --> none\n" + + " <-- process-mapValues\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000036 
(topics: [count-groupByKey-repartition])\n" + + " --> aggregate, count, join-filter, reduce-filter\n" + + " Processor: count (stores: [count-store])\n" + + " --> count-toStream\n" + + " <-- KSTREAM-SOURCE-0000000036\n" + + " Processor: count-toStream (stores: [])\n" + + " --> join-other-windowed, count-to\n" + + " <-- count\n" + + " Processor: join-filter (stores: [])\n" + + " --> join-this-windowed\n" + + " <-- KSTREAM-SOURCE-0000000036\n" + + " Processor: reduce-filter (stores: [])\n" + + " --> reduce-peek\n" + + " <-- KSTREAM-SOURCE-0000000036\n" + + " Processor: join-other-windowed (stores: [other-join-store])\n" + + " --> join-other-join\n" + + " <-- count-toStream\n" + + " Processor: join-this-windowed (stores: [join-store])\n" + + " --> join-this-join\n" + + " <-- join-filter\n" + + " Processor: reduce-peek (stores: [])\n" + + " --> reducer\n" + + " <-- reduce-filter\n" + + " Processor: aggregate (stores: [aggregate-store])\n" + + " --> aggregate-toStream\n" + + " <-- KSTREAM-SOURCE-0000000036\n" + + " Processor: join-other-join (stores: [join-store])\n" + + " --> join-merge\n" + + " <-- join-other-windowed\n" + + " Processor: join-this-join (stores: [other-join-store])\n" + + " --> join-merge\n" + + " <-- join-this-windowed\n" + + " Processor: reducer (stores: [reduce-store])\n" + + " --> reduce-toStream\n" + + " <-- reduce-peek\n" + + " Processor: aggregate-toStream (stores: [])\n" + + " --> reduce-to\n" + + " <-- aggregate\n" + + " Processor: join-merge (stores: [])\n" + + " --> join-to\n" + + " <-- join-this-join, join-other-join\n" + + " Processor: reduce-toStream (stores: [])\n" + + " --> KSTREAM-SINK-0000000023\n" + + " <-- reducer\n" + + " Sink: KSTREAM-SINK-0000000023 (topic: outputTopic_2)\n" + + " <-- reduce-toStream\n" + + " Sink: count-to (topic: outputTopic_0)\n" + + " <-- count-toStream\n" + + " Sink: join-to (topic: joinedOutputTopic)\n" + + " <-- join-merge\n" + + " Sink: reduce-to (topic: outputTopic_1)\n" + + " <-- aggregate-toStream\n\n"; 
+ private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n" - + " Sub-topology: 0\n" - + " Source: KSTREAM-SOURCE-0000000007 (topics: [count-groupByKey-repartition])\n" - + " --> count\n" - + " Processor: count (stores: [count-store])\n" - + " --> count-toStream\n" - + " <-- KSTREAM-SOURCE-0000000007\n" - + " Processor: count-toStream (stores: [])\n" - + " --> join-other-windowed, count-to\n" - + " <-- count\n" - + " Source: KSTREAM-SOURCE-0000000027 (topics: [join-left-repartition])\n" - + " --> join-this-windowed\n" - + " Processor: join-other-windowed (stores: [other-join-store])\n" - + " --> join-other-join\n" - + " <-- count-toStream\n" - + " Processor: join-this-windowed (stores: [join-store])\n" - + " --> join-this-join\n" - + " <-- KSTREAM-SOURCE-0000000027\n" - + " Processor: join-other-join (stores: [join-store])\n" - + " --> join-merge\n" - + " <-- join-other-windowed\n" - + " Processor: join-this-join (stores: [other-join-store])\n" - + " --> join-merge\n" - + " <-- join-this-windowed\n" - + " Processor: join-merge (stores: [])\n" - + " --> join-to\n" - + " <-- join-this-join, join-other-join\n" - + " Sink: count-to (topic: outputTopic_0)\n" - + " <-- count-toStream\n" - + " Sink: join-to (topic: joinedOutputTopic)\n" - + " <-- join-merge\n" - + "\n" - + " Sub-topology: 1\n" - + " Source: KSTREAM-SOURCE-0000000013 (topics: [aggregate-groupByKey-repartition])\n" - + " --> aggregate\n" - + " Processor: aggregate (stores: [aggregate-store])\n" - + " --> aggregate-toStream\n" - + " <-- KSTREAM-SOURCE-0000000013\n" - + " Processor: aggregate-toStream (stores: [])\n" - + " --> reduce-to\n" - + " <-- aggregate\n" - + " Sink: reduce-to (topic: outputTopic_1)\n" - + " <-- aggregate-toStream\n" - + "\n" - + " Sub-topology: 2\n" - + " Source: KSTREAM-SOURCE-0000000021 (topics: [reduce-groupByKey-repartition])\n" - + " --> reducer\n" - + " Processor: reducer (stores: [reduce-store])\n" - + " --> reduce-toStream\n" - + " <-- 
KSTREAM-SOURCE-0000000021\n" - + " Processor: reduce-toStream (stores: [])\n" - + " --> KSTREAM-SINK-0000000023\n" - + " <-- reducer\n" - + " Sink: KSTREAM-SINK-0000000023 (topic: outputTopic_2)\n" - + " <-- reduce-toStream\n" - + "\n" - + " Sub-topology: 3\n" - + " Source: sourceStream (topics: [input])\n" - + " --> source-map\n" - + " Processor: source-map (stores: [])\n" - + " --> reduce-filter, process-filter, KSTREAM-FILTER-0000000006, join-filter, KSTREAM-FILTER-0000000012\n" - + " <-- sourceStream\n" - + " Processor: reduce-filter (stores: [])\n" - + " --> reduce-peek\n" - + " <-- source-map\n" - + " Processor: join-filter (stores: [])\n" - + " --> KSTREAM-FILTER-0000000026\n" - + " <-- source-map\n" - + " Processor: process-filter (stores: [])\n" - + " --> process-mapValues\n" - + " <-- source-map\n" - + " Processor: reduce-peek (stores: [])\n" - + " --> KSTREAM-FILTER-0000000020\n" - + " <-- reduce-filter\n" - + " Processor: KSTREAM-FILTER-0000000006 (stores: [])\n" - + " --> KSTREAM-SINK-0000000005\n" - + " <-- source-map\n" - + " Processor: KSTREAM-FILTER-0000000012 (stores: [])\n" - + " --> KSTREAM-SINK-0000000011\n" - + " <-- source-map\n" - + " Processor: KSTREAM-FILTER-0000000020 (stores: [])\n" - + " --> KSTREAM-SINK-0000000019\n" - + " <-- reduce-peek\n" - + " Processor: KSTREAM-FILTER-0000000026 (stores: [])\n" - + " --> KSTREAM-SINK-0000000025\n" - + " <-- join-filter\n" - + " Processor: process-mapValues (stores: [])\n" - + " --> process\n" - + " <-- process-filter\n" - + " Sink: KSTREAM-SINK-0000000005 (topic: count-groupByKey-repartition)\n" - + " <-- KSTREAM-FILTER-0000000006\n" - + " Sink: KSTREAM-SINK-0000000011 (topic: aggregate-groupByKey-repartition)\n" - + " <-- KSTREAM-FILTER-0000000012\n" - + " Sink: KSTREAM-SINK-0000000019 (topic: reduce-groupByKey-repartition)\n" - + " <-- KSTREAM-FILTER-0000000020\n" - + " Sink: KSTREAM-SINK-0000000025 (topic: join-left-repartition)\n" - + " <-- KSTREAM-FILTER-0000000026\n" - + " Processor: process 
(stores: [])\n" - + " --> none\n" - + " <-- process-mapValues\n\n"; + + " Sub-topology: 0\n" + + " Source: sourceStream (topics: [input])\n" + + " --> source-map\n" + + " Processor: source-map (stores: [])\n" + + " --> reduce-filter, process-filter, KSTREAM-FILTER-0000000006, join-filter, KSTREAM-FILTER-0000000012\n" + + " <-- sourceStream\n" + + " Processor: reduce-filter (stores: [])\n" + + " --> reduce-peek\n" + + " <-- source-map\n" + + " Processor: join-filter (stores: [])\n" + + " --> KSTREAM-FILTER-0000000026\n" + + " <-- source-map\n" + + " Processor: process-filter (stores: [])\n" + + " --> process-mapValues\n" + + " <-- source-map\n" + + " Processor: reduce-peek (stores: [])\n" + + " --> KSTREAM-FILTER-0000000020\n" + + " <-- reduce-filter\n" + + " Processor: KSTREAM-FILTER-0000000006 (stores: [])\n" + + " --> KSTREAM-SINK-0000000005\n" + + " <-- source-map\n" + + " Processor: KSTREAM-FILTER-0000000012 (stores: [])\n" + + " --> KSTREAM-SINK-0000000011\n" + + " <-- source-map\n" + + " Processor: KSTREAM-FILTER-0000000020 (stores: [])\n" + + " --> KSTREAM-SINK-0000000019\n" + + " <-- reduce-peek\n" + + " Processor: KSTREAM-FILTER-0000000026 (stores: [])\n" + + " --> KSTREAM-SINK-0000000025\n" + + " <-- join-filter\n" + + " Processor: process-mapValues (stores: [])\n" + + " --> process\n" + + " <-- process-filter\n" + + " Sink: KSTREAM-SINK-0000000005 (topic: count-groupByKey-repartition)\n" + + " <-- KSTREAM-FILTER-0000000006\n" + + " Sink: KSTREAM-SINK-0000000011 (topic: aggregate-groupByKey-repartition)\n" + + " <-- KSTREAM-FILTER-0000000012\n" + + " Sink: KSTREAM-SINK-0000000019 (topic: reduce-groupByKey-repartition)\n" + + " <-- KSTREAM-FILTER-0000000020\n" + + " Sink: KSTREAM-SINK-0000000025 (topic: join-left-repartition)\n" + + " <-- KSTREAM-FILTER-0000000026\n" + + " Processor: process (stores: [])\n" + + " --> none\n" + + " <-- process-mapValues\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000007 (topics: 
[count-groupByKey-repartition])\n" + + " --> count\n" + + " Processor: count (stores: [count-store])\n" + + " --> count-toStream\n" + + " <-- KSTREAM-SOURCE-0000000007\n" + + " Processor: count-toStream (stores: [])\n" + + " --> join-other-windowed, count-to\n" + + " <-- count\n" + + " Source: KSTREAM-SOURCE-0000000027 (topics: [join-left-repartition])\n" + + " --> join-this-windowed\n" + + " Processor: join-other-windowed (stores: [other-join-store])\n" + + " --> join-other-join\n" + + " <-- count-toStream\n" + + " Processor: join-this-windowed (stores: [join-store])\n" + + " --> join-this-join\n" + + " <-- KSTREAM-SOURCE-0000000027\n" + + " Processor: join-other-join (stores: [join-store])\n" + + " --> join-merge\n" + + " <-- join-other-windowed\n" + + " Processor: join-this-join (stores: [other-join-store])\n" + + " --> join-merge\n" + + " <-- join-this-windowed\n" + + " Processor: join-merge (stores: [])\n" + + " --> join-to\n" + + " <-- join-this-join, join-other-join\n" + + " Sink: count-to (topic: outputTopic_0)\n" + + " <-- count-toStream\n" + + " Sink: join-to (topic: joinedOutputTopic)\n" + + " <-- join-merge\n" + + "\n" + + " Sub-topology: 2\n" + + " Source: KSTREAM-SOURCE-0000000013 (topics: [aggregate-groupByKey-repartition])\n" + + " --> aggregate\n" + + " Processor: aggregate (stores: [aggregate-store])\n" + + " --> aggregate-toStream\n" + + " <-- KSTREAM-SOURCE-0000000013\n" + + " Processor: aggregate-toStream (stores: [])\n" + + " --> reduce-to\n" + + " <-- aggregate\n" + + " Sink: reduce-to (topic: outputTopic_1)\n" + + " <-- aggregate-toStream\n" + + "\n" + + " Sub-topology: 3\n" + + " Source: KSTREAM-SOURCE-0000000021 (topics: [reduce-groupByKey-repartition])\n" + + " --> reducer\n" + + " Processor: reducer (stores: [reduce-store])\n" + + " --> reduce-toStream\n" + + " <-- KSTREAM-SOURCE-0000000021\n" + + " Processor: reduce-toStream (stores: [])\n" + + " --> KSTREAM-SINK-0000000023\n" + + " <-- reducer\n" + + " Sink: KSTREAM-SINK-0000000023 
(topic: outputTopic_2)\n" + + " <-- reduce-toStream\n\n"; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java index 0d081f7..7e12abe 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java @@ -211,6 +211,26 @@ public class RepartitionWithMergeOptimizingTest { private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n" + " Sub-topology: 0\n" + + " Source: sourceAStream (topics: [inputA])\n" + + " --> mappedAStream\n" + + " Source: sourceBStream (topics: [inputB])\n" + + " --> mappedBStream\n" + + " Processor: mappedAStream (stores: [])\n" + + " --> mergedStream\n" + + " <-- sourceAStream\n" + + " Processor: mappedBStream (stores: [])\n" + + " --> mergedStream\n" + + " <-- sourceBStream\n" + + " Processor: mergedStream (stores: [])\n" + + " --> KSTREAM-FILTER-0000000019\n" + + " <-- mappedAStream, mappedBStream\n" + + " Processor: KSTREAM-FILTER-0000000019 (stores: [])\n" + + " --> KSTREAM-SINK-0000000018\n" + + " <-- mergedStream\n" + + " Sink: KSTREAM-SINK-0000000018 (topic: long-groupByKey-repartition)\n" + + " <-- KSTREAM-FILTER-0000000019\n" + + "\n" + + " Sub-topology: 1\n" + " Source: KSTREAM-SOURCE-0000000020 (topics: [long-groupByKey-repartition])\n" + " --> long-count, string-count\n" + " Processor: string-count (stores: [string-store])\n" @@ -231,81 +251,60 @@ public class RepartitionWithMergeOptimizingTest { + " Sink: long-to (topic: outputTopic_0)\n" + " <-- long-toStream\n" + " Sink: string-to (topic: outputTopic_1)\n" - + " <-- string-mapValues\n" - + "\n" - + " Sub-topology: 1\n" - + " Source: sourceAStream (topics: [inputA])\n" - + " --> mappedAStream\n" - + " Source: sourceBStream 
(topics: [inputB])\n" - + " --> mappedBStream\n" - + " Processor: mappedAStream (stores: [])\n" - + " --> mergedStream\n" - + " <-- sourceAStream\n" - + " Processor: mappedBStream (stores: [])\n" - + " --> mergedStream\n" - + " <-- sourceBStream\n" - + " Processor: mergedStream (stores: [])\n" - + " --> KSTREAM-FILTER-0000000019\n" - + " <-- mappedAStream, mappedBStream\n" - + " Processor: KSTREAM-FILTER-0000000019 (stores: [])\n" - + " --> KSTREAM-SINK-0000000018\n" - + " <-- mergedStream\n" - + " Sink: KSTREAM-SINK-0000000018 (topic: long-groupByKey-repartition)\n" - + " <-- KSTREAM-FILTER-0000000019\n\n"; + + " <-- string-mapValues\n\n"; private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n" - + " Sub-topology: 0\n" - + " Source: KSTREAM-SOURCE-0000000008 (topics: [long-groupByKey-repartition])\n" - + " --> long-count\n" - + " Processor: long-count (stores: [long-store])\n" - + " --> long-toStream\n" - + " <-- KSTREAM-SOURCE-0000000008\n" - + " Processor: long-toStream (stores: [])\n" - + " --> long-to\n" - + " <-- long-count\n" - + " Sink: long-to (topic: outputTopic_0)\n" - + " <-- long-toStream\n" - + "\n" - + " Sub-topology: 1\n" - + " Source: KSTREAM-SOURCE-0000000014 (topics: [string-groupByKey-repartition])\n" - + " --> string-count\n" - + " Processor: string-count (stores: [string-store])\n" - + " --> string-toStream\n" - + " <-- KSTREAM-SOURCE-0000000014\n" - + " Processor: string-toStream (stores: [])\n" - + " --> string-mapValues\n" - + " <-- string-count\n" - + " Processor: string-mapValues (stores: [])\n" - + " --> string-to\n" - + " <-- string-toStream\n" - + " Sink: string-to (topic: outputTopic_1)\n" - + " <-- string-mapValues\n" - + "\n" - + " Sub-topology: 2\n" - + " Source: sourceAStream (topics: [inputA])\n" - + " --> mappedAStream\n" - + " Source: sourceBStream (topics: [inputB])\n" - + " --> mappedBStream\n" - + " Processor: mappedAStream (stores: [])\n" - + " --> mergedStream\n" - + " <-- sourceAStream\n" - + " 
Processor: mappedBStream (stores: [])\n" - + " --> mergedStream\n" - + " <-- sourceBStream\n" - + " Processor: mergedStream (stores: [])\n" - + " --> KSTREAM-FILTER-0000000007, KSTREAM-FILTER-0000000013\n" - + " <-- mappedAStream, mappedBStream\n" - + " Processor: KSTREAM-FILTER-0000000007 (stores: [])\n" - + " --> KSTREAM-SINK-0000000006\n" - + " <-- mergedStream\n" - + " Processor: KSTREAM-FILTER-0000000013 (stores: [])\n" - + " --> KSTREAM-SINK-0000000012\n" - + " <-- mergedStream\n" - + " Sink: KSTREAM-SINK-0000000006 (topic: long-groupByKey-repartition)\n" - + " <-- KSTREAM-FILTER-0000000007\n" - + " Sink: KSTREAM-SINK-0000000012 (topic: string-groupByKey-repartition)\n" - + " <-- KSTREAM-FILTER-0000000013\n\n"; - + + " Sub-topology: 0\n" + + " Source: sourceAStream (topics: [inputA])\n" + + " --> mappedAStream\n" + + " Source: sourceBStream (topics: [inputB])\n" + + " --> mappedBStream\n" + + " Processor: mappedAStream (stores: [])\n" + + " --> mergedStream\n" + + " <-- sourceAStream\n" + + " Processor: mappedBStream (stores: [])\n" + + " --> mergedStream\n" + + " <-- sourceBStream\n" + + " Processor: mergedStream (stores: [])\n" + + " --> KSTREAM-FILTER-0000000007, KSTREAM-FILTER-0000000013\n" + + " <-- mappedAStream, mappedBStream\n" + + " Processor: KSTREAM-FILTER-0000000007 (stores: [])\n" + + " --> KSTREAM-SINK-0000000006\n" + + " <-- mergedStream\n" + + " Processor: KSTREAM-FILTER-0000000013 (stores: [])\n" + + " --> KSTREAM-SINK-0000000012\n" + + " <-- mergedStream\n" + + " Sink: KSTREAM-SINK-0000000006 (topic: long-groupByKey-repartition)\n" + + " <-- KSTREAM-FILTER-0000000007\n" + + " Sink: KSTREAM-SINK-0000000012 (topic: string-groupByKey-repartition)\n" + + " <-- KSTREAM-FILTER-0000000013\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000008 (topics: [long-groupByKey-repartition])\n" + + " --> long-count\n" + + " Processor: long-count (stores: [long-store])\n" + + " --> long-toStream\n" + + " <-- KSTREAM-SOURCE-0000000008\n" 
+ + " Processor: long-toStream (stores: [])\n" + + " --> long-to\n" + + " <-- long-count\n" + + " Sink: long-to (topic: outputTopic_0)\n" + + " <-- long-toStream\n" + + "\n" + + " Sub-topology: 2\n" + + " Source: KSTREAM-SOURCE-0000000014 (topics: [string-groupByKey-repartition])\n" + + " --> string-count\n" + + " Processor: string-count (stores: [string-store])\n" + + " --> string-toStream\n" + + " <-- KSTREAM-SOURCE-0000000014\n" + + " Processor: string-toStream (stores: [])\n" + + " --> string-mapValues\n" + + " <-- string-count\n" + + " Processor: string-mapValues (stores: [])\n" + + " --> string-to\n" + + " <-- string-toStream\n" + + " Sink: string-to (topic: outputTopic_1)\n" + + " <-- string-mapValues\n\n"; } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 43324ee..da1ba77 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -447,7 +447,7 @@ public class TopologyTestDriver implements Closeable { private void pipeRecord(final String topic, final Long timestamp, final byte[] key, final byte[] value, final Headers headers) { final String topicName = topic; - if (!internalTopologyBuilder.getSourceTopicNames().isEmpty()) { + if (!internalTopologyBuilder.sourceTopicNames().isEmpty()) { validateSourceTopicNameRegexPattern(topic); } final TopicPartition topicPartition = getTopicPartition(topicName); @@ -495,7 +495,7 @@ public class TopologyTestDriver implements Closeable { private void validateSourceTopicNameRegexPattern(final String inputRecordTopic) { - for (final String sourceTopicName : internalTopologyBuilder.getSourceTopicNames()) { + for (final String sourceTopicName : internalTopologyBuilder.sourceTopicNames()) { if (!sourceTopicName.equals(inputRecordTopic) && 
Pattern.compile(sourceTopicName).matcher(inputRecordTopic).matches()) { throw new TopologyException("Topology add source of type String for topic: " + sourceTopicName + " cannot contain regex pattern for input record topic: " + inputRecordTopic +