agavra commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r959892207
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ########## @@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() { } } + /** + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the + */ + @SuppressWarnings("unchecked") + private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) { + visited.add(currentNode); + if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) { + ((StreamStreamJoinNode) currentNode).setSelfJoin(); + // Remove JoinOtherWindowed node + final GraphNode parent = currentNode.parentNodes().stream().findFirst().get(); + GraphNode left = null, right = null; + for (final GraphNode child: parent.children()) { + if (child instanceof ProcessorGraphNode + && isStreamJoinWindowNode((ProcessorGraphNode) child)) { + if (left == null) { + left = child; + } else { + right = child; + } + } + } + // Sanity check + if (left != null && right != null && left.buildPriority() < right.buildPriority()) { + parent.removeChild(right); + } + } + for (final GraphNode child: currentNode.children()) { + if (!visited.contains(child)) { + rewriteSelfJoin(child, visited); + } + } + } + + /** + * The self-join rewriting can be applied if: + * 1. The path from the StreamStreamJoinNode to the root contains a single source node. + * 2. The StreamStreamJoinNode has a single parent. + * 3. There are no other nodes besides the KStreamJoinWindow that are siblings of the + * StreamStreamJoinNode and have smaller build priority. 
+ */ + private boolean isSelfJoin(final GraphNode streamJoinNode) { + final AtomicInteger count = new AtomicInteger(); + countSourceNodes(count, streamJoinNode, new HashSet<>()); + if (count.get() > 1) { + return false; + } + if (streamJoinNode.parentNodes().size() > 1) { + return false; + } + for (final GraphNode parent: streamJoinNode.parentNodes()) { + for (final GraphNode sibling : parent.children()) { + if (sibling instanceof ProcessorGraphNode) { + if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) { + continue; + } + } + if (sibling != streamJoinNode + && sibling.buildPriority() < streamJoinNode.buildPriority()) { + return false; + } + } + } + return true; + } + + private void countSourceNodes( + final AtomicInteger count, + final GraphNode currentNode, + final Set<GraphNode> visited) { + + if (currentNode instanceof StreamSourceNode) { + count.incrementAndGet(); + } + + for (final GraphNode parent: currentNode.parentNodes()) { + if (!visited.contains(parent)) { Review Comment: `GraphNode` doesn't implement `equals` and `hashCode` which makes this operation somewhat risky. 
Either we should implement those methods or explicitly use `IdentityHashMap` if we're certain that the nodes will always be the same instance ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ########## @@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() { } } + /** + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the + */ + @SuppressWarnings("unchecked") + private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) { + visited.add(currentNode); + if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) { + ((StreamStreamJoinNode) currentNode).setSelfJoin(); + // Remove JoinOtherWindowed node + final GraphNode parent = currentNode.parentNodes().stream().findFirst().get(); + GraphNode left = null, right = null; + for (final GraphNode child: parent.children()) { + if (child instanceof ProcessorGraphNode + && isStreamJoinWindowNode((ProcessorGraphNode) child)) { + if (left == null) { + left = child; + } else { + right = child; + } + } + } + // Sanity check + if (left != null && right != null && left.buildPriority() < right.buildPriority()) { + parent.removeChild(right); + } + } + for (final GraphNode child: currentNode.children()) { + if (!visited.contains(child)) { + rewriteSelfJoin(child, visited); + } + } + } + + /** + * The self-join rewriting can be applied if: + * 1. The path from the StreamStreamJoinNode to the root contains a single source node. + * 2. The StreamStreamJoinNode has a single parent. + * 3. There are no other nodes besides the KStreamJoinWindow that are siblings of the + * StreamStreamJoinNode and have smaller build priority. 
+ */ + private boolean isSelfJoin(final GraphNode streamJoinNode) { + final AtomicInteger count = new AtomicInteger(); + countSourceNodes(count, streamJoinNode, new HashSet<>()); + if (count.get() > 1) { + return false; + } + if (streamJoinNode.parentNodes().size() > 1) { + return false; + } + for (final GraphNode parent: streamJoinNode.parentNodes()) { + for (final GraphNode sibling : parent.children()) { + if (sibling instanceof ProcessorGraphNode) { + if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) { + continue; + } + } + if (sibling != streamJoinNode + && sibling.buildPriority() < streamJoinNode.buildPriority()) { + return false; + } + } + } + return true; + } + + private void countSourceNodes( Review Comment: instead of incrementing `count` can we have this method return `int`? (you can then sum up recursive call results) - generally better to avoid side-effects as it makes debugging easier alternatively, we can return a `boolean` and change this to `hasExactlyOneSourceNode`, which would return false if more than one of the recursive calls returned `true` (also allowing us to short-circuit the logic, not that the performance boost matters) ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ########## @@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() { } } + /** + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the + */ + @SuppressWarnings("unchecked") + private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) { + visited.add(currentNode); + if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) { + ((StreamStreamJoinNode) currentNode).setSelfJoin(); + // Remove JoinOtherWindowed node + final GraphNode parent = currentNode.parentNodes().stream().findFirst().get(); + GraphNode left = null, right = null; + for (final GraphNode child: parent.children()) { + if (child instanceof ProcessorGraphNode + && 
isStreamJoinWindowNode((ProcessorGraphNode) child)) { + if (left == null) { + left = child; + } else { + right = child; + } + } + } + // Sanity check + if (left != null && right != null && left.buildPriority() < right.buildPriority()) { + parent.removeChild(right); + } + } + for (final GraphNode child: currentNode.children()) { + if (!visited.contains(child)) { + rewriteSelfJoin(child, visited); + } + } + } + + /** + * The self-join rewriting can be applied if: + * 1. The path from the StreamStreamJoinNode to the root contains a single source node. + * 2. The StreamStreamJoinNode has a single parent. + * 3. There are no other nodes besides the KStreamJoinWindow that are siblings of the + * StreamStreamJoinNode and have smaller build priority. + */ + private boolean isSelfJoin(final GraphNode streamJoinNode) { + final AtomicInteger count = new AtomicInteger(); + countSourceNodes(count, streamJoinNode, new HashSet<>()); + if (count.get() > 1) { + return false; + } + if (streamJoinNode.parentNodes().size() > 1) { + return false; + } + for (final GraphNode parent: streamJoinNode.parentNodes()) { Review Comment: instead of making this a for loop, can we use `Iterables.getOnlyElement()`? 
that should make this a bit easier to reason about ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ########## @@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() { } } + /** + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the + */ + @SuppressWarnings("unchecked") + private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) { + visited.add(currentNode); + if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) { + ((StreamStreamJoinNode) currentNode).setSelfJoin(); + // Remove JoinOtherWindowed node + final GraphNode parent = currentNode.parentNodes().stream().findFirst().get(); + GraphNode left = null, right = null; + for (final GraphNode child: parent.children()) { + if (child instanceof ProcessorGraphNode + && isStreamJoinWindowNode((ProcessorGraphNode) child)) { + if (left == null) { + left = child; + } else { + right = child; + } + } + } + // Sanity check + if (left != null && right != null && left.buildPriority() < right.buildPriority()) { + parent.removeChild(right); + } + } + for (final GraphNode child: currentNode.children()) { + if (!visited.contains(child)) { + rewriteSelfJoin(child, visited); + } + } + } + + /** + * The self-join rewriting can be applied if: + * 1. The path from the StreamStreamJoinNode to the root contains a single source node. + * 2. The StreamStreamJoinNode has a single parent. + * 3. There are no other nodes besides the KStreamJoinWindow that are siblings of the + * StreamStreamJoinNode and have smaller build priority. Review Comment: this question is because I'm not familiar with the codebase, but can you explain what build priority is and why it matters in this algorithm? 
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ########## @@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() { } } + /** + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the + */ + @SuppressWarnings("unchecked") + private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) { + visited.add(currentNode); + if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) { + ((StreamStreamJoinNode) currentNode).setSelfJoin(); + // Remove JoinOtherWindowed node + final GraphNode parent = currentNode.parentNodes().stream().findFirst().get(); + GraphNode left = null, right = null; + for (final GraphNode child: parent.children()) { + if (child instanceof ProcessorGraphNode + && isStreamJoinWindowNode((ProcessorGraphNode) child)) { + if (left == null) { + left = child; + } else { + right = child; + } + } + } + // Sanity check + if (left != null && right != null && left.buildPriority() < right.buildPriority()) { + parent.removeChild(right); + } + } + for (final GraphNode child: currentNode.children()) { + if (!visited.contains(child)) { + rewriteSelfJoin(child, visited); + } + } + } + + /** + * The self-join rewriting can be applied if: + * 1. The path from the StreamStreamJoinNode to the root contains a single source node. + * 2. The StreamStreamJoinNode has a single parent. + * 3. There are no other nodes besides the KStreamJoinWindow that are siblings of the + * StreamStreamJoinNode and have smaller build priority. 
+ */ + private boolean isSelfJoin(final GraphNode streamJoinNode) { + final AtomicInteger count = new AtomicInteger(); Review Comment: nit: rename this to numSourceNodes for readability (or if you take my suggestion below, just inline the call in the `if` branch) ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> { + private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class); + + private final String windowName; + private final long joinBeforeMs; + private final long joinAfterMs; + private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis; + private final ValueJoinerWithKey<? super K, ? super V2, ? super V1, ? extends VOut> joinerOther; + + private final TimeTracker sharedTimeTracker; + + KStreamKStreamSelfJoin( + final String windowName, + final JoinWindowsInternal windows, + final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis, + final ValueJoinerWithKey<? super K, ? super V2, ? super V1, ? 
extends VOut> joinerOther, + final TimeTracker sharedTimeTracker) { + + this.windowName = windowName; + this.joinBeforeMs = windows.beforeMs; + this.joinAfterMs = windows.afterMs; + this.joinerThis = joinerThis; + this.joinerOther = joinerOther; + this.sharedTimeTracker = sharedTimeTracker; + } + + @Override + public Processor<K, V1, K, VOut> get() { + return new KStreamKStreamSelfJoinProcessor(); + } + + private class KStreamKStreamSelfJoinProcessor extends ContextualProcessor<K, V1, K, VOut> { + private WindowStore<K, V2> windowStore; + private Sensor droppedRecordsSensor; + + @Override + public void init(final ProcessorContext<K, VOut> context) { + super.init(context); + + final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); + droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); + windowStore = context.getStateStore(windowName); + } + + @SuppressWarnings("unchecked") + @Override + public void process(final Record<K, V1> record) { + // Copied from inner join: Review Comment: let's try to avoid duplicated code - without having looked at the code in inner join, is there anything that we can do to put this in a parent class? ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ########## @@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() { } } + /** + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the Review Comment: nit: incomplete doc? 
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ########## @@ -270,17 +276,36 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) { // use this method for testing only public void buildAndOptimizeTopology() { - buildAndOptimizeTopology(false); + buildAndOptimizeTopology(null); } - public void buildAndOptimizeTopology(final boolean optimizeTopology) { + public void buildAndOptimizeTopology(final Properties props) { + // Vicky: Do we need to verify props? + final List<String> optimizationConfigs; + if (props == null) { + optimizationConfigs = new ArrayList<>(); + optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION); + } else { + final StreamsConfig config = new StreamsConfig(props); + optimizationConfigs = config.getList(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG); + } mergeDuplicateSourceNodes(); - if (optimizeTopology) { - LOG.debug("Optimizing the Kafka Streams graph for repartition nodes"); + if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE) Review Comment: nit: probably easier to read if we just have one top level branch for `contains(OPTIMIZE)`: ``` if (contains(OPTIMIZE)) { applyOptimizations() } void applyOptimizations() { if (contains(REUSE) reuse if (contains(MERGE) merge... 
} ``` ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ########## @@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() { } } + /** + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the + */ + @SuppressWarnings("unchecked") + private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) { + visited.add(currentNode); + if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) { + ((StreamStreamJoinNode) currentNode).setSelfJoin(); + // Remove JoinOtherWindowed node + final GraphNode parent = currentNode.parentNodes().stream().findFirst().get(); + GraphNode left = null, right = null; + for (final GraphNode child: parent.children()) { Review Comment: are we sure that there are at most two nodes in `parent.children()`? if so we should assert that here and not use a `for` loop but instead get indexes explicitly ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ########## @@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() { } } + /** + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the + */ + @SuppressWarnings("unchecked") + private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) { + visited.add(currentNode); + if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) { + ((StreamStreamJoinNode) currentNode).setSelfJoin(); + // Remove JoinOtherWindowed node + final GraphNode parent = currentNode.parentNodes().stream().findFirst().get(); + GraphNode left = null, right = null; + for (final GraphNode child: parent.children()) { + if (child instanceof ProcessorGraphNode + && isStreamJoinWindowNode((ProcessorGraphNode) child)) { + if (left == null) { + left = child; + } else { + right = child; + } + } + } + // Sanity check + if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) { + parent.removeChild(right); + } + } + for (final GraphNode child: currentNode.children()) { + if (!visited.contains(child)) { + rewriteSelfJoin(child, visited); + } + } + } + + /** + * The self-join rewriting can be applied if: + * 1. The path from the StreamStreamJoinNode to the root contains a single source node. + * 2. The StreamStreamJoinNode has a single parent. + * 3. There are no other nodes besides the KStreamJoinWindow that are siblings of the + * StreamStreamJoinNode and have smaller build priority. + */ + private boolean isSelfJoin(final GraphNode streamJoinNode) { + final AtomicInteger count = new AtomicInteger(); + countSourceNodes(count, streamJoinNode, new HashSet<>()); + if (count.get() > 1) { + return false; + } + if (streamJoinNode.parentNodes().size() > 1) { Review Comment: should this check be `!= 1`? There should never be a case here where `<1` is true, right? ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ########## @@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() { } } + /** + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the + */ + @SuppressWarnings("unchecked") + private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) { + visited.add(currentNode); + if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) { + ((StreamStreamJoinNode) currentNode).setSelfJoin(); + // Remove JoinOtherWindowed node + final GraphNode parent = currentNode.parentNodes().stream().findFirst().get(); + GraphNode left = null, right = null; + for (final GraphNode child: parent.children()) { + if (child instanceof ProcessorGraphNode + && isStreamJoinWindowNode((ProcessorGraphNode) child)) { + if (left == null) { + left = child; + } else { + right = child; + } + } + } + // Sanity check + if (left != null && right != null && left.buildPriority() < right.buildPriority()) { + 
parent.removeChild(right); + } + } + for (final GraphNode child: currentNode.children()) { + if (!visited.contains(child)) { + rewriteSelfJoin(child, visited); Review Comment: for my understanding, is it possible to have multiple self joins in a single topology? I thought from offline discussions that this wasn't possible - in which case should we cascade and exit the recursion once we've identified a single self join? ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ########## @@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() { } } + /** + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the + */ + @SuppressWarnings("unchecked") + private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) { + visited.add(currentNode); + if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) { + ((StreamStreamJoinNode) currentNode).setSelfJoin(); + // Remove JoinOtherWindowed node + final GraphNode parent = currentNode.parentNodes().stream().findFirst().get(); + GraphNode left = null, right = null; + for (final GraphNode child: parent.children()) { + if (child instanceof ProcessorGraphNode + && isStreamJoinWindowNode((ProcessorGraphNode) child)) { + if (left == null) { + left = child; + } else { + right = child; + } + } + } + // Sanity check + if (left != null && right != null && left.buildPriority() < right.buildPriority()) { + parent.removeChild(right); + } + } + for (final GraphNode child: currentNode.children()) { + if (!visited.contains(child)) { Review Comment: same comment about `GraphNode#equals` as below -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org