This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 68d6d99  KAFKA-9020: Streams sub-topologies should be sorted by sink 
-> source relationship (#7495)
68d6d99 is described below

commit 68d6d99f62541ea77855e153258b5158afdf2dee
Author: A. Sophie Blee-Goldman <sop...@confluent.io>
AuthorDate: Mon Oct 14 13:15:03 2019 -0700

    KAFKA-9020: Streams sub-topologies should be sorted by sink -> source 
relationship (#7495)
    
    Subtopologies are currently ordered alphabetically by source node, which 
prior to KIP-307 happened to always result in the "correct" (ie topological) 
order. Now that users may name their nodes anything they want, we must 
explicitly order them so that upstream node groups/subtopologies come first and 
the downstream ones come after.
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>, Bill Bejeck 
<bbej...@gmail.com>
---
 .../internals/InternalTopologyBuilder.java         |  22 +-
 .../apache/kafka/streams/StreamsBuilderTest.java   |   8 +-
 .../kstream/RepartitionTopicNamingTest.java        |   2 +-
 .../internals/RepartitionOptimizingTest.java       | 347 +++++++++++----------
 .../RepartitionWithMergeOptimizingTest.java        | 145 +++++----
 .../apache/kafka/streams/TopologyTestDriver.java   |   4 +-
 6 files changed, 259 insertions(+), 269 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index d34ff00..5754b4a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.TopologyException;
@@ -85,7 +84,7 @@ public class InternalTopologyBuilder {
     // map from source processor names to regex subscription patterns
     private final Map<String, Pattern> nodeToSourcePatterns = new 
LinkedHashMap<>();
 
-    // map from sink processor names to subscribed topic (without 
application-id prefix for internal topics)
+    // map from sink processor names to sink topic (without application-id 
prefix for internal topics)
     private final Map<String, String> nodeToSinkTopic = new HashMap<>();
 
     // map from topics to their matched regex patterns, this is to ensure one 
topic is passed through on source node
@@ -749,27 +748,18 @@ public class InternalTopologyBuilder {
         return nodeGroups;
     }
 
+    // Order node groups by their position in the actual topology, ie upstream 
subtopologies come before downstream
     private Map<Integer, Set<String>> makeNodeGroups() {
         final Map<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
         final Map<String, Set<String>> rootToNodeGroup = new HashMap<>();
 
         int nodeGroupId = 0;
 
-        // Go through source nodes first. This makes the group id assignment 
easy to predict in tests
-        final Set<String> allSourceNodes = new 
HashSet<>(nodeToSourceTopics.keySet());
-        allSourceNodes.addAll(nodeToSourcePatterns.keySet());
-
-        for (final String nodeName : Utils.sorted(allSourceNodes)) {
+        // Traverse in topological order
+        for (final String nodeName : nodeFactories.keySet()) {
             nodeGroupId = putNodeGroupName(nodeName, nodeGroupId, nodeGroups, 
rootToNodeGroup);
         }
 
-        // Go through non-source nodes
-        for (final String nodeName : Utils.sorted(nodeFactories.keySet())) {
-            if (!nodeToSourceTopics.containsKey(nodeName)) {
-                nodeGroupId = putNodeGroupName(nodeName, nodeGroupId, 
nodeGroups, rootToNodeGroup);
-            }
-        }
-
         return nodeGroups;
     }
 
@@ -1905,11 +1895,11 @@ public class InternalTopologyBuilder {
 
     // following functions are for test only
 
-    public synchronized Set<String> getSourceTopicNames() {
+    public synchronized Set<String> sourceTopicNames() {
         return sourceTopicNames;
     }
 
-    public synchronized Map<String, StateStoreFactory> getStateStores() {
+    public synchronized Map<String, StateStoreFactory> stateStores() {
         return stateFactories;
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 4118297..0cce24f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -385,10 +385,10 @@ public class StreamsBuilderTest {
             internalTopologyBuilder.build().storeToChangelogTopic(),
             equalTo(Collections.singletonMap("store", "topic")));
         assertThat(
-            internalTopologyBuilder.getStateStores().keySet(),
+            internalTopologyBuilder.stateStores().keySet(),
             equalTo(Collections.singleton("store")));
         assertThat(
-            
internalTopologyBuilder.getStateStores().get("store").loggingEnabled(),
+            
internalTopologyBuilder.stateStores().get("store").loggingEnabled(),
             equalTo(false));
         assertThat(
             
internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.isEmpty(),
@@ -407,10 +407,10 @@ public class StreamsBuilderTest {
             internalTopologyBuilder.build().storeToChangelogTopic(),
             equalTo(Collections.singletonMap("store", 
"appId-store-changelog")));
         assertThat(
-            internalTopologyBuilder.getStateStores().keySet(),
+            internalTopologyBuilder.stateStores().keySet(),
             equalTo(Collections.singleton("store")));
         assertThat(
-            
internalTopologyBuilder.getStateStores().get("store").loggingEnabled(),
+            
internalTopologyBuilder.stateStores().get("store").loggingEnabled(),
             equalTo(true));
         assertThat(
             
internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
index 0f3e33d..cf8d6b9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
@@ -603,7 +603,7 @@ public class RepartitionTopicNamingTest {
             "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" +
             "      --> KSTREAM-MAP-0000000001\n" +
             "    Processor: KSTREAM-MAP-0000000001 (stores: [])\n" +
-            "      --> KSTREAM-FILTER-0000000020, KSTREAM-FILTER-0000000002, 
KSTREAM-FILTER-0000000009, KSTREAM-FILTER-0000000016, 
KSTREAM-FILTER-0000000029\n" +
+            "      --> KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000009, 
KSTREAM-FILTER-0000000020, KSTREAM-FILTER-0000000016, 
KSTREAM-FILTER-0000000029\n" +
             "      <-- KSTREAM-SOURCE-0000000000\n" +
             "    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n" +
             "      --> KSTREAM-PEEK-0000000021\n" +
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
index 8425ad7..cf46af6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
@@ -279,182 +279,183 @@ public class RepartitionOptimizingTest {
     }
 
     private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n"
-                                                              + "   
Sub-topology: 0\n"
-                                                              + "    Source: 
KSTREAM-SOURCE-0000000036 (topics: [count-groupByKey-repartition])\n"
-                                                              + "      --> 
aggregate, count, join-filter, reduce-filter\n"
-                                                              + "    
Processor: count (stores: [count-store])\n"
-                                                              + "      --> 
count-toStream\n"
-                                                              + "      <-- 
KSTREAM-SOURCE-0000000036\n"
-                                                              + "    
Processor: count-toStream (stores: [])\n"
-                                                              + "      --> 
join-other-windowed, count-to\n"
-                                                              + "      <-- 
count\n"
-                                                              + "    
Processor: join-filter (stores: [])\n"
-                                                              + "      --> 
join-this-windowed\n"
-                                                              + "      <-- 
KSTREAM-SOURCE-0000000036\n"
-                                                              + "    
Processor: reduce-filter (stores: [])\n"
-                                                              + "      --> 
reduce-peek\n"
-                                                              + "      <-- 
KSTREAM-SOURCE-0000000036\n"
-                                                              + "    
Processor: join-other-windowed (stores: [other-join-store])\n"
-                                                              + "      --> 
join-other-join\n"
-                                                              + "      <-- 
count-toStream\n"
-                                                              + "    
Processor: join-this-windowed (stores: [join-store])\n"
-                                                              + "      --> 
join-this-join\n"
-                                                              + "      <-- 
join-filter\n"
-                                                              + "    
Processor: reduce-peek (stores: [])\n"
-                                                              + "      --> 
reducer\n"
-                                                              + "      <-- 
reduce-filter\n"
-                                                              + "    
Processor: aggregate (stores: [aggregate-store])\n"
-                                                              + "      --> 
aggregate-toStream\n"
-                                                              + "      <-- 
KSTREAM-SOURCE-0000000036\n"
-                                                              + "    
Processor: join-other-join (stores: [join-store])\n"
-                                                              + "      --> 
join-merge\n"
-                                                              + "      <-- 
join-other-windowed\n"
-                                                              + "    
Processor: join-this-join (stores: [other-join-store])\n"
-                                                              + "      --> 
join-merge\n"
-                                                              + "      <-- 
join-this-windowed\n"
-                                                              + "    
Processor: reducer (stores: [reduce-store])\n"
-                                                              + "      --> 
reduce-toStream\n"
-                                                              + "      <-- 
reduce-peek\n"
-                                                              + "    
Processor: aggregate-toStream (stores: [])\n"
-                                                              + "      --> 
reduce-to\n"
-                                                              + "      <-- 
aggregate\n"
-                                                              + "    
Processor: join-merge (stores: [])\n"
-                                                              + "      --> 
join-to\n"
-                                                              + "      <-- 
join-this-join, join-other-join\n"
-                                                              + "    
Processor: reduce-toStream (stores: [])\n"
-                                                              + "      --> 
KSTREAM-SINK-0000000023\n"
-                                                              + "      <-- 
reducer\n"
-                                                              + "    Sink: 
KSTREAM-SINK-0000000023 (topic: outputTopic_2)\n"
-                                                              + "      <-- 
reduce-toStream\n"
-                                                              + "    Sink: 
count-to (topic: outputTopic_0)\n"
-                                                              + "      <-- 
count-toStream\n"
-                                                              + "    Sink: 
join-to (topic: joinedOutputTopic)\n"
-                                                              + "      <-- 
join-merge\n"
-                                                              + "    Sink: 
reduce-to (topic: outputTopic_1)\n"
-                                                              + "      <-- 
aggregate-toStream\n"
-                                                              + "\n"
-                                                              + "  
Sub-topology: 1\n"
-                                                              + "    Source: 
sourceStream (topics: [input])\n"
-                                                              + "      --> 
source-map\n"
-                                                              + "    
Processor: source-map (stores: [])\n"
-                                                              + "      --> 
process-filter, KSTREAM-FILTER-0000000035\n"
-                                                              + "      <-- 
sourceStream\n"
-                                                              + "    
Processor: process-filter (stores: [])\n"
-                                                              + "      --> 
process-mapValues\n"
-                                                              + "      <-- 
source-map\n"
-                                                              + "    
Processor: KSTREAM-FILTER-0000000035 (stores: [])\n"
-                                                              + "      --> 
KSTREAM-SINK-0000000034\n"
-                                                              + "      <-- 
source-map\n"
-                                                              + "    
Processor: process-mapValues (stores: [])\n"
-                                                              + "      --> 
process\n"
-                                                              + "      <-- 
process-filter\n"
-                                                              + "    Sink: 
KSTREAM-SINK-0000000034 (topic: count-groupByKey-repartition)\n"
-                                                              + "      <-- 
KSTREAM-FILTER-0000000035\n"
-                                                              + "    
Processor: process (stores: [])\n"
-                                                              + "      --> 
none\n"
-                                                              + "      <-- 
process-mapValues\n\n";
+                                                                  + "   
Sub-topology: 0\n"
+                                                                  + "    
Source: sourceStream (topics: [input])\n"
+                                                                  + "      --> 
source-map\n"
+                                                                  + "    
Processor: source-map (stores: [])\n"
+                                                                  + "      --> 
process-filter, KSTREAM-FILTER-0000000035\n"
+                                                                  + "      <-- 
sourceStream\n"
+                                                                  + "    
Processor: process-filter (stores: [])\n"
+                                                                  + "      --> 
process-mapValues\n"
+                                                                  + "      <-- 
source-map\n"
+                                                                  + "    
Processor: KSTREAM-FILTER-0000000035 (stores: [])\n"
+                                                                  + "      --> 
KSTREAM-SINK-0000000034\n"
+                                                                  + "      <-- 
source-map\n"
+                                                                  + "    
Processor: process-mapValues (stores: [])\n"
+                                                                  + "      --> 
process\n"
+                                                                  + "      <-- 
process-filter\n"
+                                                                  + "    Sink: 
KSTREAM-SINK-0000000034 (topic: count-groupByKey-repartition)\n"
+                                                                  + "      <-- 
KSTREAM-FILTER-0000000035\n"
+                                                                  + "    
Processor: process (stores: [])\n"
+                                                                  + "      --> 
none\n"
+                                                                  + "      <-- 
process-mapValues\n"
+                                                                  + "\n"
+                                                                  + "  
Sub-topology: 1\n"
+                                                                  + "    
Source: KSTREAM-SOURCE-0000000036 (topics: [count-groupByKey-repartition])\n"
+                                                                  + "      --> 
aggregate, count, join-filter, reduce-filter\n"
+                                                                  + "    
Processor: count (stores: [count-store])\n"
+                                                                  + "      --> 
count-toStream\n"
+                                                                  + "      <-- 
KSTREAM-SOURCE-0000000036\n"
+                                                                  + "    
Processor: count-toStream (stores: [])\n"
+                                                                  + "      --> 
join-other-windowed, count-to\n"
+                                                                  + "      <-- 
count\n"
+                                                                  + "    
Processor: join-filter (stores: [])\n"
+                                                                  + "      --> 
join-this-windowed\n"
+                                                                  + "      <-- 
KSTREAM-SOURCE-0000000036\n"
+                                                                  + "    
Processor: reduce-filter (stores: [])\n"
+                                                                  + "      --> 
reduce-peek\n"
+                                                                  + "      <-- 
KSTREAM-SOURCE-0000000036\n"
+                                                                  + "    
Processor: join-other-windowed (stores: [other-join-store])\n"
+                                                                  + "      --> 
join-other-join\n"
+                                                                  + "      <-- 
count-toStream\n"
+                                                                  + "    
Processor: join-this-windowed (stores: [join-store])\n"
+                                                                  + "      --> 
join-this-join\n"
+                                                                  + "      <-- 
join-filter\n"
+                                                                  + "    
Processor: reduce-peek (stores: [])\n"
+                                                                  + "      --> 
reducer\n"
+                                                                  + "      <-- 
reduce-filter\n"
+                                                                  + "    
Processor: aggregate (stores: [aggregate-store])\n"
+                                                                  + "      --> 
aggregate-toStream\n"
+                                                                  + "      <-- 
KSTREAM-SOURCE-0000000036\n"
+                                                                  + "    
Processor: join-other-join (stores: [join-store])\n"
+                                                                  + "      --> 
join-merge\n"
+                                                                  + "      <-- 
join-other-windowed\n"
+                                                                  + "    
Processor: join-this-join (stores: [other-join-store])\n"
+                                                                  + "      --> 
join-merge\n"
+                                                                  + "      <-- 
join-this-windowed\n"
+                                                                  + "    
Processor: reducer (stores: [reduce-store])\n"
+                                                                  + "      --> 
reduce-toStream\n"
+                                                                  + "      <-- 
reduce-peek\n"
+                                                                  + "    
Processor: aggregate-toStream (stores: [])\n"
+                                                                  + "      --> 
reduce-to\n"
+                                                                  + "      <-- 
aggregate\n"
+                                                                  + "    
Processor: join-merge (stores: [])\n"
+                                                                  + "      --> 
join-to\n"
+                                                                  + "      <-- 
join-this-join, join-other-join\n"
+                                                                  + "    
Processor: reduce-toStream (stores: [])\n"
+                                                                  + "      --> 
KSTREAM-SINK-0000000023\n"
+                                                                  + "      <-- 
reducer\n"
+                                                                  + "    Sink: 
KSTREAM-SINK-0000000023 (topic: outputTopic_2)\n"
+                                                                  + "      <-- 
reduce-toStream\n"
+                                                                  + "    Sink: 
count-to (topic: outputTopic_0)\n"
+                                                                  + "      <-- 
count-toStream\n"
+                                                                  + "    Sink: 
join-to (topic: joinedOutputTopic)\n"
+                                                                  + "      <-- 
join-merge\n"
+                                                                  + "    Sink: 
reduce-to (topic: outputTopic_1)\n"
+                                                                  + "      <-- 
aggregate-toStream\n\n";
+
 
 
 
     private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n"
-                                                                + "   
Sub-topology: 0\n"
-                                                                + "    Source: 
KSTREAM-SOURCE-0000000007 (topics: [count-groupByKey-repartition])\n"
-                                                                + "      --> 
count\n"
-                                                                + "    
Processor: count (stores: [count-store])\n"
-                                                                + "      --> 
count-toStream\n"
-                                                                + "      <-- 
KSTREAM-SOURCE-0000000007\n"
-                                                                + "    
Processor: count-toStream (stores: [])\n"
-                                                                + "      --> 
join-other-windowed, count-to\n"
-                                                                + "      <-- 
count\n"
-                                                                + "    Source: 
KSTREAM-SOURCE-0000000027 (topics: [join-left-repartition])\n"
-                                                                + "      --> 
join-this-windowed\n"
-                                                                + "    
Processor: join-other-windowed (stores: [other-join-store])\n"
-                                                                + "      --> 
join-other-join\n"
-                                                                + "      <-- 
count-toStream\n"
-                                                                + "    
Processor: join-this-windowed (stores: [join-store])\n"
-                                                                + "      --> 
join-this-join\n"
-                                                                + "      <-- 
KSTREAM-SOURCE-0000000027\n"
-                                                                + "    
Processor: join-other-join (stores: [join-store])\n"
-                                                                + "      --> 
join-merge\n"
-                                                                + "      <-- 
join-other-windowed\n"
-                                                                + "    
Processor: join-this-join (stores: [other-join-store])\n"
-                                                                + "      --> 
join-merge\n"
-                                                                + "      <-- 
join-this-windowed\n"
-                                                                + "    
Processor: join-merge (stores: [])\n"
-                                                                + "      --> 
join-to\n"
-                                                                + "      <-- 
join-this-join, join-other-join\n"
-                                                                + "    Sink: 
count-to (topic: outputTopic_0)\n"
-                                                                + "      <-- 
count-toStream\n"
-                                                                + "    Sink: 
join-to (topic: joinedOutputTopic)\n"
-                                                                + "      <-- 
join-merge\n"
-                                                                + "\n"
-                                                                + "  
Sub-topology: 1\n"
-                                                                + "    Source: 
KSTREAM-SOURCE-0000000013 (topics: [aggregate-groupByKey-repartition])\n"
-                                                                + "      --> 
aggregate\n"
-                                                                + "    
Processor: aggregate (stores: [aggregate-store])\n"
-                                                                + "      --> 
aggregate-toStream\n"
-                                                                + "      <-- 
KSTREAM-SOURCE-0000000013\n"
-                                                                + "    
Processor: aggregate-toStream (stores: [])\n"
-                                                                + "      --> 
reduce-to\n"
-                                                                + "      <-- 
aggregate\n"
-                                                                + "    Sink: 
reduce-to (topic: outputTopic_1)\n"
-                                                                + "      <-- 
aggregate-toStream\n"
-                                                                + "\n"
-                                                                + "  
Sub-topology: 2\n"
-                                                                + "    Source: 
KSTREAM-SOURCE-0000000021 (topics: [reduce-groupByKey-repartition])\n"
-                                                                + "      --> 
reducer\n"
-                                                                + "    
Processor: reducer (stores: [reduce-store])\n"
-                                                                + "      --> 
reduce-toStream\n"
-                                                                + "      <-- 
KSTREAM-SOURCE-0000000021\n"
-                                                                + "    
Processor: reduce-toStream (stores: [])\n"
-                                                                + "      --> 
KSTREAM-SINK-0000000023\n"
-                                                                + "      <-- 
reducer\n"
-                                                                + "    Sink: 
KSTREAM-SINK-0000000023 (topic: outputTopic_2)\n"
-                                                                + "      <-- 
reduce-toStream\n"
-                                                                + "\n"
-                                                                + "  
Sub-topology: 3\n"
-                                                                + "    Source: 
sourceStream (topics: [input])\n"
-                                                                + "      --> 
source-map\n"
-                                                                + "    
Processor: source-map (stores: [])\n"
-                                                                + "      --> 
reduce-filter, process-filter, KSTREAM-FILTER-0000000006, join-filter, 
KSTREAM-FILTER-0000000012\n"
-                                                                + "      <-- 
sourceStream\n"
-                                                                + "    
Processor: reduce-filter (stores: [])\n"
-                                                                + "      --> 
reduce-peek\n"
-                                                                + "      <-- 
source-map\n"
-                                                                + "    
Processor: join-filter (stores: [])\n"
-                                                                + "      --> 
KSTREAM-FILTER-0000000026\n"
-                                                                + "      <-- 
source-map\n"
-                                                                + "    
Processor: process-filter (stores: [])\n"
-                                                                + "      --> 
process-mapValues\n"
-                                                                + "      <-- 
source-map\n"
-                                                                + "    
Processor: reduce-peek (stores: [])\n"
-                                                                + "      --> 
KSTREAM-FILTER-0000000020\n"
-                                                                + "      <-- 
reduce-filter\n"
-                                                                + "    
Processor: KSTREAM-FILTER-0000000006 (stores: [])\n"
-                                                                + "      --> 
KSTREAM-SINK-0000000005\n"
-                                                                + "      <-- 
source-map\n"
-                                                                + "    
Processor: KSTREAM-FILTER-0000000012 (stores: [])\n"
-                                                                + "      --> 
KSTREAM-SINK-0000000011\n"
-                                                                + "      <-- 
source-map\n"
-                                                                + "    
Processor: KSTREAM-FILTER-0000000020 (stores: [])\n"
-                                                                + "      --> 
KSTREAM-SINK-0000000019\n"
-                                                                + "      <-- 
reduce-peek\n"
-                                                                + "    
Processor: KSTREAM-FILTER-0000000026 (stores: [])\n"
-                                                                + "      --> 
KSTREAM-SINK-0000000025\n"
-                                                                + "      <-- 
join-filter\n"
-                                                                + "    
Processor: process-mapValues (stores: [])\n"
-                                                                + "      --> 
process\n"
-                                                                + "      <-- 
process-filter\n"
-                                                                + "    Sink: 
KSTREAM-SINK-0000000005 (topic: count-groupByKey-repartition)\n"
-                                                                + "      <-- 
KSTREAM-FILTER-0000000006\n"
-                                                                + "    Sink: 
KSTREAM-SINK-0000000011 (topic: aggregate-groupByKey-repartition)\n"
-                                                                + "      <-- 
KSTREAM-FILTER-0000000012\n"
-                                                                + "    Sink: 
KSTREAM-SINK-0000000019 (topic: reduce-groupByKey-repartition)\n"
-                                                                + "      <-- 
KSTREAM-FILTER-0000000020\n"
-                                                                + "    Sink: 
KSTREAM-SINK-0000000025 (topic: join-left-repartition)\n"
-                                                                + "      <-- 
KSTREAM-FILTER-0000000026\n"
-                                                                + "    
Processor: process (stores: [])\n"
-                                                                + "      --> 
none\n"
-                                                                + "      <-- 
process-mapValues\n\n";
+                                                                    + "   
Sub-topology: 0\n"
+                                                                    + "    
Source: sourceStream (topics: [input])\n"
+                                                                    + "      
--> source-map\n"
+                                                                    + "    
Processor: source-map (stores: [])\n"
+                                                                    + "      
--> reduce-filter, process-filter, KSTREAM-FILTER-0000000006, join-filter, 
KSTREAM-FILTER-0000000012\n"
+                                                                    + "      
<-- sourceStream\n"
+                                                                    + "    
Processor: reduce-filter (stores: [])\n"
+                                                                    + "      
--> reduce-peek\n"
+                                                                    + "      
<-- source-map\n"
+                                                                    + "    
Processor: join-filter (stores: [])\n"
+                                                                    + "      
--> KSTREAM-FILTER-0000000026\n"
+                                                                    + "      
<-- source-map\n"
+                                                                    + "    
Processor: process-filter (stores: [])\n"
+                                                                    + "      
--> process-mapValues\n"
+                                                                    + "      
<-- source-map\n"
+                                                                    + "    
Processor: reduce-peek (stores: [])\n"
+                                                                    + "      
--> KSTREAM-FILTER-0000000020\n"
+                                                                    + "      
<-- reduce-filter\n"
+                                                                    + "    
Processor: KSTREAM-FILTER-0000000006 (stores: [])\n"
+                                                                    + "      
--> KSTREAM-SINK-0000000005\n"
+                                                                    + "      
<-- source-map\n"
+                                                                    + "    
Processor: KSTREAM-FILTER-0000000012 (stores: [])\n"
+                                                                    + "      
--> KSTREAM-SINK-0000000011\n"
+                                                                    + "      
<-- source-map\n"
+                                                                    + "    
Processor: KSTREAM-FILTER-0000000020 (stores: [])\n"
+                                                                    + "      
--> KSTREAM-SINK-0000000019\n"
+                                                                    + "      
<-- reduce-peek\n"
+                                                                    + "    
Processor: KSTREAM-FILTER-0000000026 (stores: [])\n"
+                                                                    + "      
--> KSTREAM-SINK-0000000025\n"
+                                                                    + "      
<-- join-filter\n"
+                                                                    + "    
Processor: process-mapValues (stores: [])\n"
+                                                                    + "      
--> process\n"
+                                                                    + "      
<-- process-filter\n"
+                                                                    + "    
Sink: KSTREAM-SINK-0000000005 (topic: count-groupByKey-repartition)\n"
+                                                                    + "      
<-- KSTREAM-FILTER-0000000006\n"
+                                                                    + "    
Sink: KSTREAM-SINK-0000000011 (topic: aggregate-groupByKey-repartition)\n"
+                                                                    + "      
<-- KSTREAM-FILTER-0000000012\n"
+                                                                    + "    
Sink: KSTREAM-SINK-0000000019 (topic: reduce-groupByKey-repartition)\n"
+                                                                    + "      
<-- KSTREAM-FILTER-0000000020\n"
+                                                                    + "    
Sink: KSTREAM-SINK-0000000025 (topic: join-left-repartition)\n"
+                                                                    + "      
<-- KSTREAM-FILTER-0000000026\n"
+                                                                    + "    
Processor: process (stores: [])\n"
+                                                                    + "      
--> none\n"
+                                                                    + "      
<-- process-mapValues\n"
+                                                                    + "\n"
+                                                                    + "  
Sub-topology: 1\n"
+                                                                    + "    
Source: KSTREAM-SOURCE-0000000007 (topics: [count-groupByKey-repartition])\n"
+                                                                    + "      
--> count\n"
+                                                                    + "    
Processor: count (stores: [count-store])\n"
+                                                                    + "      
--> count-toStream\n"
+                                                                    + "      
<-- KSTREAM-SOURCE-0000000007\n"
+                                                                    + "    
Processor: count-toStream (stores: [])\n"
+                                                                    + "      
--> join-other-windowed, count-to\n"
+                                                                    + "      
<-- count\n"
+                                                                    + "    
Source: KSTREAM-SOURCE-0000000027 (topics: [join-left-repartition])\n"
+                                                                    + "      
--> join-this-windowed\n"
+                                                                    + "    
Processor: join-other-windowed (stores: [other-join-store])\n"
+                                                                    + "      
--> join-other-join\n"
+                                                                    + "      
<-- count-toStream\n"
+                                                                    + "    
Processor: join-this-windowed (stores: [join-store])\n"
+                                                                    + "      
--> join-this-join\n"
+                                                                    + "      
<-- KSTREAM-SOURCE-0000000027\n"
+                                                                    + "    
Processor: join-other-join (stores: [join-store])\n"
+                                                                    + "      
--> join-merge\n"
+                                                                    + "      
<-- join-other-windowed\n"
+                                                                    + "    
Processor: join-this-join (stores: [other-join-store])\n"
+                                                                    + "      
--> join-merge\n"
+                                                                    + "      
<-- join-this-windowed\n"
+                                                                    + "    
Processor: join-merge (stores: [])\n"
+                                                                    + "      
--> join-to\n"
+                                                                    + "      
<-- join-this-join, join-other-join\n"
+                                                                    + "    
Sink: count-to (topic: outputTopic_0)\n"
+                                                                    + "      
<-- count-toStream\n"
+                                                                    + "    
Sink: join-to (topic: joinedOutputTopic)\n"
+                                                                    + "      
<-- join-merge\n"
+                                                                    + "\n"
+                                                                    + "  
Sub-topology: 2\n"
+                                                                    + "    
Source: KSTREAM-SOURCE-0000000013 (topics: 
[aggregate-groupByKey-repartition])\n"
+                                                                    + "      
--> aggregate\n"
+                                                                    + "    
Processor: aggregate (stores: [aggregate-store])\n"
+                                                                    + "      
--> aggregate-toStream\n"
+                                                                    + "      
<-- KSTREAM-SOURCE-0000000013\n"
+                                                                    + "    
Processor: aggregate-toStream (stores: [])\n"
+                                                                    + "      
--> reduce-to\n"
+                                                                    + "      
<-- aggregate\n"
+                                                                    + "    
Sink: reduce-to (topic: outputTopic_1)\n"
+                                                                    + "      
<-- aggregate-toStream\n"
+                                                                    + "\n"
+                                                                    + "  
Sub-topology: 3\n"
+                                                                    + "    
Source: KSTREAM-SOURCE-0000000021 (topics: [reduce-groupByKey-repartition])\n"
+                                                                    + "      
--> reducer\n"
+                                                                    + "    
Processor: reducer (stores: [reduce-store])\n"
+                                                                    + "      
--> reduce-toStream\n"
+                                                                    + "      
<-- KSTREAM-SOURCE-0000000021\n"
+                                                                    + "    
Processor: reduce-toStream (stores: [])\n"
+                                                                    + "      
--> KSTREAM-SINK-0000000023\n"
+                                                                    + "      
<-- reducer\n"
+                                                                    + "    
Sink: KSTREAM-SINK-0000000023 (topic: outputTopic_2)\n"
+                                                                    + "      
<-- reduce-toStream\n\n";
 
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
index 0d081f7..7e12abe 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
@@ -211,6 +211,26 @@ public class RepartitionWithMergeOptimizingTest {
 
     private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n"
                                                               + "   
Sub-topology: 0\n"
+                                                              + "    Source: 
sourceAStream (topics: [inputA])\n"
+                                                              + "      --> 
mappedAStream\n"
+                                                              + "    Source: 
sourceBStream (topics: [inputB])\n"
+                                                              + "      --> 
mappedBStream\n"
+                                                              + "    
Processor: mappedAStream (stores: [])\n"
+                                                              + "      --> 
mergedStream\n"
+                                                              + "      <-- 
sourceAStream\n"
+                                                              + "    
Processor: mappedBStream (stores: [])\n"
+                                                              + "      --> 
mergedStream\n"
+                                                              + "      <-- 
sourceBStream\n"
+                                                              + "    
Processor: mergedStream (stores: [])\n"
+                                                              + "      --> 
KSTREAM-FILTER-0000000019\n"
+                                                              + "      <-- 
mappedAStream, mappedBStream\n"
+                                                              + "    
Processor: KSTREAM-FILTER-0000000019 (stores: [])\n"
+                                                              + "      --> 
KSTREAM-SINK-0000000018\n"
+                                                              + "      <-- 
mergedStream\n"
+                                                              + "    Sink: 
KSTREAM-SINK-0000000018 (topic: long-groupByKey-repartition)\n"
+                                                              + "      <-- 
KSTREAM-FILTER-0000000019\n"
+                                                              + "\n"
+                                                              + "  
Sub-topology: 1\n"
                                                               + "    Source: 
KSTREAM-SOURCE-0000000020 (topics: [long-groupByKey-repartition])\n"
                                                               + "      --> 
long-count, string-count\n"
                                                               + "    
Processor: string-count (stores: [string-store])\n"
@@ -231,81 +251,60 @@ public class RepartitionWithMergeOptimizingTest {
                                                               + "    Sink: 
long-to (topic: outputTopic_0)\n"
                                                               + "      <-- 
long-toStream\n"
                                                               + "    Sink: 
string-to (topic: outputTopic_1)\n"
-                                                              + "      <-- 
string-mapValues\n"
-                                                              + "\n"
-                                                              + "  
Sub-topology: 1\n"
-                                                              + "    Source: 
sourceAStream (topics: [inputA])\n"
-                                                              + "      --> 
mappedAStream\n"
-                                                              + "    Source: 
sourceBStream (topics: [inputB])\n"
-                                                              + "      --> 
mappedBStream\n"
-                                                              + "    
Processor: mappedAStream (stores: [])\n"
-                                                              + "      --> 
mergedStream\n"
-                                                              + "      <-- 
sourceAStream\n"
-                                                              + "    
Processor: mappedBStream (stores: [])\n"
-                                                              + "      --> 
mergedStream\n"
-                                                              + "      <-- 
sourceBStream\n"
-                                                              + "    
Processor: mergedStream (stores: [])\n"
-                                                              + "      --> 
KSTREAM-FILTER-0000000019\n"
-                                                              + "      <-- 
mappedAStream, mappedBStream\n"
-                                                              + "    
Processor: KSTREAM-FILTER-0000000019 (stores: [])\n"
-                                                              + "      --> 
KSTREAM-SINK-0000000018\n"
-                                                              + "      <-- 
mergedStream\n"
-                                                              + "    Sink: 
KSTREAM-SINK-0000000018 (topic: long-groupByKey-repartition)\n"
-                                                              + "      <-- 
KSTREAM-FILTER-0000000019\n\n";
+                                                              + "      <-- 
string-mapValues\n\n";
 
 
     private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n"
-                                                                + "   
Sub-topology: 0\n"
-                                                                + "    Source: 
KSTREAM-SOURCE-0000000008 (topics: [long-groupByKey-repartition])\n"
-                                                                + "      --> 
long-count\n"
-                                                                + "    
Processor: long-count (stores: [long-store])\n"
-                                                                + "      --> 
long-toStream\n"
-                                                                + "      <-- 
KSTREAM-SOURCE-0000000008\n"
-                                                                + "    
Processor: long-toStream (stores: [])\n"
-                                                                + "      --> 
long-to\n"
-                                                                + "      <-- 
long-count\n"
-                                                                + "    Sink: 
long-to (topic: outputTopic_0)\n"
-                                                                + "      <-- 
long-toStream\n"
-                                                                + "\n"
-                                                                + "  
Sub-topology: 1\n"
-                                                                + "    Source: 
KSTREAM-SOURCE-0000000014 (topics: [string-groupByKey-repartition])\n"
-                                                                + "      --> 
string-count\n"
-                                                                + "    
Processor: string-count (stores: [string-store])\n"
-                                                                + "      --> 
string-toStream\n"
-                                                                + "      <-- 
KSTREAM-SOURCE-0000000014\n"
-                                                                + "    
Processor: string-toStream (stores: [])\n"
-                                                                + "      --> 
string-mapValues\n"
-                                                                + "      <-- 
string-count\n"
-                                                                + "    
Processor: string-mapValues (stores: [])\n"
-                                                                + "      --> 
string-to\n"
-                                                                + "      <-- 
string-toStream\n"
-                                                                + "    Sink: 
string-to (topic: outputTopic_1)\n"
-                                                                + "      <-- 
string-mapValues\n"
-                                                                + "\n"
-                                                                + "  
Sub-topology: 2\n"
-                                                                + "    Source: 
sourceAStream (topics: [inputA])\n"
-                                                                + "      --> 
mappedAStream\n"
-                                                                + "    Source: 
sourceBStream (topics: [inputB])\n"
-                                                                + "      --> 
mappedBStream\n"
-                                                                + "    
Processor: mappedAStream (stores: [])\n"
-                                                                + "      --> 
mergedStream\n"
-                                                                + "      <-- 
sourceAStream\n"
-                                                                + "    
Processor: mappedBStream (stores: [])\n"
-                                                                + "      --> 
mergedStream\n"
-                                                                + "      <-- 
sourceBStream\n"
-                                                                + "    
Processor: mergedStream (stores: [])\n"
-                                                                + "      --> 
KSTREAM-FILTER-0000000007, KSTREAM-FILTER-0000000013\n"
-                                                                + "      <-- 
mappedAStream, mappedBStream\n"
-                                                                + "    
Processor: KSTREAM-FILTER-0000000007 (stores: [])\n"
-                                                                + "      --> 
KSTREAM-SINK-0000000006\n"
-                                                                + "      <-- 
mergedStream\n"
-                                                                + "    
Processor: KSTREAM-FILTER-0000000013 (stores: [])\n"
-                                                                + "      --> 
KSTREAM-SINK-0000000012\n"
-                                                                + "      <-- 
mergedStream\n"
-                                                                + "    Sink: 
KSTREAM-SINK-0000000006 (topic: long-groupByKey-repartition)\n"
-                                                                + "      <-- 
KSTREAM-FILTER-0000000007\n"
-                                                                + "    Sink: 
KSTREAM-SINK-0000000012 (topic: string-groupByKey-repartition)\n"
-                                                                + "      <-- 
KSTREAM-FILTER-0000000013\n\n";
-
+                                                                    + "   
Sub-topology: 0\n"
+                                                                    + "    
Source: sourceAStream (topics: [inputA])\n"
+                                                                    + "      
--> mappedAStream\n"
+                                                                    + "    
Source: sourceBStream (topics: [inputB])\n"
+                                                                    + "      
--> mappedBStream\n"
+                                                                    + "    
Processor: mappedAStream (stores: [])\n"
+                                                                    + "      
--> mergedStream\n"
+                                                                    + "      
<-- sourceAStream\n"
+                                                                    + "    
Processor: mappedBStream (stores: [])\n"
+                                                                    + "      
--> mergedStream\n"
+                                                                    + "      
<-- sourceBStream\n"
+                                                                    + "    
Processor: mergedStream (stores: [])\n"
+                                                                    + "      
--> KSTREAM-FILTER-0000000007, KSTREAM-FILTER-0000000013\n"
+                                                                    + "      
<-- mappedAStream, mappedBStream\n"
+                                                                    + "    
Processor: KSTREAM-FILTER-0000000007 (stores: [])\n"
+                                                                    + "      
--> KSTREAM-SINK-0000000006\n"
+                                                                    + "      
<-- mergedStream\n"
+                                                                    + "    
Processor: KSTREAM-FILTER-0000000013 (stores: [])\n"
+                                                                    + "      
--> KSTREAM-SINK-0000000012\n"
+                                                                    + "      
<-- mergedStream\n"
+                                                                    + "    
Sink: KSTREAM-SINK-0000000006 (topic: long-groupByKey-repartition)\n"
+                                                                    + "      
<-- KSTREAM-FILTER-0000000007\n"
+                                                                    + "    
Sink: KSTREAM-SINK-0000000012 (topic: string-groupByKey-repartition)\n"
+                                                                    + "      
<-- KSTREAM-FILTER-0000000013\n"
+                                                                    + "\n"
+                                                                    + "  
Sub-topology: 1\n"
+                                                                    + "    
Source: KSTREAM-SOURCE-0000000008 (topics: [long-groupByKey-repartition])\n"
+                                                                    + "      
--> long-count\n"
+                                                                    + "    
Processor: long-count (stores: [long-store])\n"
+                                                                    + "      
--> long-toStream\n"
+                                                                    + "      
<-- KSTREAM-SOURCE-0000000008\n"
+                                                                    + "    
Processor: long-toStream (stores: [])\n"
+                                                                    + "      
--> long-to\n"
+                                                                    + "      
<-- long-count\n"
+                                                                    + "    
Sink: long-to (topic: outputTopic_0)\n"
+                                                                    + "      
<-- long-toStream\n"
+                                                                    + "\n"
+                                                                    + "  
Sub-topology: 2\n"
+                                                                    + "    
Source: KSTREAM-SOURCE-0000000014 (topics: [string-groupByKey-repartition])\n"
+                                                                    + "      
--> string-count\n"
+                                                                    + "    
Processor: string-count (stores: [string-store])\n"
+                                                                    + "      
--> string-toStream\n"
+                                                                    + "      
<-- KSTREAM-SOURCE-0000000014\n"
+                                                                    + "    
Processor: string-toStream (stores: [])\n"
+                                                                    + "      
--> string-mapValues\n"
+                                                                    + "      
<-- string-count\n"
+                                                                    + "    
Processor: string-mapValues (stores: [])\n"
+                                                                    + "      
--> string-to\n"
+                                                                    + "      
<-- string-toStream\n"
+                                                                    + "    
Sink: string-to (topic: outputTopic_1)\n"
+                                                                    + "      
<-- string-mapValues\n\n";
 
 }
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 43324ee..da1ba77 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -447,7 +447,7 @@ public class TopologyTestDriver implements Closeable {
     private void pipeRecord(final String topic, final Long timestamp, final 
byte[] key, final byte[] value, final Headers headers) {
         final String topicName = topic;
 
-        if (!internalTopologyBuilder.getSourceTopicNames().isEmpty()) {
+        if (!internalTopologyBuilder.sourceTopicNames().isEmpty()) {
             validateSourceTopicNameRegexPattern(topic);
         }
         final TopicPartition topicPartition = getTopicPartition(topicName);
@@ -495,7 +495,7 @@ public class TopologyTestDriver implements Closeable {
 
 
     private void validateSourceTopicNameRegexPattern(final String 
inputRecordTopic) {
-        for (final String sourceTopicName : 
internalTopologyBuilder.getSourceTopicNames()) {
+        for (final String sourceTopicName : 
internalTopologyBuilder.sourceTopicNames()) {
             if (!sourceTopicName.equals(inputRecordTopic) && 
Pattern.compile(sourceTopicName).matcher(inputRecordTopic).matches()) {
                 throw new TopologyException("Topology add source of type 
String for topic: " + sourceTopicName +
                         " cannot contain regex pattern for input record topic: 
" + inputRecordTopic +

Reply via email to