agavra commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r959892207


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final 
Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single 
source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are 
siblings of the
+     * StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();
+        countSourceNodes(count, streamJoinNode, new HashSet<>());
+        if (count.get() > 1) {
+            return false;
+        }
+        if (streamJoinNode.parentNodes().size() > 1) {
+            return false;
+        }
+        for (final GraphNode parent: streamJoinNode.parentNodes()) {
+            for (final GraphNode sibling : parent.children()) {
+                if (sibling instanceof ProcessorGraphNode) {
+                    if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
+                        continue;
+                    }
+                }
+                if (sibling != streamJoinNode
+                    && sibling.buildPriority() < 
streamJoinNode.buildPriority()) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private void countSourceNodes(
+        final AtomicInteger count,
+        final GraphNode currentNode,
+        final Set<GraphNode> visited) {
+
+        if (currentNode instanceof StreamSourceNode) {
+            count.incrementAndGet();
+        }
+
+        for (final GraphNode parent: currentNode.parentNodes()) {
+            if (!visited.contains(parent)) {

Review Comment:
   `GraphNode` doesn't implement `equals` and `hashCode`, which makes this 
operation somewhat risky. Either we should implement those methods or 
explicitly use `IdentityHashMap` if we're certain that the nodes will always be 
the same instance.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final 
Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single 
source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are 
siblings of the
+     * StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();
+        countSourceNodes(count, streamJoinNode, new HashSet<>());
+        if (count.get() > 1) {
+            return false;
+        }
+        if (streamJoinNode.parentNodes().size() > 1) {
+            return false;
+        }
+        for (final GraphNode parent: streamJoinNode.parentNodes()) {
+            for (final GraphNode sibling : parent.children()) {
+                if (sibling instanceof ProcessorGraphNode) {
+                    if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
+                        continue;
+                    }
+                }
+                if (sibling != streamJoinNode
+                    && sibling.buildPriority() < 
streamJoinNode.buildPriority()) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private void countSourceNodes(

Review Comment:
   instead of incrementing `count` can we have this method return `int`? (you 
can then sum up recursive call results) - generally better to avoid 
side-effects as it makes debugging easier
   
   alternatively, we can return a `boolean` and change this to 
`hasExactlyOneSourceNode`, which would return false if more than one of the 
recursive calls returned `true` (also allowing us to short-circuit the logic, 
not that the performance boost matters)



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final 
Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single 
source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are 
siblings of the
+     * StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();
+        countSourceNodes(count, streamJoinNode, new HashSet<>());
+        if (count.get() > 1) {
+            return false;
+        }
+        if (streamJoinNode.parentNodes().size() > 1) {
+            return false;
+        }
+        for (final GraphNode parent: streamJoinNode.parentNodes()) {

Review Comment:
   instead of making this a for loop, can we use `Iterables.getOnlyElement()`? 
that should make this a bit easier to reason about



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final 
Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single 
source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are 
siblings of the
+     * StreamStreamJoinNode and have smaller build priority.

Review Comment:
   I'm asking this because I'm not familiar with the codebase: can you 
explain what build priority is and why it matters in this algorithm? 



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final 
Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single 
source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are 
siblings of the
+     * StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();

Review Comment:
   nit: rename this to numSourceNodes for readability (or if you take my 
suggestion below, just inline the call in the `if` branch)



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, 
V1, K, VOut> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+    private final String windowName;
+    private final long joinBeforeMs;
+    private final long joinAfterMs;
+    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? 
extends VOut> joinerThis;
+    private final ValueJoinerWithKey<? super K, ? super V2, ? super V1, ? 
extends VOut> joinerOther;
+
+    private final TimeTracker sharedTimeTracker;
+
+    KStreamKStreamSelfJoin(
+        final String windowName,
+        final JoinWindowsInternal windows,
+        final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends 
VOut> joinerThis,
+        final ValueJoinerWithKey<? super K, ? super V2, ? super V1, ? extends 
VOut> joinerOther,
+        final TimeTracker sharedTimeTracker) {
+
+        this.windowName = windowName;
+        this.joinBeforeMs = windows.beforeMs;
+        this.joinAfterMs = windows.afterMs;
+        this.joinerThis = joinerThis;
+        this.joinerOther = joinerOther;
+        this.sharedTimeTracker = sharedTimeTracker;
+    }
+
+    @Override
+    public Processor<K, V1, K, VOut> get() {
+        return new KStreamKStreamSelfJoinProcessor();
+    }
+
+    private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor<K, V1, K, VOut> {
+        private WindowStore<K, V2> windowStore;
+        private Sensor droppedRecordsSensor;
+
+        @Override
+        public void init(final ProcessorContext<K, VOut> context) {
+            super.init(context);
+
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
+            droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
+            windowStore = context.getStateStore(windowName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void process(final Record<K, V1> record) {
+            // Copied from inner join:

Review Comment:
   let's try to avoid duplicated code - without having looked at the code in 
inner join, is there anything that we can do to put this in a parent class? 



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the

Review Comment:
   nit: incomplete doc?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,17 +276,36 @@ private void maybeAddNodeForOptimizationMetadata(final 
GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(null);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(final Properties props) {
+        // Vicky: Do we need to verify props?
+        final List<String> optimizationConfigs;
+        if (props == null) {
+            optimizationConfigs = new ArrayList<>();
+            optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION);
+        } else {
+            final StreamsConfig config = new StreamsConfig(props);
+            optimizationConfigs = 
config.getList(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG);
+        }
 
         mergeDuplicateSourceNodes();
-        if (optimizeTopology) {
-            LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
+        if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE)

Review Comment:
   nit: probably easier to read if we just have one top level branch for 
`contains(OPTIMIZE)`:
   ```
   if (contains(OPTIMIZE)) {
     applyOptimizations()
   }
   
   void applyOptimizations() {
     if (contains(REUSE) reuse
     if (contains(MERGE) merge...
   }
   ```



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final 
Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {

Review Comment:
   are we sure that there are at most two nodes in `parent.children()`? if so, 
we should assert that here and, instead of using a `for` loop, access the 
children by index explicitly



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final 
Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single 
source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow  that are 
siblings of the
+     * StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();
+        countSourceNodes(count, streamJoinNode, new HashSet<>());
+        if (count.get() > 1) {
+            return false;
+        }
+        if (streamJoinNode.parentNodes().size() > 1) {

Review Comment:
   should this check be `!= 1`? There should never be a case here where `<1` is 
true, right?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final 
Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);

Review Comment:
   for my understanding, is it possible to have multiple self joins in a single 
topology? I thought from offline discussions that this wasn't possible - in 
which case should we cascade and exit the recursion once we've identified a 
single self join?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final 
Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof  ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {

Review Comment:
   same comment about `GraphNode#equals` as below



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to