vpapavas commented on code in PR #12644: URL: https://github.com/apache/kafka/pull/12644#discussion_r977470895
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ########## @@ -356,12 +383,55 @@ private void mergeDuplicateSourceNodes() { } } - private void optimizeKTableSourceTopics() { + + /** + * The self-join rewriting can be applied if the StreamStreamJoinNode has a single parent. + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the + * right argument of the join (the "other"). The join node may have multiple siblings but for + * this rewriting we only care about the ThisKStreamJoinWindow and the OtherKStreamJoinWindow. + * We iterate over all the siblings to identify these two nodes so that we can remove the + * latter. + */ + @SuppressWarnings("unchecked") + private void rewriteSingleStoreSelfJoin( + final GraphNode currentNode, final Map<GraphNode, Boolean> visited) { + visited.put(currentNode, true); + if (currentNode instanceof StreamStreamJoinNode && currentNode.parentNodes().size() == 1) { + final StreamStreamJoinNode joinNode = (StreamStreamJoinNode) currentNode; + // Remove JoinOtherWindowed node + final GraphNode parent = joinNode.parentNodes().stream().findFirst().get(); + GraphNode left = null, right = null; + for (final GraphNode child: parent.children()) { + if (child instanceof WindowedStreamProcessorNode Review Comment: The current JoinNode might have other JoinNodes as siblings. We need to differentiate between the `WindowedStreamProcessorNode` nodes that belong the current node versus others. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org