[ 
https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16580587#comment-16580587
 ] 

ASF GitHub Bot commented on KAFKA-6761:
---------------------------------------

guozhangwang closed pull request #5451: KAFKA-6761: Reduce streams footprint 
part IV add optimization
URL: https://github.com/apache/kafka/pull/5451
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 0442e2bbef2..4c9ee932a1d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -530,8 +530,7 @@ public synchronized Topology build() {
      * @return the {@link Topology} that represents the specified processing 
logic
      */
     public synchronized Topology build(final Properties props) {
-        // the props instance will be used once optimization framework merged
-        internalStreamsBuilder.buildAndOptimizeTopology();
+        internalStreamsBuilder.buildAndOptimizeTopology(props);
         return topology;
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 889559144d8..e5cd0663472 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -16,12 +16,16 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.internals.graph.GlobalStoreNode;
+import 
org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.kstream.internals.graph.StateStoreNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode;
@@ -35,13 +39,18 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Objects;
 import java.util.PriorityQueue;
+import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
 import java.util.regex.Pattern;
 
 public class InternalStreamsBuilder implements InternalNameProvider {
@@ -49,8 +58,9 @@
     final InternalTopologyBuilder internalTopologyBuilder;
     private final AtomicInteger index = new AtomicInteger(0);
 
-    private final AtomicInteger nodeIdCounter = new AtomicInteger(0);
-    private final NodeIdComparator nodeIdComparator = new NodeIdComparator();
+    private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
+    private final Map<StreamsGraphNode, Set<OptimizableRepartitionNode>> 
keyChangingOperationsToOptimizableRepartitionNodes = new HashMap<>();
+    private final Set<StreamsGraphNode> mergeNodes = new HashSet<>();
 
     private static final String TOPOLOGY_ROOT = "root";
     private static final Logger LOG = 
LoggerFactory.getLogger(InternalStreamsBuilder.class);
@@ -205,14 +215,17 @@ public synchronized void addGlobalStore(final 
StoreBuilder<KeyValueStore> storeB
                        stateUpdateSupplier);
     }
 
-    void addGraphNode(final StreamsGraphNode parent, final StreamsGraphNode 
child) {
+    void addGraphNode(final StreamsGraphNode parent,
+                      final StreamsGraphNode child) {
         Objects.requireNonNull(parent, "parent node can't be null");
         Objects.requireNonNull(child, "child node can't be null");
-        parent.addChildNode(child);
+        parent.addChild(child);
         maybeAddNodeForOptimizationMetadata(child);
     }
 
-    void addGraphNode(final Collection<StreamsGraphNode> parents, final 
StreamsGraphNode child) {
+
+    void addGraphNode(final Collection<StreamsGraphNode> parents,
+                      final StreamsGraphNode child) {
         Objects.requireNonNull(parents, "parent node can't be null");
         Objects.requireNonNull(child, "child node can't be null");
 
@@ -225,13 +238,37 @@ void addGraphNode(final Collection<StreamsGraphNode> 
parents, final StreamsGraph
         }
     }
 
-    void maybeAddNodeForOptimizationMetadata(final StreamsGraphNode node) {
-        node.setId(nodeIdCounter.getAndIncrement());
+    private void maybeAddNodeForOptimizationMetadata(final StreamsGraphNode 
node) {
+        node.setBuildPriority(buildPriorityIndex.getAndIncrement());
+
+        if (node.parentNodes().isEmpty() && 
!node.nodeName().equals(TOPOLOGY_ROOT)) {
+            throw new IllegalStateException(
+                "Nodes should not have a null parent node.  Name: " + 
node.nodeName() + " Type: "
+                + node.getClass().getSimpleName());
+        }
+
+        if (node.isKeyChangingOperation()) {
+            keyChangingOperationsToOptimizableRepartitionNodes.put(node, new 
HashSet<>());
+        } else if (node instanceof OptimizableRepartitionNode) {
+            final StreamsGraphNode parentNode = getKeyChangingParentNode(node);
+            if (parentNode != null) {
+                
keyChangingOperationsToOptimizableRepartitionNodes.get(parentNode).add((OptimizableRepartitionNode)
 node);
+            }
+        } else if (node.isMergeNode()) {
+            mergeNodes.add(node);
+        }
     }
 
+    // use this method for testing only
     public void buildAndOptimizeTopology() {
+        buildAndOptimizeTopology(null);
+    }
+
+    public void buildAndOptimizeTopology(final Properties props) {
+
+        maybePerformOptimizations(props);
 
-        final PriorityQueue<StreamsGraphNode> graphNodePriorityQueue = new 
PriorityQueue<>(5, nodeIdComparator);
+        final PriorityQueue<StreamsGraphNode> graphNodePriorityQueue = new 
PriorityQueue<>(5, Comparator.comparing(StreamsGraphNode::buildPriority));
 
         graphNodePriorityQueue.offer(root);
 
@@ -253,17 +290,168 @@ public void buildAndOptimizeTopology() {
         }
     }
 
+    private void maybePerformOptimizations(final Properties props) {
 
-    public StreamsGraphNode root() {
-        return root;
+        if (props != null && 
props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE))
 {
+            LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
+            maybeOptimizeRepartitionOperations();
+        }
     }
 
-    private static class NodeIdComparator implements 
Comparator<StreamsGraphNode>, Serializable {
+    @SuppressWarnings("unchecked")
+    private void maybeOptimizeRepartitionOperations() {
+        maybeUpdateKeyChangingRepartitionNodeMap();
 
-        @Override
-        public int compare(final StreamsGraphNode o1,
-                           final StreamsGraphNode o2) {
-            return o1.id().compareTo(o2.id());
+        for (final Map.Entry<StreamsGraphNode, 
Set<OptimizableRepartitionNode>> entry : 
keyChangingOperationsToOptimizableRepartitionNodes.entrySet()) {
+
+            final StreamsGraphNode keyChangingNode = entry.getKey();
+
+            if (entry.getValue().isEmpty()) {
+                continue;
+            }
+
+            final SerializedInternal serialized = new 
SerializedInternal(getRepartitionSerdes(entry.getValue()));
+
+            final StreamsGraphNode optimizedSingleRepartition = 
createRepartitionNode(keyChangingNode.nodeName(),
+                                                                               
       serialized.keySerde(),
+                                                                               
       serialized.valueSerde());
+
+            // re-use parent buildPriority to make sure the single repartition 
graph node is evaluated before downstream nodes
+            
optimizedSingleRepartition.setBuildPriority(keyChangingNode.buildPriority());
+
+            for (final OptimizableRepartitionNode repartitionNodeToBeReplaced 
: entry.getValue()) {
+
+                final StreamsGraphNode keyChangingNodeChild = 
findParentNodeMatching(repartitionNodeToBeReplaced, gn -> 
gn.parentNodes().contains(keyChangingNode));
+
+                if (keyChangingNodeChild == null) {
+                    throw new StreamsException(String.format("Found a null 
keyChangingChild node for %s", repartitionNodeToBeReplaced));
+                }
+
+                LOG.debug("Found the child node of the key changer {} from the 
repartition {}.", keyChangingNodeChild, repartitionNodeToBeReplaced);
+
+                // need to add children of key-changing node as children of 
optimized repartition
+                // in order to process records from re-partitioning
+                optimizedSingleRepartition.addChild(keyChangingNodeChild);
+
+                LOG.debug("Removing {} from {}  children {}", 
keyChangingNodeChild, keyChangingNode, keyChangingNode.children());
+                // now remove children from key-changing node
+                keyChangingNode.removeChild(keyChangingNodeChild);
+
+                // now need to get children of repartition node so we can 
remove repartition node
+                final Collection<StreamsGraphNode> 
repartitionNodeToBeReplacedChildren = repartitionNodeToBeReplaced.children();
+                final Collection<StreamsGraphNode> 
parentsOfRepartitionNodeToBeReplaced = 
repartitionNodeToBeReplaced.parentNodes();
+
+                for (final StreamsGraphNode repartitionNodeToBeReplacedChild : 
repartitionNodeToBeReplacedChildren) {
+                    for (final StreamsGraphNode parentNode : 
parentsOfRepartitionNodeToBeReplaced) {
+                        parentNode.addChild(repartitionNodeToBeReplacedChild);
+                    }
+                }
+
+                for (final StreamsGraphNode parentNode : 
parentsOfRepartitionNodeToBeReplaced) {
+                    parentNode.removeChild(repartitionNodeToBeReplaced);
+                }
+                repartitionNodeToBeReplaced.clearChildren();
+
+                LOG.debug("Updated node {} children {}", 
optimizedSingleRepartition, optimizedSingleRepartition.children());
+            }
+
+            keyChangingNode.addChild(optimizedSingleRepartition);
+            
keyChangingOperationsToOptimizableRepartitionNodes.remove(entry.getKey());
+        }
+    }
+
+    private void maybeUpdateKeyChangingRepartitionNodeMap() {
+        final Map<StreamsGraphNode, Set<StreamsGraphNode>> 
mergeNodesToKeyChangers = new HashMap<>();
+        for (final StreamsGraphNode mergeNode : mergeNodes) {
+            mergeNodesToKeyChangers.put(mergeNode, new HashSet<>());
+            final Collection<StreamsGraphNode> keys = 
keyChangingOperationsToOptimizableRepartitionNodes.keySet();
+            for (final StreamsGraphNode key : keys) {
+                final StreamsGraphNode maybeParentKey = 
findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(key));
+                if (maybeParentKey != null) {
+                    mergeNodesToKeyChangers.get(mergeNode).add(key);
+                }
+            }
         }
+
+        for (final Map.Entry<StreamsGraphNode, Set<StreamsGraphNode>> entry : 
mergeNodesToKeyChangers.entrySet()) {
+            final StreamsGraphNode mergeKey = entry.getKey();
+            final Collection<StreamsGraphNode> keyChangingParents = 
entry.getValue();
+            final Set<OptimizableRepartitionNode> repartitionNodes = new 
HashSet<>();
+            for (final StreamsGraphNode keyChangingParent : 
keyChangingParents) {
+                
repartitionNodes.addAll(keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent));
+                
keyChangingOperationsToOptimizableRepartitionNodes.remove(keyChangingParent);
+            }
+
+            keyChangingOperationsToOptimizableRepartitionNodes.put(mergeKey, 
repartitionNodes);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private OptimizableRepartitionNode createRepartitionNode(final String name,
+                                                             final Serde 
keySerde,
+                                                             final Serde 
valueSerde) {
+
+        final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder 
repartitionNodeBuilder = 
OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
+        KStreamImpl.createRepartitionedSource(this,
+                                              keySerde,
+                                              valueSerde,
+                                              name + "-optimized",
+                                              name,
+                                              repartitionNodeBuilder);
+
+        return repartitionNodeBuilder.build();
+
+    }
+
+    private StreamsGraphNode getKeyChangingParentNode(final StreamsGraphNode 
repartitionNode) {
+        final StreamsGraphNode shouldBeKeyChangingNode = 
findParentNodeMatching(repartitionNode, n -> n.isKeyChangingOperation() || 
n.isValueChangingOperation());
+
+        final StreamsGraphNode keyChangingNode = 
findParentNodeMatching(repartitionNode, 
StreamsGraphNode::isKeyChangingOperation);
+        if (shouldBeKeyChangingNode != null && 
shouldBeKeyChangingNode.equals(keyChangingNode)) {
+            return keyChangingNode;
+        }
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    private SerializedInternal getRepartitionSerdes(final 
Collection<OptimizableRepartitionNode> repartitionNodes) {
+        Serde keySerde = null;
+        Serde valueSerde = null;
+
+        for (final OptimizableRepartitionNode repartitionNode : 
repartitionNodes) {
+            if (keySerde == null && repartitionNode.keySerde() != null) {
+                keySerde = repartitionNode.keySerde();
+            }
+
+            if (valueSerde == null && repartitionNode.valueSerde() != null) {
+                valueSerde = repartitionNode.valueSerde();
+            }
+
+            if (keySerde != null && valueSerde != null) {
+                break;
+            }
+        }
+
+        return new SerializedInternal(Serialized.with(keySerde, valueSerde));
+    }
+
+    private StreamsGraphNode findParentNodeMatching(final StreamsGraphNode 
startSeekingNode,
+                                                    final 
Predicate<StreamsGraphNode> parentNodePredicate) {
+        if (parentNodePredicate.test(startSeekingNode)) {
+            return startSeekingNode;
+        }
+        StreamsGraphNode foundParentNode = null;
+
+        for (final StreamsGraphNode parentNode : 
startSeekingNode.parentNodes()) {
+            if (parentNodePredicate.test(parentNode)) {
+                return parentNode;
+            }
+            foundParentNode = findParentNodeMatching(parentNode, 
parentNodePredicate);
+        }
+        return foundParentNode;
+    }
+
+    public StreamsGraphNode root() {
+        return root;
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index becb03db24c..9b8afcd744b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -219,6 +219,7 @@
         final ProcessorGraphNode<? super  K, ? super V> mapValuesProcessorNode 
= new ProcessorGraphNode<>(name,
                                                                                
                          processorParameters,
                                                                                
                          repartitionRequired);
+        mapValuesProcessorNode.setValueChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, mapValuesProcessorNode);
 
         return new KStreamImpl<>(builder, name, sourceNodes, 
this.repartitionRequired, mapValuesProcessorNode);
@@ -274,6 +275,7 @@ public void print(final Printed<K, V> printed) {
         final ProcessorGraphNode<? super K, ? super V> flatMapValuesNode = new 
ProcessorGraphNode<>(name,
                                                                                
                     processorParameters,
                                                                                
                     repartitionRequired);
+        flatMapValuesNode.setValueChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, flatMapValuesNode);
 
         return new KStreamImpl<>(builder, name, sourceNodes, 
this.repartitionRequired, flatMapValuesNode);
@@ -343,7 +345,7 @@ public void print(final Printed<K, V> printed) {
                                                                                
             processorParameters,
                                                                                
             requireRepartitioning);
 
-
+        mergeNode.setMergeNode(true);
         builder.addGraphNode(Arrays.asList(this.streamsGraphNode, 
streamImpl.streamsGraphNode), mergeNode);
         return new KStreamImpl<>(builder, name, allSourceNodes, 
requireRepartitioning, mergeNode);
     }
@@ -491,6 +493,7 @@ private void to(final TopicNameExtractor<K, V> 
topicExtractor, final ProducedInt
                                                                                
        stateStoreNames,
                                                                                
        null,
                                                                                
        repartitionRequired);
+        transformNode.setValueChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, transformNode);
 
         return new KStreamImpl<>(builder, name, sourceNodes, 
this.repartitionRequired, transformNode);
@@ -529,7 +532,7 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
                       joiner,
                       windows,
                       joined,
-                      new KStreamImplJoin(false, false, 
this.streamsGraphNode));
+                      new KStreamImplJoin(false, false));
 
     }
 
@@ -545,7 +548,7 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
                                              final ValueJoiner<? super V, ? 
super VO, ? extends VR> joiner,
                                              final JoinWindows windows,
                                              final Joined<K, V, VO> joined) {
-        return doJoin(other, joiner, windows, joined, new 
KStreamImplJoin(true, true, this.streamsGraphNode));
+        return doJoin(other, joiner, windows, joined, new 
KStreamImplJoin(true, true));
     }
 
     private <V1, R> KStream<K, R> doJoin(final KStream<K, V1> other,
@@ -656,7 +659,7 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
             joiner,
             windows,
             joined,
-            new KStreamImplJoin(true, false, this.streamsGraphNode)
+            new KStreamImplJoin(true, false)
         );
 
     }
@@ -840,14 +843,12 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
 
         private final boolean leftOuter;
         private final boolean rightOuter;
-        private final StreamsGraphNode parentGraphNode;
+
 
         KStreamImplJoin(final boolean leftOuter,
-                        final boolean rightOuter,
-                        final StreamsGraphNode parentGraphNode) {
+                        final boolean rightOuter) {
             this.leftOuter = leftOuter;
             this.rightOuter = rightOuter;
-            this.parentGraphNode = parentGraphNode;
         }
 
         @SuppressWarnings("unchecked")
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
index 7198df09829..4d81b1f699c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
@@ -46,6 +46,14 @@
 
     }
 
+    public Serde<K> keySerde() {
+        return keySerde;
+    }
+
+    public Serde<V> valueSerde() {
+        return valueSerde;
+    }
+
     @Override
     Serializer<V> getValueSerializer() {
         return valueSerde != null ? valueSerde.serializer() : null;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java
index 902a4e97e2a..fac5923f3f0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java
@@ -31,7 +31,9 @@
     private final String nodeName;
     private final boolean repartitionRequired;
     private boolean keyChangingOperation;
-    private Integer id;
+    private boolean valueChangingOperation;
+    private boolean mergeNode;
+    private Integer buildPriority;
     private boolean hasWrittenToTopology = false;
 
     public StreamsGraphNode(final String nodeName,
@@ -44,7 +46,7 @@ public StreamsGraphNode(final String nodeName,
         return parentNodes;
     }
 
-    public String[] parentNodeNames() {
+    String[] parentNodeNames() {
         final String[] parentNames = new String[parentNodes.size()];
         int index = 0;
         for (final StreamsGraphNode parentNode : parentNodes) {
@@ -62,17 +64,24 @@ public boolean allParentsWrittenToTopology() {
         return true;
     }
 
-    public void addParentNode(final StreamsGraphNode parentNode) {
-        parentNodes.add(parentNode);
-    }
-
     public Collection<StreamsGraphNode> children() {
         return new LinkedHashSet<>(childNodes);
     }
 
-    public void addChildNode(final StreamsGraphNode childNode) {
+    public void clearChildren() {
+        for (final StreamsGraphNode childNode : childNodes) {
+            childNode.parentNodes.remove(this);
+        }
+        childNodes.clear();
+    }
+
+    public boolean removeChild(final StreamsGraphNode child) {
+        return childNodes.remove(child) && child.parentNodes.remove(this);
+    }
+
+    public void addChild(final StreamsGraphNode childNode) {
         this.childNodes.add(childNode);
-        childNode.addParentNode(this);
+        childNode.parentNodes.add(this);
     }
 
     public String nodeName() {
@@ -87,16 +96,32 @@ public boolean isKeyChangingOperation() {
         return keyChangingOperation;
     }
 
+    public boolean isValueChangingOperation() {
+        return valueChangingOperation;
+    }
+
+    public boolean isMergeNode() {
+        return mergeNode;
+    }
+
+    public void setMergeNode(final boolean mergeNode) {
+        this.mergeNode = mergeNode;
+    }
+
+    public void setValueChangingOperation(final boolean 
valueChangingOperation) {
+        this.valueChangingOperation = valueChangingOperation;
+    }
+
     public void keyChangingOperation(final boolean keyChangingOperation) {
         this.keyChangingOperation = keyChangingOperation;
     }
 
-    public void setId(final int id) {
-        this.id = id;
+    public void setBuildPriority(final int buildPriority) {
+        this.buildPriority = buildPriority;
     }
 
-    public Integer id() {
-        return this.id;
+    public Integer buildPriority() {
+        return this.buildPriority;
     }
 
     public abstract void writeToTopology(final InternalTopologyBuilder 
topologyBuilder);
@@ -114,7 +139,7 @@ public String toString() {
         final String[] parentNames = parentNodeNames();
         return "StreamsGraphNode{" +
                "nodeName='" + nodeName + '\'' +
-               ", id=" + id +
+               ", buildPriority=" + buildPriority +
                " parentNodes=" + Arrays.toString(parentNames) + '}';
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
new file mode 100644
index 00000000000..e192c70b81b
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import kafka.utils.MockTime;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+@Category({IntegrationTest.class})
+public class RepartitionOptimizingIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static final String INPUT_TOPIC = "input";
+    private static final String COUNT_TOPIC = "outputTopic_0";
+    private static final String AGGREGATION_TOPIC = "outputTopic_1";
+    private static final String REDUCE_TOPIC = "outputTopic_2";
+    private static final String JOINED_TOPIC = "joinedOutputTopic";
+
+    private static final int ONE_REPARTITION_TOPIC = 1;
+    private static final int FOUR_REPARTITION_TOPICS = 4;
+
+    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: 
.*-repartition");
+
+    private Properties streamsConfiguration;
+
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
+    private final MockTime mockTime = CLUSTER.time;
+
+    @Before
+    public void setUp() throws Exception {
+        final Properties props = new Properties();
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 10);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
+        props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+
+        streamsConfiguration = StreamsTestUtils.getStreamsConfig(
+            "maybe-optimized-test-app",
+            CLUSTER.bootstrapServers(),
+            Serdes.String().getClass().getName(),
+            Serdes.String().getClass().getName(),
+            props);
+
+        CLUSTER.createTopics(INPUT_TOPIC,
+                             COUNT_TOPIC,
+                             AGGREGATION_TOPIC,
+                             REDUCE_TOPIC,
+                             JOINED_TOPIC);
+
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        CLUSTER.deleteAllTopicsAndWait(30_000L);
+    }
+
+    @Test
+    public void shouldSendCorrectRecords_OPTIMIZED() throws Exception {
+        runIntegrationTest(StreamsConfig.OPTIMIZE,
+                           ONE_REPARTITION_TOPIC);
+    }
+
+    @Test
+    public void shouldSendCorrectResults_NO_OPTIMIZATION() throws Exception {
+        runIntegrationTest(StreamsConfig.NO_OPTIMIZATION,
+                           FOUR_REPARTITION_TOPICS);
+    }
+
+
+    private void runIntegrationTest(final String optimizationConfig,
+                                    final int expectedNumberRepartitionTopics) 
throws Exception {
+
+        final Initializer<Integer> initializer = () -> 0;
+        final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> 
agg + v.length();
+
+        final Reducer<String> reducer = (v1, v2) -> v1 + ":" + v2;
+
+        final List<String> processorValueCollector = new ArrayList<>();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> sourceStream = 
builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+
+        final KStream<String, String> mappedStream = sourceStream.map((k, v) 
-> KeyValue.pair(k.toUpperCase(Locale.getDefault()), v));
+
+        mappedStream.filter((k, v) -> k.equals("B")).mapValues(v -> 
v.toUpperCase(Locale.getDefault()))
+            .process(() -> new SimpleProcessor(processorValueCollector));
+
+        final KStream<String, Long> countStream = 
mappedStream.groupByKey().count(Materialized.with(Serdes.String(), 
Serdes.Long())).toStream();
+
+        countStream.to(COUNT_TOPIC, Produced.with(Serdes.String(), 
Serdes.Long()));
+
+        mappedStream.groupByKey().aggregate(initializer,
+                                            aggregator,
+                                            Materialized.with(Serdes.String(), 
Serdes.Integer()))
+            .toStream().to(AGGREGATION_TOPIC, Produced.with(Serdes.String(), 
Serdes.Integer()));
+
+        // adding operators for case where the repartition node is further 
downstream
+        mappedStream.filter((k, v) -> true).peek((k, v) -> 
System.out.println(k + ":" + v)).groupByKey()
+            .reduce(reducer, Materialized.with(Serdes.String(), 
Serdes.String()))
+            .toStream().to(REDUCE_TOPIC, Produced.with(Serdes.String(), 
Serdes.String()));
+
+        mappedStream.filter((k, v) -> k.equals("A"))
+            .join(countStream, (v1, v2) -> v1 + ":" + v2.toString(),
+                  JoinWindows.of(5000),
+                  Joined.with(Serdes.String(), Serdes.String(), Serdes.Long()))
+            .to(JOINED_TOPIC);
+
+        streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
optimizationConfig);
+
+        final Properties producerConfig = 
TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, 
StringSerializer.class);
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_TOPIC, 
getKeyValues(), producerConfig, mockTime);
+
+        final Properties consumerConfig1 = 
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, 
LongDeserializer.class);
+        final Properties consumerConfig2 = 
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, 
IntegerDeserializer.class);
+        final Properties consumerConfig3 = 
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, 
StringDeserializer.class);
+
+        final Topology topology = builder.build(streamsConfiguration);
+        final String topologyString = topology.describe().toString();
+
+        if (optimizationConfig.equals(StreamsConfig.OPTIMIZE)) {
+            assertEquals(EXPECTED_OPTIMIZED_TOPOLOGY, topologyString);
+        } else {
+            assertEquals(EXPECTED_UNOPTIMIZED_TOPOLOGY, topologyString);
+        }
+
+
+        /*
+           confirming number of expected repartition topics here
+         */
+        assertEquals(expectedNumberRepartitionTopics, 
getCountOfRepartitionTopicsFound(topologyString));
+
+        final KafkaStreams streams = new KafkaStreams(topology, 
streamsConfiguration);
+        streams.start();
+
+        final List<KeyValue<String, Long>> expectedCountKeyValues = 
Arrays.asList(KeyValue.pair("A", 3L), KeyValue.pair("B", 3L), 
KeyValue.pair("C", 3L));
+        final List<KeyValue<String, Long>> receivedCountKeyValues = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig1, 
COUNT_TOPIC, expectedCountKeyValues.size());
+
+        final List<KeyValue<String, Integer>> expectedAggKeyValues = 
Arrays.asList(KeyValue.pair("A", 9), KeyValue.pair("B", 9), KeyValue.pair("C", 
9));
+        final List<KeyValue<String, Integer>> receivedAggKeyValues = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig2, 
AGGREGATION_TOPIC, expectedAggKeyValues.size());
+
+        final List<KeyValue<String, String>> expectedReduceKeyValues = 
Arrays.asList(KeyValue.pair("A", "foo:bar:baz"), KeyValue.pair("B", 
"foo:bar:baz"), KeyValue.pair("C", "foo:bar:baz"));
+        final List<KeyValue<String, Integer>> receivedReduceKeyValues = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig3, 
REDUCE_TOPIC, expectedAggKeyValues.size());
+
+        final List<KeyValue<String, String>> expectedJoinKeyValues = 
Arrays.asList(KeyValue.pair("A", "foo:3"), KeyValue.pair("A", "bar:3"), 
KeyValue.pair("A", "baz:3"));
+        final List<KeyValue<String, Integer>> receivedJoinKeyValues = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig3, 
JOINED_TOPIC, expectedJoinKeyValues.size());
+
+
+        final List<String> expectedCollectedProcessorValues = 
Arrays.asList("FOO", "BAR", "BAZ");
+
+        assertThat(receivedCountKeyValues, equalTo(expectedCountKeyValues));
+        assertThat(receivedAggKeyValues, equalTo(expectedAggKeyValues));
+        assertThat(receivedReduceKeyValues, equalTo(expectedReduceKeyValues));
+        assertThat(receivedJoinKeyValues, equalTo(expectedJoinKeyValues));
+
+        assertThat(3, equalTo(processorValueCollector.size()));
+        assertThat(processorValueCollector, 
equalTo(expectedCollectedProcessorValues));
+
+        streams.close(5, TimeUnit.SECONDS);
+    }
+
+
+    private int getCountOfRepartitionTopicsFound(final String topologyString) {
+        final Matcher matcher = 
repartitionTopicPattern.matcher(topologyString);
+        final List<String> repartitionTopicsFound = new ArrayList<>();
+        while (matcher.find()) {
+            repartitionTopicsFound.add(matcher.group());
+        }
+        return repartitionTopicsFound.size();
+    }
+
+
+    private List<KeyValue<String, String>> getKeyValues() {
+        final List<KeyValue<String, String>> keyValueList = new ArrayList<>();
+        final String[] keys = new String[]{"a", "b", "c"};
+        final String[] values = new String[]{"foo", "bar", "baz"};
+        for (final String key : keys) {
+            for (final String value : values) {
+                keyValueList.add(KeyValue.pair(key, value));
+            }
+        }
+        return keyValueList;
+    }
+
+
+    private static class SimpleProcessor extends AbstractProcessor<String, 
String> {
+
+        final List<String> valueList;
+
+        SimpleProcessor(final List<String> valueList) {
+            this.valueList = valueList;
+        }
+
+        @Override
+        public void process(final String key, final String value) {
+            valueList.add(value);
+        }
+    }
+
+
+    private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n"
+                                                              + "   
Sub-topology: 0\n"
+                                                              + "    Source: 
KSTREAM-SOURCE-0000000000 (topics: [input])\n"
+                                                              + "      --> 
KSTREAM-MAP-0000000001\n"
+                                                              + "    
Processor: KSTREAM-MAP-0000000001 (stores: [])\n"
+                                                              + "      --> 
KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000040\n"
+                                                              + "      <-- 
KSTREAM-SOURCE-0000000000\n"
+                                                              + "    
Processor: KSTREAM-FILTER-0000000002 (stores: [])\n"
+                                                              + "      --> 
KSTREAM-MAPVALUES-0000000003\n"
+                                                              + "      <-- 
KSTREAM-MAP-0000000001\n"
+                                                              + "    
Processor: KSTREAM-FILTER-0000000040 (stores: [])\n"
+                                                              + "      --> 
KSTREAM-SINK-0000000039\n"
+                                                              + "      <-- 
KSTREAM-MAP-0000000001\n"
+                                                              + "    
Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n"
+                                                              + "      --> 
KSTREAM-PROCESSOR-0000000004\n"
+                                                              + "      <-- 
KSTREAM-FILTER-0000000002\n"
+                                                              + "    
Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n"
+                                                              + "      --> 
none\n"
+                                                              + "      <-- 
KSTREAM-MAPVALUES-0000000003\n"
+                                                              + "    Sink: 
KSTREAM-SINK-0000000039 (topic: KSTREAM-MAP-0000000001-optimized-repartition)\n"
+                                                              + "      <-- 
KSTREAM-FILTER-0000000040\n"
+                                                              + "\n"
+                                                              + "  
Sub-topology: 1\n"
+                                                              + "    Source: 
KSTREAM-SOURCE-0000000041 (topics: 
[KSTREAM-MAP-0000000001-optimized-repartition])\n"
+                                                              + "      --> 
KSTREAM-FILTER-0000000020, KSTREAM-AGGREGATE-0000000007, 
KSTREAM-AGGREGATE-0000000014, KSTREAM-FILTER-0000000029\n"
+                                                              + "    
Processor: KSTREAM-AGGREGATE-0000000007 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n"
+                                                              + "      --> 
KTABLE-TOSTREAM-0000000011\n"
+                                                              + "      <-- 
KSTREAM-SOURCE-0000000041\n"
+                                                              + "    
Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n"
+                                                              + "      --> 
KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n"
+                                                              + "      <-- 
KSTREAM-AGGREGATE-0000000007\n"
+                                                              + "    
Processor: KSTREAM-FILTER-0000000020 (stores: [])\n"
+                                                              + "      --> 
KSTREAM-PEEK-0000000021\n"
+                                                              + "      <-- 
KSTREAM-SOURCE-0000000041\n"
+                                                              + "    
Processor: KSTREAM-FILTER-0000000029 (stores: [])\n"
+                                                              + "      --> 
KSTREAM-WINDOWED-0000000033\n"
+                                                              + "      <-- 
KSTREAM-SOURCE-0000000041\n"
+                                                              + "    
Processor: KSTREAM-PEEK-0000000021 (stores: [])\n"
+                                                              + "      --> 
KSTREAM-REDUCE-0000000023\n"
+                                                              + "      <-- 
KSTREAM-FILTER-0000000020\n"
+                                                              + "    
Processor: KSTREAM-WINDOWED-0000000033 (stores: 
[KSTREAM-JOINTHIS-0000000035-store])\n"
+                                                              + "      --> 
KSTREAM-JOINTHIS-0000000035\n"
+                                                              + "      <-- 
KSTREAM-FILTER-0000000029\n"
+                                                              + "    
Processor: KSTREAM-WINDOWED-0000000034 (stores: 
[KSTREAM-JOINOTHER-0000000036-store])\n"
+                                                              + "      --> 
KSTREAM-JOINOTHER-0000000036\n"
+                                                              + "      <-- 
KTABLE-TOSTREAM-0000000011\n"
+                                                              + "    
Processor: KSTREAM-AGGREGATE-0000000014 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n"
+                                                              + "      --> 
KTABLE-TOSTREAM-0000000018\n"
+                                                              + "      <-- 
KSTREAM-SOURCE-0000000041\n"
+                                                              + "    
Processor: KSTREAM-JOINOTHER-0000000036 (stores: 
[KSTREAM-JOINTHIS-0000000035-store])\n"
+                                                              + "      --> 
KSTREAM-MERGE-0000000037\n"
+                                                              + "      <-- 
KSTREAM-WINDOWED-0000000034\n"
+                                                              + "    
Processor: KSTREAM-JOINTHIS-0000000035 (stores: 
[KSTREAM-JOINOTHER-0000000036-store])\n"
+                                                              + "      --> 
KSTREAM-MERGE-0000000037\n"
+                                                              + "      <-- 
KSTREAM-WINDOWED-0000000033\n"
+                                                              + "    
Processor: KSTREAM-REDUCE-0000000023 (stores: 
[KSTREAM-REDUCE-STATE-STORE-0000000022])\n"
+                                                              + "      --> 
KTABLE-TOSTREAM-0000000027\n"
+                                                              + "      <-- 
KSTREAM-PEEK-0000000021\n"
+                                                              + "    
Processor: KSTREAM-MERGE-0000000037 (stores: [])\n"
+                                                              + "      --> 
KSTREAM-SINK-0000000038\n"
+                                                              + "      <-- 
KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n"
+                                                              + "    
Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n"
+                                                              + "      --> 
KSTREAM-SINK-0000000019\n"
+                                                              + "      <-- 
KSTREAM-AGGREGATE-0000000014\n"
+                                                              + "    
Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n"
+                                                              + "      --> 
KSTREAM-SINK-0000000028\n"
+                                                              + "      <-- 
KSTREAM-REDUCE-0000000023\n"
+                                                              + "    Sink: 
KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n"
+                                                              + "      <-- 
KTABLE-TOSTREAM-0000000011\n"
+                                                              + "    Sink: 
KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n"
+                                                              + "      <-- 
KTABLE-TOSTREAM-0000000018\n"
+                                                              + "    Sink: 
KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n"
+                                                              + "      <-- 
KTABLE-TOSTREAM-0000000027\n"
+                                                              + "    Sink: 
KSTREAM-SINK-0000000038 (topic: joinedOutputTopic)\n"
+                                                              + "      <-- 
KSTREAM-MERGE-0000000037\n\n";
+
+
+    private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n"
+                                                                + "   
Sub-topology: 0\n"
+                                                                + "    Source: 
KSTREAM-SOURCE-0000000000 (topics: [input])\n"
+                                                                + "      --> 
KSTREAM-MAP-0000000001\n"
+                                                                + "    
Processor: KSTREAM-MAP-0000000001 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-FILTER-0000000020, KSTREAM-FILTER-0000000002, 
KSTREAM-FILTER-0000000009, KSTREAM-FILTER-0000000016, 
KSTREAM-FILTER-0000000029\n"
+                                                                + "      <-- 
KSTREAM-SOURCE-0000000000\n"
+                                                                + "    
Processor: KSTREAM-FILTER-0000000020 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-PEEK-0000000021\n"
+                                                                + "      <-- 
KSTREAM-MAP-0000000001\n"
+                                                                + "    
Processor: KSTREAM-FILTER-0000000002 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-MAPVALUES-0000000003\n"
+                                                                + "      <-- 
KSTREAM-MAP-0000000001\n"
+                                                                + "    
Processor: KSTREAM-FILTER-0000000029 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-FILTER-0000000031\n"
+                                                                + "      <-- 
KSTREAM-MAP-0000000001\n"
+                                                                + "    
Processor: KSTREAM-PEEK-0000000021 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-FILTER-0000000025\n"
+                                                                + "      <-- 
KSTREAM-FILTER-0000000020\n"
+                                                                + "    
Processor: KSTREAM-FILTER-0000000009 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-SINK-0000000008\n"
+                                                                + "      <-- 
KSTREAM-MAP-0000000001\n"
+                                                                + "    
Processor: KSTREAM-FILTER-0000000016 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-SINK-0000000015\n"
+                                                                + "      <-- 
KSTREAM-MAP-0000000001\n"
+                                                                + "    
Processor: KSTREAM-FILTER-0000000025 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-SINK-0000000024\n"
+                                                                + "      <-- 
KSTREAM-PEEK-0000000021\n"
+                                                                + "    
Processor: KSTREAM-FILTER-0000000031 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-SINK-0000000030\n"
+                                                                + "      <-- 
KSTREAM-FILTER-0000000029\n"
+                                                                + "    
Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-PROCESSOR-0000000004\n"
+                                                                + "      <-- 
KSTREAM-FILTER-0000000002\n"
+                                                                + "    
Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n"
+                                                                + "      --> 
none\n"
+                                                                + "      <-- 
KSTREAM-MAPVALUES-0000000003\n"
+                                                                + "    Sink: 
KSTREAM-SINK-0000000008 (topic: 
KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition)\n"
+                                                                + "      <-- 
KSTREAM-FILTER-0000000009\n"
+                                                                + "    Sink: 
KSTREAM-SINK-0000000015 (topic: 
KSTREAM-AGGREGATE-STATE-STORE-0000000013-repartition)\n"
+                                                                + "      <-- 
KSTREAM-FILTER-0000000016\n"
+                                                                + "    Sink: 
KSTREAM-SINK-0000000024 (topic: 
KSTREAM-REDUCE-STATE-STORE-0000000022-repartition)\n"
+                                                                + "      <-- 
KSTREAM-FILTER-0000000025\n"
+                                                                + "    Sink: 
KSTREAM-SINK-0000000030 (topic: KSTREAM-FILTER-0000000029-repartition)\n"
+                                                                + "      <-- 
KSTREAM-FILTER-0000000031\n"
+                                                                + "\n"
+                                                                + "  
Sub-topology: 1\n"
+                                                                + "    Source: 
KSTREAM-SOURCE-0000000010 (topics: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition])\n"
+                                                                + "      --> 
KSTREAM-AGGREGATE-0000000007\n"
+                                                                + "    
Processor: KSTREAM-AGGREGATE-0000000007 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n"
+                                                                + "      --> 
KTABLE-TOSTREAM-0000000011\n"
+                                                                + "      <-- 
KSTREAM-SOURCE-0000000010\n"
+                                                                + "    
Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n"
+                                                                + "      <-- 
KSTREAM-AGGREGATE-0000000007\n"
+                                                                + "    Source: 
KSTREAM-SOURCE-0000000032 (topics: [KSTREAM-FILTER-0000000029-repartition])\n"
+                                                                + "      --> 
KSTREAM-WINDOWED-0000000033\n"
+                                                                + "    
Processor: KSTREAM-WINDOWED-0000000033 (stores: 
[KSTREAM-JOINTHIS-0000000035-store])\n"
+                                                                + "      --> 
KSTREAM-JOINTHIS-0000000035\n"
+                                                                + "      <-- 
KSTREAM-SOURCE-0000000032\n"
+                                                                + "    
Processor: KSTREAM-WINDOWED-0000000034 (stores: 
[KSTREAM-JOINOTHER-0000000036-store])\n"
+                                                                + "      --> 
KSTREAM-JOINOTHER-0000000036\n"
+                                                                + "      <-- 
KTABLE-TOSTREAM-0000000011\n"
+                                                                + "    
Processor: KSTREAM-JOINOTHER-0000000036 (stores: 
[KSTREAM-JOINTHIS-0000000035-store])\n"
+                                                                + "      --> 
KSTREAM-MERGE-0000000037\n"
+                                                                + "      <-- 
KSTREAM-WINDOWED-0000000034\n"
+                                                                + "    
Processor: KSTREAM-JOINTHIS-0000000035 (stores: 
[KSTREAM-JOINOTHER-0000000036-store])\n"
+                                                                + "      --> 
KSTREAM-MERGE-0000000037\n"
+                                                                + "      <-- 
KSTREAM-WINDOWED-0000000033\n"
+                                                                + "    
Processor: KSTREAM-MERGE-0000000037 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-SINK-0000000038\n"
+                                                                + "      <-- 
KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n"
+                                                                + "    Sink: 
KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n"
+                                                                + "      <-- 
KTABLE-TOSTREAM-0000000011\n"
+                                                                + "    Sink: 
KSTREAM-SINK-0000000038 (topic: joinedOutputTopic)\n"
+                                                                + "      <-- 
KSTREAM-MERGE-0000000037\n"
+                                                                + "\n"
+                                                                + "  
Sub-topology: 2\n"
+                                                                + "    Source: 
KSTREAM-SOURCE-0000000017 (topics: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000013-repartition])\n"
+                                                                + "      --> 
KSTREAM-AGGREGATE-0000000014\n"
+                                                                + "    
Processor: KSTREAM-AGGREGATE-0000000014 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n"
+                                                                + "      --> 
KTABLE-TOSTREAM-0000000018\n"
+                                                                + "      <-- 
KSTREAM-SOURCE-0000000017\n"
+                                                                + "    
Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-SINK-0000000019\n"
+                                                                + "      <-- 
KSTREAM-AGGREGATE-0000000014\n"
+                                                                + "    Sink: 
KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n"
+                                                                + "      <-- 
KTABLE-TOSTREAM-0000000018\n"
+                                                                + "\n"
+                                                                + "  
Sub-topology: 3\n"
+                                                                + "    Source: 
KSTREAM-SOURCE-0000000026 (topics: 
[KSTREAM-REDUCE-STATE-STORE-0000000022-repartition])\n"
+                                                                + "      --> 
KSTREAM-REDUCE-0000000023\n"
+                                                                + "    
Processor: KSTREAM-REDUCE-0000000023 (stores: 
[KSTREAM-REDUCE-STATE-STORE-0000000022])\n"
+                                                                + "      --> 
KTABLE-TOSTREAM-0000000027\n"
+                                                                + "      <-- 
KSTREAM-SOURCE-0000000026\n"
+                                                                + "    
Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-SINK-0000000028\n"
+                                                                + "      <-- 
KSTREAM-REDUCE-0000000023\n"
+                                                                + "    Sink: 
KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n"
+                                                                + "      <-- 
KTABLE-TOSTREAM-0000000027\n\n";
+
+}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
new file mode 100644
index 00000000000..200062e0fb9
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import kafka.utils.MockTime;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+@Category({IntegrationTest.class})
+public class RepartitionWithMergeOptimizingIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static final String INPUT_A_TOPIC = "inputA";
+    private static final String INPUT_B_TOPIC = "inputB";
+    private static final String COUNT_TOPIC = "outputTopic_0";
+    private static final String COUNT_STRING_TOPIC = "outputTopic_1";
+
+
+    private static final int ONE_REPARTITION_TOPIC = 1;
+    private static final int TWO_REPARTITION_TOPICS = 2;
+
+    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: 
.*-repartition");
+
+    private Properties streamsConfiguration;
+
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
+    private final MockTime mockTime = CLUSTER.time;
+
+    @Before
+    public void setUp() throws Exception {
+        final Properties props = new Properties();
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 10);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
+        props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+
+        streamsConfiguration = StreamsTestUtils.getStreamsConfig(
+            "maybe-optimized-with-merge-test-app",
+            CLUSTER.bootstrapServers(),
+            Serdes.String().getClass().getName(),
+            Serdes.String().getClass().getName(),
+            props);
+
+        CLUSTER.createTopics(COUNT_TOPIC,
+                             COUNT_STRING_TOPIC,
+                             INPUT_A_TOPIC,
+                             INPUT_B_TOPIC);
+
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        CLUSTER.deleteAllTopicsAndWait(30_000L);
+    }
+
+    @Test
+    public void shouldSendCorrectRecords_OPTIMIZED() throws Exception {
+        runIntegrationTest(StreamsConfig.OPTIMIZE,
+                           ONE_REPARTITION_TOPIC);
+    }
+
+    @Test
+    public void shouldSendCorrectResults_NO_OPTIMIZATION() throws Exception {
+        runIntegrationTest(StreamsConfig.NO_OPTIMIZATION,
+                           TWO_REPARTITION_TOPICS);
+    }
+
+
+    private void runIntegrationTest(final String optimizationConfig,
+                                    final int expectedNumberRepartitionTopics) 
throws Exception {
+
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> sourceAStream = 
builder.stream(INPUT_A_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+
+        final KStream<String, String> sourceBStream = 
builder.stream(INPUT_B_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+
+        final KStream<String, String> mappedAStream = sourceAStream.map((k, v) 
-> KeyValue.pair(v.split(":")[0], v));
+        final KStream<String, String> mappedBStream = sourceBStream.map((k, v) 
-> KeyValue.pair(v.split(":")[0], v));
+
+        final KStream<String, String> mergedStream = 
mappedAStream.merge(mappedBStream);
+
+        mergedStream.groupByKey().count().toStream().to(COUNT_TOPIC, 
Produced.with(Serdes.String(), Serdes.Long()));
+        mergedStream.groupByKey().count().toStream().mapValues(v -> 
v.toString()).to(COUNT_STRING_TOPIC, Produced.with(Serdes.String(), 
Serdes.String()));
+
+        streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
optimizationConfig);
+
+        final Properties producerConfig = 
TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, 
StringSerializer.class);
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_A_TOPIC, 
getKeyValues(), producerConfig, mockTime);
+        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_B_TOPIC, 
getKeyValues(), producerConfig, mockTime);
+
+        final Properties consumerConfig1 = 
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, 
LongDeserializer.class);
+        final Properties consumerConfig2 = 
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, 
StringDeserializer.class);
+
+        final Topology topology = builder.build(streamsConfiguration);
+        final String topologyString = topology.describe().toString();
+        System.out.println(topologyString);
+
+        if (optimizationConfig.equals(StreamsConfig.OPTIMIZE)) {
+            assertEquals(EXPECTED_OPTIMIZED_TOPOLOGY, topologyString);
+        } else {
+            assertEquals(EXPECTED_UNOPTIMIZED_TOPOLOGY, topologyString);
+        }
+
+
+        /*
+           confirming number of expected repartition topics here
+         */
+        assertEquals(expectedNumberRepartitionTopics, 
getCountOfRepartitionTopicsFound(topologyString));
+
+        final KafkaStreams streams = new KafkaStreams(topology, 
streamsConfiguration);
+        streams.start();
+
+        final List<KeyValue<String, Long>> expectedCountKeyValues = 
Arrays.asList(KeyValue.pair("A", 6L), KeyValue.pair("B", 6L), 
KeyValue.pair("C", 6L));
+        final List<KeyValue<String, Long>> receivedCountKeyValues = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig1, 
COUNT_TOPIC, expectedCountKeyValues.size());
+
+        final List<KeyValue<String, String>> expectedStringCountKeyValues = 
Arrays.asList(KeyValue.pair("A", "6"), KeyValue.pair("B", "6"), 
KeyValue.pair("C", "6"));
+        final List<KeyValue<String, String>> receivedCountStringKeyValues = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig2, 
COUNT_STRING_TOPIC, expectedStringCountKeyValues.size());
+
+        assertThat(receivedCountKeyValues, equalTo(expectedCountKeyValues));
+        assertThat(receivedCountStringKeyValues, 
equalTo(expectedStringCountKeyValues));
+
+        streams.close(5, TimeUnit.SECONDS);
+    }
+
+
+    private int getCountOfRepartitionTopicsFound(final String topologyString) {
+        final Matcher matcher = 
repartitionTopicPattern.matcher(topologyString);
+        final List<String> repartitionTopicsFound = new ArrayList<>();
+        while (matcher.find()) {
+            repartitionTopicsFound.add(matcher.group());
+        }
+        return repartitionTopicsFound.size();
+    }
+
+
+    private List<KeyValue<String, String>> getKeyValues() {
+        final List<KeyValue<String, String>> keyValueList = new ArrayList<>();
+        final String[] keys = new String[]{"X", "Y", "Z"};
+        final String[] values = new String[]{"A:foo", "B:foo", "C:foo"};
+        for (final String key : keys) {
+            for (final String value : values) {
+                keyValueList.add(KeyValue.pair(key, value));
+            }
+        }
+        return keyValueList;
+    }
+
+
+
+    private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n"
+                                                              + "   
Sub-topology: 0\n"
+                                                              + "    Source: 
KSTREAM-SOURCE-0000000000 (topics: [inputA])\n"
+                                                              + "      --> 
KSTREAM-MAP-0000000002\n"
+                                                              + "    Source: 
KSTREAM-SOURCE-0000000001 (topics: [inputB])\n"
+                                                              + "      --> 
KSTREAM-MAP-0000000003\n"
+                                                              + "    
Processor: KSTREAM-MAP-0000000002 (stores: [])\n"
+                                                              + "      --> 
KSTREAM-MERGE-0000000004\n"
+                                                              + "      <-- 
KSTREAM-SOURCE-0000000000\n"
+                                                              + "    
Processor: KSTREAM-MAP-0000000003 (stores: [])\n"
+                                                              + "      --> 
KSTREAM-MERGE-0000000004\n"
+                                                              + "      <-- 
KSTREAM-SOURCE-0000000001\n"
+                                                              + "    
Processor: KSTREAM-MERGE-0000000004 (stores: [])\n"
+                                                              + "      --> 
KSTREAM-FILTER-0000000021\n"
+                                                              + "      <-- 
KSTREAM-MAP-0000000002, KSTREAM-MAP-0000000003\n"
+                                                              + "    
Processor: KSTREAM-FILTER-0000000021 (stores: [])\n"
+                                                              + "      --> 
KSTREAM-SINK-0000000020\n"
+                                                              + "      <-- 
KSTREAM-MERGE-0000000004\n"
+                                                              + "    Sink: 
KSTREAM-SINK-0000000020 (topic: 
KSTREAM-MERGE-0000000004-optimized-repartition)\n"
+                                                              + "      <-- 
KSTREAM-FILTER-0000000021\n"
+                                                              + "\n"
+                                                              + "  
Sub-topology: 1\n"
+                                                              + "    Source: 
KSTREAM-SOURCE-0000000022 (topics: 
[KSTREAM-MERGE-0000000004-optimized-repartition])\n"
+                                                              + "      --> 
KSTREAM-AGGREGATE-0000000006, KSTREAM-AGGREGATE-0000000013\n"
+                                                              + "    
Processor: KSTREAM-AGGREGATE-0000000013 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000012])\n"
+                                                              + "      --> 
KTABLE-TOSTREAM-0000000017\n"
+                                                              + "      <-- 
KSTREAM-SOURCE-0000000022\n"
+                                                              + "    
Processor: KSTREAM-AGGREGATE-0000000006 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000005])\n"
+                                                              + "      --> 
KTABLE-TOSTREAM-0000000010\n"
+                                                              + "      <-- 
KSTREAM-SOURCE-0000000022\n"
+                                                              + "    
Processor: KTABLE-TOSTREAM-0000000017 (stores: [])\n"
+                                                              + "      --> 
KSTREAM-MAPVALUES-0000000018\n"
+                                                              + "      <-- 
KSTREAM-AGGREGATE-0000000013\n"
+                                                              + "    
Processor: KSTREAM-MAPVALUES-0000000018 (stores: [])\n"
+                                                              + "      --> 
KSTREAM-SINK-0000000019\n"
+                                                              + "      <-- 
KTABLE-TOSTREAM-0000000017\n"
+                                                              + "    
Processor: KTABLE-TOSTREAM-0000000010 (stores: [])\n"
+                                                              + "      --> 
KSTREAM-SINK-0000000011\n"
+                                                              + "      <-- 
KSTREAM-AGGREGATE-0000000006\n"
+                                                              + "    Sink: 
KSTREAM-SINK-0000000011 (topic: outputTopic_0)\n"
+                                                              + "      <-- 
KTABLE-TOSTREAM-0000000010\n"
+                                                              + "    Sink: 
KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n"
+                                                              + "      <-- 
KSTREAM-MAPVALUES-0000000018\n\n";
+
+
+    private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n"
+                                                                + "   
Sub-topology: 0\n"
+                                                                + "    Source: 
KSTREAM-SOURCE-0000000000 (topics: [inputA])\n"
+                                                                + "      --> 
KSTREAM-MAP-0000000002\n"
+                                                                + "    Source: 
KSTREAM-SOURCE-0000000001 (topics: [inputB])\n"
+                                                                + "      --> 
KSTREAM-MAP-0000000003\n"
+                                                                + "    
Processor: KSTREAM-MAP-0000000002 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-MERGE-0000000004\n"
+                                                                + "      <-- 
KSTREAM-SOURCE-0000000000\n"
+                                                                + "    
Processor: KSTREAM-MAP-0000000003 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-MERGE-0000000004\n"
+                                                                + "      <-- 
KSTREAM-SOURCE-0000000001\n"
+                                                                + "    
Processor: KSTREAM-MERGE-0000000004 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-FILTER-0000000008, KSTREAM-FILTER-0000000015\n"
+                                                                + "      <-- 
KSTREAM-MAP-0000000002, KSTREAM-MAP-0000000003\n"
+                                                                + "    
Processor: KSTREAM-FILTER-0000000008 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-SINK-0000000007\n"
+                                                                + "      <-- 
KSTREAM-MERGE-0000000004\n"
+                                                                + "    
Processor: KSTREAM-FILTER-0000000015 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-SINK-0000000014\n"
+                                                                + "      <-- 
KSTREAM-MERGE-0000000004\n"
+                                                                + "    Sink: 
KSTREAM-SINK-0000000007 (topic: 
KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition)\n"
+                                                                + "      <-- 
KSTREAM-FILTER-0000000008\n"
+                                                                + "    Sink: 
KSTREAM-SINK-0000000014 (topic: 
KSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition)\n"
+                                                                + "      <-- 
KSTREAM-FILTER-0000000015\n"
+                                                                + "\n"
+                                                                + "  
Sub-topology: 1\n"
+                                                                + "    Source: 
KSTREAM-SOURCE-0000000009 (topics: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition])\n"
+                                                                + "      --> 
KSTREAM-AGGREGATE-0000000006\n"
+                                                                + "    
Processor: KSTREAM-AGGREGATE-0000000006 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000005])\n"
+                                                                + "      --> 
KTABLE-TOSTREAM-0000000010\n"
+                                                                + "      <-- 
KSTREAM-SOURCE-0000000009\n"
+                                                                + "    
Processor: KTABLE-TOSTREAM-0000000010 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-SINK-0000000011\n"
+                                                                + "      <-- 
KSTREAM-AGGREGATE-0000000006\n"
+                                                                + "    Sink: 
KSTREAM-SINK-0000000011 (topic: outputTopic_0)\n"
+                                                                + "      <-- 
KTABLE-TOSTREAM-0000000010\n"
+                                                                + "\n"
+                                                                + "  
Sub-topology: 2\n"
+                                                                + "    Source: 
KSTREAM-SOURCE-0000000016 (topics: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition])\n"
+                                                                + "      --> 
KSTREAM-AGGREGATE-0000000013\n"
+                                                                + "    
Processor: KSTREAM-AGGREGATE-0000000013 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000012])\n"
+                                                                + "      --> 
KTABLE-TOSTREAM-0000000017\n"
+                                                                + "      <-- 
KSTREAM-SOURCE-0000000016\n"
+                                                                + "    
Processor: KTABLE-TOSTREAM-0000000017 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-MAPVALUES-0000000018\n"
+                                                                + "      <-- 
KSTREAM-AGGREGATE-0000000013\n"
+                                                                + "    
Processor: KSTREAM-MAPVALUES-0000000018 (stores: [])\n"
+                                                                + "      --> 
KSTREAM-SINK-0000000019\n"
+                                                                + "      <-- 
KTABLE-TOSTREAM-0000000017\n"
+                                                                + "    Sink: 
KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n"
+                                                                + "      <-- 
KSTREAM-MAPVALUES-0000000018\n\n";
+
+}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index 2bddf4110e0..d65f27e2b19 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -18,15 +18,121 @@
 package org.apache.kafka.streams.kstream.internals.graph;
 
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import static org.junit.Assert.assertEquals;
 
 public class StreamsGraphTest {
+
+    final Pattern repartitionTopicPattern = Pattern.compile("Sink: 
.*-repartition");
     
+
+
+    // Test builds topology in succesive manner but only graph node not yet 
processed written to topology
+
+    @Test
+    public void shouldBeAbleToBuildTopologyIncrementally() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> stream = builder.stream("topic");
+        final KStream<String, String> streamII = builder.stream("other-topic");
+        final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + 
v2;
+
+
+        final KStream<String, String> joinedStream = stream.join(streamII, 
valueJoiner, JoinWindows.of(5000));
+
+        // build step one
+        assertEquals(expectedJoinedTopology, 
builder.build().describe().toString());
+
+        final KStream<String, String> filteredJoinStream = 
joinedStream.filter((k, v) -> v.equals("foo"));
+        // build step two
+        assertEquals(expectedJoinedFilteredTopology, 
builder.build().describe().toString());
+
+        filteredJoinStream.mapValues(v -> v + "some value").to("output-topic");
+        // build step three
+        assertEquals(expectedFullTopology, 
builder.build().describe().toString());
+
+    }
+
+    @Test
+    public void 
shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {
+
+        final Topology attemptedOptimize = 
getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE);
+        final Topology noOptimization = 
getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION);
+
+        assertEquals(attemptedOptimize.describe().toString(), 
noOptimization.describe().toString());
+        assertEquals(2, 
getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
+        assertEquals(2, 
getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
+    }
+
+    // no need to optimize as user has already performed the repartitioning 
manually
+    @Test
+    public void shouldNotOptimizeWhenAThroughOperationIsDone() {
+
+        final Topology attemptedOptimize = 
getTopologyWithThroughOperation(StreamsConfig.OPTIMIZE);
+        final Topology noOptimziation = 
getTopologyWithThroughOperation(StreamsConfig.NO_OPTIMIZATION);
+
+        assertEquals(attemptedOptimize.describe().toString(), 
noOptimziation.describe().toString());
+        assertEquals(0, 
getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
+        assertEquals(0, 
getCountOfRepartitionTopicsFound(noOptimziation.describe().toString()));
+
+    }
+
+    private Topology getTopologyWithChangingValuesAfterChangingKey(final 
String optimizeConfig) {
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizeConfig);
+
+        final KStream<String, String> inputStream = builder.stream("input");
+        final KStream<String, String> mappedKeyStream = 
inputStream.selectKey((k, v) -> k + v);
+
+        mappedKeyStream.mapValues(v -> 
v.toUpperCase(Locale.getDefault())).groupByKey().count().toStream().to("output");
+        mappedKeyStream.flatMapValues(v -> 
Arrays.asList(v.split("\\s"))).groupByKey().windowedBy(TimeWindows.of(5000)).count().toStream().to("windowed-output");
+
+        return builder.build(properties);
+
+    }
+
+    private Topology getTopologyWithThroughOperation(final String 
optimizeConfig) {
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizeConfig);
+
+        final KStream<String, String> inputStream = builder.stream("input");
+        final KStream<String, String> mappedKeyStream = 
inputStream.selectKey((k, v) -> k + v).through("through-topic");
+
+        mappedKeyStream.groupByKey().count().toStream().to("output");
+        
mappedKeyStream.groupByKey().windowedBy(TimeWindows.of(5000)).count().toStream().to("windowed-output");
+
+        return builder.build(properties);
+
+    }
+
+    private int getCountOfRepartitionTopicsFound(final String topologyString) {
+        final Matcher matcher = 
repartitionTopicPattern.matcher(topologyString);
+        final List<String> repartitionTopicsFound = new ArrayList<>();
+        while (matcher.find()) {
+            repartitionTopicsFound.add(matcher.group());
+        }
+        return repartitionTopicsFound.size();
+    }
+
     private String expectedJoinedTopology = "Topologies:\n"
                                             + "   Sub-topology: 0\n"
                                             + "    Source: 
KSTREAM-SOURCE-0000000000 (topics: [topic])\n"
@@ -104,31 +210,4 @@
                                           + "    Sink: KSTREAM-SINK-0000000009 
(topic: output-topic)\n"
                                           + "      <-- 
KSTREAM-MAPVALUES-0000000008\n\n";
 
-    // Test builds topology in succesive manner but only graph node not yet 
processed written to topology
-
-    @Test
-    public void shouldBeAbleToBuildTopologyIncrementally() {
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final KStream<String, String> stream = builder.stream("topic");
-        final KStream<String, String> streamII = builder.stream("other-topic");
-        final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + 
v2;
-
-
-        final KStream<String, String> joinedStream = stream.join(streamII, 
valueJoiner, JoinWindows.of(5000));
-
-        // build step one
-        assertEquals(expectedJoinedTopology, 
builder.build().describe().toString());
-
-        final KStream<String, String> filteredJoinStream = 
joinedStream.filter((k, v) -> v.equals("foo"));
-        // build step two
-        assertEquals(expectedJoinedFilteredTopology, 
builder.build().describe().toString());
-
-        filteredJoinStream.mapValues(v -> v + "some value").to("output-topic");
-        // build step three
-        assertEquals(expectedFullTopology, 
builder.build().describe().toString());
-
-
-    }
-
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce Kafka Streams Footprint
> ------------------------------
>
>                 Key: KAFKA-6761
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6761
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Bill Bejeck
>            Assignee: Bill Bejeck
>            Priority: Major
>             Fix For: 2.1.0
>
>
> The persistent storage footprint of a Kafka Streams application contains the 
> following aspects:
>  # The internal topics created on the Kafka cluster side.
>  # The materialized state stores on the Kafka Streams application instances 
> side.
> There have been some questions about reducing these footprints, especially 
> since many of them are not necessary. For example, there are redundant 
> internal topics, as well as unnecessary state stores that takes up space but 
> also affect performance. When people are pushing Streams to production with 
> high traffic, this issue would be more common and severe. Reducing the 
> footprint of Streams have clear benefits for reducing resource utilization of 
> Kafka Streams applications, and also not creating pressure on broker's 
> capacities.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to