vpapavas commented on code in PR #12644:
URL: https://github.com/apache/kafka/pull/12644#discussion_r977470895


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,12 +383,55 @@ private void mergeDuplicateSourceNodes() {
         }
     }
 
-    private void optimizeKTableSourceTopics() {
+
+    /**
+     * The self-join rewriting can be applied if the StreamStreamJoinNode has a single parent.
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     * right argument of the join (the "other"). The join node may have multiple siblings but for
+     * this rewriting we only care about the ThisKStreamJoinWindow and the OtherKStreamJoinWindow.
+     * We iterate over all the siblings to identify these two nodes so that we can remove the
+     * latter.
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSingleStoreSelfJoin(
+        final GraphNode currentNode, final Map<GraphNode, Boolean> visited) {
+        visited.put(currentNode, true);
+        if (currentNode instanceof StreamStreamJoinNode && currentNode.parentNodes().size() == 1) {
+            final StreamStreamJoinNode joinNode = (StreamStreamJoinNode) currentNode;
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = joinNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof WindowedStreamProcessorNode

Review Comment:
   The current JoinNode might have other JoinNodes as siblings. We need to 
differentiate between the `WindowedStreamProcessorNode` nodes that belong to the 
current node and those that belong to its sibling joins.
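
To illustrate the concern, here is a minimal, self-contained sketch. It does not use the real Kafka Streams classes: `Node`, `WindowNode`, and `JoinNode` are hypothetical stand-ins for `GraphNode`, `WindowedStreamProcessorNode`, and `StreamStreamJoinNode`, and it assumes the join node can report the names of its own two window processors. Under that assumption, a sibling is matched by name rather than by type alone, so the window nodes of a second join under the same parent are left untouched.

```java
import java.util.ArrayList;
import java.util.List;

public class SiblingJoinSketch {

    // Hypothetical stand-in for GraphNode (not the Kafka Streams class).
    static class Node {
        final String name;
        final List<Node> children = new ArrayList<>();

        Node(final String name) {
            this.name = name;
        }
    }

    // Hypothetical stand-in for WindowedStreamProcessorNode.
    static class WindowNode extends Node {
        WindowNode(final String name) {
            super(name);
        }
    }

    // Hypothetical stand-in for StreamStreamJoinNode. Assumption: the join
    // knows the names of the two window processors that belong to it.
    static class JoinNode extends Node {
        final String thisWindowName;
        final String otherWindowName;

        JoinNode(final String name, final String thisWindowName, final String otherWindowName) {
            super(name);
            this.thisWindowName = thisWindowName;
            this.otherWindowName = otherWindowName;
        }
    }

    // Finds the "other" window node that belongs to this specific join.
    // A type check alone would also match window nodes of sibling joins.
    static Node findOtherWindow(final Node parent, final JoinNode join) {
        for (final Node child : parent.children) {
            if (child instanceof WindowNode && child.name.equals(join.otherWindowName)) {
                return child;
            }
        }
        return null;
    }

    // Removes only the "other" window node of the given join; returns true if removed.
    static boolean removeOtherWindow(final Node parent, final JoinNode join) {
        final Node other = findOtherWindow(parent, join);
        return other != null && parent.children.remove(other);
    }

    public static void main(final String[] args) {
        // One parent stream feeding two self-joins, each with its own window nodes.
        final Node source = new Node("source");
        final JoinNode join1 = new JoinNode("join1", "join1-this", "join1-other");
        final JoinNode join2 = new JoinNode("join2", "join2-this", "join2-other");
        source.children.add(new WindowNode("join1-this"));
        source.children.add(new WindowNode("join1-other"));
        source.children.add(join1);
        source.children.add(new WindowNode("join2-this"));
        source.children.add(new WindowNode("join2-other"));
        source.children.add(join2);

        removeOtherWindow(source, join1);
        for (final Node child : source.children) {
            System.out.println(child.name);
        }
    }
}
```

Running `main` removes only `join1-other`; both of `join2`'s window nodes remain under the parent. How the real nodes would expose the matching names is left open here.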


