[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-14 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r970800187


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin implements ProcessorSupplier {
+private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+private final String windowName;
+private final long joinThisBeforeMs;
+private final long joinThisAfterMs;
+private final long joinOtherBeforeMs;
+private final long joinOtherAfterMs;
+private final ValueJoinerWithKey joinerThis;
+
+private final TimeTracker sharedTimeTracker;
+
+KStreamKStreamSelfJoin(
+final String windowName,
+final JoinWindowsInternal windows,
+final ValueJoinerWithKey joinerThis,
+final TimeTracker sharedTimeTracker) {
+
+this.windowName = windowName;
+this.joinThisBeforeMs = windows.beforeMs;
+this.joinThisAfterMs = windows.afterMs;
+this.joinOtherBeforeMs = windows.afterMs;
+this.joinOtherAfterMs = windows.beforeMs;
+this.joinerThis = joinerThis;
+this.sharedTimeTracker = sharedTimeTracker;
+}
+
+@Override
+public Processor get() {
+return new KStreamKStreamSelfJoinProcessor();
+}
+
+private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor {
+private WindowStore windowStore;
+private Sensor droppedRecordsSensor;
+
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+
+final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
+droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
+windowStore = context.getStateStore(windowName);
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void process(final Record record) {
+if (StreamStreamJoinUtil.skipRecord(record, LOG, 
droppedRecordsSensor, context())) {
+return;
+}
+
+final long inputRecordTimestamp = record.timestamp();
+long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinThisBeforeMs);

Review Comment:
   They are not the same: the first uses the `joinThisBeforeMs` and the second 
uses the `joinOtherBeforeMs`. This is needed as the inner join uses different 
intervals when fetching rows from the window store based on whether it is the 
left or right-hand side. Since we want the self-join to match the output of the 
inner-join, I followed the same logic.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-14 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r970805661


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin implements ProcessorSupplier {
+private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+private final String windowName;
+private final long joinThisBeforeMs;
+private final long joinThisAfterMs;
+private final long joinOtherBeforeMs;
+private final long joinOtherAfterMs;
+private final ValueJoinerWithKey joinerThis;
+
+private final TimeTracker sharedTimeTracker;
+
+KStreamKStreamSelfJoin(
+final String windowName,
+final JoinWindowsInternal windows,
+final ValueJoinerWithKey joinerThis,
+final TimeTracker sharedTimeTracker) {
+
+this.windowName = windowName;
+this.joinThisBeforeMs = windows.beforeMs;
+this.joinThisAfterMs = windows.afterMs;
+this.joinOtherBeforeMs = windows.afterMs;
+this.joinOtherAfterMs = windows.beforeMs;
+this.joinerThis = joinerThis;
+this.sharedTimeTracker = sharedTimeTracker;
+}
+
+@Override
+public Processor get() {
+return new KStreamKStreamSelfJoinProcessor();
+}
+
+private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor {
+private WindowStore windowStore;
+private Sensor droppedRecordsSensor;
+
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+
+final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
+droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
+windowStore = context.getStateStore(windowName);
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void process(final Record record) {
+if (StreamStreamJoinUtil.skipRecord(record, LOG, 
droppedRecordsSensor, context())) {
+return;
+}
+
+final long inputRecordTimestamp = record.timestamp();
+long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinThisBeforeMs);
+long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs);
+boolean emittedJoinWithSelf = false;
+final Record selfRecord = record
+.withValue(joinerThis.apply(record.key(), record.value(), (V2) 
record.value()))
+.withTimestamp(inputRecordTimestamp);
+sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+
+// Join current record with other
+try (final WindowStoreIterator iter = windowStore.fetch(
+record.key(), timeFrom, timeTo)) {
+while (iter.hasNext()) {
+final KeyValue otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
+
+// Join this with other
+context().forward(
+

[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-14 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r970800187


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin implements ProcessorSupplier {
+private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+private final String windowName;
+private final long joinThisBeforeMs;
+private final long joinThisAfterMs;
+private final long joinOtherBeforeMs;
+private final long joinOtherAfterMs;
+private final ValueJoinerWithKey joinerThis;
+
+private final TimeTracker sharedTimeTracker;
+
+KStreamKStreamSelfJoin(
+final String windowName,
+final JoinWindowsInternal windows,
+final ValueJoinerWithKey joinerThis,
+final TimeTracker sharedTimeTracker) {
+
+this.windowName = windowName;
+this.joinThisBeforeMs = windows.beforeMs;
+this.joinThisAfterMs = windows.afterMs;
+this.joinOtherBeforeMs = windows.afterMs;
+this.joinOtherAfterMs = windows.beforeMs;
+this.joinerThis = joinerThis;
+this.sharedTimeTracker = sharedTimeTracker;
+}
+
+@Override
+public Processor get() {
+return new KStreamKStreamSelfJoinProcessor();
+}
+
+private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor {
+private WindowStore windowStore;
+private Sensor droppedRecordsSensor;
+
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+
+final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
+droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
+windowStore = context.getStateStore(windowName);
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void process(final Record record) {
+if (StreamStreamJoinUtil.skipRecord(record, LOG, 
droppedRecordsSensor, context())) {
+return;
+}
+
+final long inputRecordTimestamp = record.timestamp();
+long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinThisBeforeMs);

Review Comment:
   They are not the same: the first uses the `joinThisBeforeMs` and the second 
uses the `joinOtherBeforeMs`. This is needed as the inner join (for a reason I 
have not understood cc @vvcephei ) uses different intervals when fetching rows 
from the window store based on whether it is the left or right-hand side. Since 
we want the self-join to match the output of the inner-join, I followed the 
same logic.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-14 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r970792491


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin implements ProcessorSupplier {
+private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+private final String windowName;
+private final long joinThisBeforeMs;
+private final long joinThisAfterMs;
+private final long joinOtherBeforeMs;
+private final long joinOtherAfterMs;
+private final ValueJoinerWithKey joinerThis;
+
+private final TimeTracker sharedTimeTracker;
+
+KStreamKStreamSelfJoin(
+final String windowName,
+final JoinWindowsInternal windows,
+final ValueJoinerWithKey joinerThis,
+final TimeTracker sharedTimeTracker) {
+
+this.windowName = windowName;
+this.joinThisBeforeMs = windows.beforeMs;
+this.joinThisAfterMs = windows.afterMs;
+this.joinOtherBeforeMs = windows.afterMs;
+this.joinOtherAfterMs = windows.beforeMs;
+this.joinerThis = joinerThis;
+this.sharedTimeTracker = sharedTimeTracker;
+}
+
+@Override
+public Processor get() {
+return new KStreamKStreamSelfJoinProcessor();
+}
+
+private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor {
+private WindowStore windowStore;
+private Sensor droppedRecordsSensor;
+
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+
+final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
+droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
+windowStore = context.getStateStore(windowName);
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void process(final Record record) {
+if (StreamStreamJoinUtil.skipRecord(record, LOG, 
droppedRecordsSensor, context())) {
+return;
+}
+
+final long inputRecordTimestamp = record.timestamp();
+long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinThisBeforeMs);
+long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs);
+boolean emittedJoinWithSelf = false;
+final Record selfRecord = record
+.withValue(joinerThis.apply(record.key(), record.value(), (V2) 
record.value()))
+.withTimestamp(inputRecordTimestamp);
+sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+
+// Join current record with other
+try (final WindowStoreIterator iter = windowStore.fetch(
+record.key(), timeFrom, timeTo)) {
+while (iter.hasNext()) {
+final KeyValue otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
+
+// Join this with other
+context().forward(
+

[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-13 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969358545


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -1644,6 +1668,29 @@ private Map getClientCustomProps() {
 return props;
 }
 
+public static List verifyTopologyOptimizationConfigs(final String 
config) {
+final List acceptableConfigs = Arrays.asList(

Review Comment:
   Yes, I have been conflicted about this. I wanted to add it as a static 
variable as well but I am worried if someone else adds a new rewriting, they 
might miss it. Whereas in the method, they will for sure see it. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-13 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969520183


##
streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java:
##
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import static java.time.Duration.ofMinutes;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class SelfJoinUpgradeIntegrationTest {
+public static final String INPUT_TOPIC = "selfjoin-input";
+public static final String OUTPUT_TOPIC = "selfjoin-output";
+private String inputTopic;
+private String outputTopic;
+
+private KafkaStreams kafkaStreams;
+
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Rule
+public TestName testName = new TestName();
+
+@Before
+public void createTopics() throws Exception {
+inputTopic = INPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+outputTopic = OUTPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+CLUSTER.createTopic(inputTopic);
+CLUSTER.createTopic(outputTopic);
+}
+
+private Properties props() {
+final Properties streamsConfiguration = new Properties();
+final String safeTestName = safeUniqueTestName(getClass(), testName);
+streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
+streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
+streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
+ Serdes.String().getClass());
+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
+ Serdes.String().getClass());
+streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
1000L);
+streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+return streamsConfiguration;
+}
+
+@After
+public void shutdown() {
+if (kafkaStreams != null) {
+kafkaStreams.close(Duration.ofSeconds(30L));
+

[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-13 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969484570


##
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##
@@ -1263,6 +1263,37 @@ public void testInvalidSecurityProtocol() {
 
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
 }
 
+@Test
+public void shouldThrowExceptionWhenTopologyOptimizationOnAndOff() {

Review Comment:
   I am not sure I follow. What do you mean?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-13 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969391230


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -663,6 +663,12 @@ public class StreamsConfig extends AbstractConfig {
 public static final String TOPOLOGY_OPTIMIZATION_CONFIG = 
"topology.optimization";
 private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration 
telling Kafka Streams if it should optimize the topology, disabled by default";
 
+public static final String SELF_JOIN_OPTIMIZATION_CONFIG = 
"self.join.optimization";

Review Comment:
   I  agree with the idea to keep them organized and I added the namespace but 
the configs seem quite long now, not sure if it's better. 
`topology.optimization.reuse.ktable.source.topics` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-13 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969371482


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java:
##
@@ -181,13 +181,22 @@ public  KStream join(final 
KStream lhs,
 sharedTimeTracker
 );
 
+final KStreamKStreamSelfJoin selfJoin = new 
KStreamKStreamSelfJoin<>(
+thisWindowStore.name(),
+internalWindows,
+joiner,
+AbstractStream.reverseJoinerWithKey(joiner),
+sharedTimeTracker
+);

Review Comment:
   Yes, unfortunately, this is done before the logical plan is created and 
optimized and this is the only place where I can get the information needed to 
create this node. I tried to do this when building the topology after the 
rewriting but it was impossible without major refactoring as currently,  there 
is not a clear separation between logical and physical plan. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-13 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969371482


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java:
##
@@ -181,13 +181,22 @@ public  KStream join(final 
KStream lhs,
 sharedTimeTracker
 );
 
+final KStreamKStreamSelfJoin selfJoin = new 
KStreamKStreamSelfJoin<>(
+thisWindowStore.name(),
+internalWindows,
+joiner,
+AbstractStream.reverseJoinerWithKey(joiner),
+sharedTimeTracker
+);

Review Comment:
   Yes, unfortunately this is the only place where I can get the information 
needed to create this node. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-13 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969358545


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -1644,6 +1668,29 @@ private Map getClientCustomProps() {
 return props;
 }
 
+public static List verifyTopologyOptimizationConfigs(final String 
config) {
+final List acceptableConfigs = Arrays.asList(

Review Comment:
   Yes, I have been conflicted about this. I wanted to add it as a static 
variable as well but I am worried if someone else adds a new rewriting, they 
might miss it. Whereas in the method, they will for sure see it. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-13 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969353780


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin implements ProcessorSupplier {
+private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+private final String windowName;
+private final long joinThisBeforeMs;
+private final long joinThisAfterMs;
+private final long joinOtherBeforeMs;
+private final long joinOtherAfterMs;
+private final ValueJoinerWithKey joinerThis;
+private final ValueJoinerWithKey joinerOther;
+
+private final TimeTracker sharedTimeTracker;
+
+KStreamKStreamSelfJoin(
+final String windowName,
+final JoinWindowsInternal windows,
+final ValueJoinerWithKey joinerThis,
+final ValueJoinerWithKey joinerOther,
+final TimeTracker sharedTimeTracker) {
+
+this.windowName = windowName;
+this.joinThisBeforeMs = windows.beforeMs;
+this.joinThisAfterMs = windows.afterMs;
+this.joinOtherBeforeMs = windows.afterMs;
+this.joinOtherAfterMs = windows.beforeMs;
+this.joinerThis = joinerThis;
+this.joinerOther = joinerOther;
+this.sharedTimeTracker = sharedTimeTracker;
+}
+
+@Override
+public Processor get() {
+return new KStreamKStreamSelfJoinProcessor();
+}
+
+private class KStreamKStreamSelfJoinProcessor extends 
StreamStreamJoinProcessor {
+private WindowStore windowStore;
+private Sensor droppedRecordsSensor;
+
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+
+final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
+droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
+windowStore = context.getStateStore(windowName);
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void process(final Record record) {
+System.out.println("---> Processing record: " + record);
+if (skipRecord(record, LOG, droppedRecordsSensor)) {
+return;
+}
+
+final long inputRecordTimestamp = record.timestamp();
+long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinThisBeforeMs);
+long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs);
+boolean emittedJoinWithSelf = false;
+
+sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+System.out.println("> Window store fetch, timeFrom=" + 
timeFrom + " timeTo=" + timeTo);
+
+// Join current record with other
+System.out.println("> Window store fetch, timeFrom=" + 
timeFrom + " timeTo=" + timeTo);

Review Comment:
   These will all be removed once I am done with the reviews



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: 

[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-13 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969353364


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -305,6 +307,36 @@ public void buildAndOptimizeTopology(final boolean 
optimizeTopology) {
 internalTopologyBuilder.validateCopartition();
 }
 
+/**
+ * A user can provide either the config OPTIMIZE which means all 
optimizations rules will be
+ * applied or they can provide a list of optimization rules.
+ */
+private void optimizeTopology(final Properties props) {
+final List optimizationConfigs;
+if (props == null || 
!props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) {
+optimizationConfigs = new ArrayList<>();
+optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION);
+} else {
+optimizationConfigs = 
StreamsConfig.verifyTopologyOptimizationConfigs(
+(String) 
props.get(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG));
+}
+if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE)
+|| 
optimizationConfigs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS)) {
+LOG.debug("Optimizing the Kafka Streams graph for ktable source 
nodes");
+optimizeKTableSourceTopics();
+}
+if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE)
+|| 
optimizationConfigs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS)) {
+LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
+maybeOptimizeRepartitionOperations();
+}
+if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE)
+|| optimizationConfigs.contains(StreamsConfig.SELF_JOIN)) {
+LOG.debug("Optimizing the Kafka Streams graph for self-joins");
+rewriteSelfJoin(root, new IdentityHashMap<>());

Review Comment:
   You mean the order in which we check and apply the rewritings? I followed 
the same order as before, as in I added the self-join rewriting last. 
Technically, it shouldn't matter for the self-join at least.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-13 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969351766


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -270,17 +277,12 @@ private void maybeAddNodeForOptimizationMetadata(final 
GraphNode node) {
 
 // use this method for testing only
 public void buildAndOptimizeTopology() {
-buildAndOptimizeTopology(false);
+buildAndOptimizeTopology(null);
 }
 
-public void buildAndOptimizeTopology(final boolean optimizeTopology) {

Review Comment:
   No, it's in the `internals` package so it's not public API. Right @vvcephei ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-13 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969349691


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
 }
 }
 
+/**
+ * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+ */
+@SuppressWarnings("unchecked")
+private void rewriteSelfJoin(final GraphNode currentNode, final 
Set visited) {
+visited.add(currentNode);
+if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+((StreamStreamJoinNode) currentNode).setSelfJoin();
+// Remove JoinOtherWindowed node
+final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+GraphNode left = null, right = null;
+for (final GraphNode child: parent.children()) {

Review Comment:
   @vvcephei , yes that's what the code is checking. I updated the comment and 
added an extra test where I have two joins under the same source. I updated 
also the code to make the checks more fail-proof. 
   
   @jnh5y , there can be at most two `isStreamJoinWindowNode` nodes _per_ join. 
 There will be one `isStreamJoinWindowNode` if we have N-way joins. In this 
case, only the first one will be optimized as a self-join.
   
   All the tests are in `InternalStreamsBuilderTest`. I tried to create several 
topologies of joins but I am sure there are more I haven't thought of. If you 
feel there are any missing, please let me know and I will add them. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-05 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r962965501


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +362,95 @@ private void mergeDuplicateSourceNodes() {
 }
 }
 
+/**
+ * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+ */
+@SuppressWarnings("unchecked")
+private void rewriteSelfJoin(final GraphNode currentNode, final 
Set visited) {
+visited.add(currentNode);
+if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+((StreamStreamJoinNode) currentNode).setSelfJoin();
+// Remove JoinOtherWindowed node
+final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+GraphNode left = null, right = null;
+for (final GraphNode child: parent.children()) {
+if (child instanceof  ProcessorGraphNode
+&& isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+if (left == null) {
+left = child;
+} else {
+right = child;
+}
+}
+}
+// Sanity check
+if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+parent.removeChild(right);
+}
+}
+for (final GraphNode child: currentNode.children()) {
+if (!visited.contains(child)) {
+rewriteSelfJoin(child, visited);
+}
+}
+}
+
+/**
+ * The self-join rewriting can be applied if:
+ * 1. The path from the StreamStreamJoinNode to the root contains a single 
source node.
+ * 2. The StreamStreamJoinNode has a single parent.
+ * 3. There are no other nodes besides the KStreamJoinWindow  that are 
siblings of the
+ * StreamStreamJoinNode and have smaller build priority.
+ */
+private boolean isSelfJoin(final GraphNode streamJoinNode) {
+final AtomicInteger count = new AtomicInteger();
+countSourceNodes(count, streamJoinNode, new HashSet<>());
+if (count.get() > 1) {
+return false;
+}
+if (streamJoinNode.parentNodes().size() > 1) {
+return false;
+}
+for (final GraphNode parent: streamJoinNode.parentNodes()) {
+for (final GraphNode sibling : parent.children()) {
+if (sibling instanceof ProcessorGraphNode) {
+if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
+continue;
+}
+}
+if (sibling != streamJoinNode
+&& sibling.buildPriority() < 
streamJoinNode.buildPriority()) {
+return false;
+}
+}
+}
+return true;
+}
+
+private void countSourceNodes(
+final AtomicInteger count,
+final GraphNode currentNode,
+final Set visited) {
+
+if (currentNode instanceof StreamSourceNode) {
+count.incrementAndGet();
+}
+
+for (final GraphNode parent: currentNode.parentNodes()) {
+if (!visited.contains(parent)) {
+visited.add(parent);
+countSourceNodes(count, parent, visited);
+}
+}
+}
+
+private boolean isStreamJoinWindowNode(final ProcessorGraphNode node) {
+return node.processorParameters() != null

Review Comment:
   We can apply the self-join optimization in the above example



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-05 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r962781264


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
 }
 }
 
+/**
+ * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+ */
+@SuppressWarnings("unchecked")
+private void rewriteSelfJoin(final GraphNode currentNode, final 
Set visited) {
+visited.add(currentNode);
+if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+((StreamStreamJoinNode) currentNode).setSelfJoin();
+// Remove JoinOtherWindowed node
+final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+GraphNode left = null, right = null;
+for (final GraphNode child: parent.children()) {
+if (child instanceof  ProcessorGraphNode
+&& isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+if (left == null) {
+left = child;
+} else {
+right = child;
+}
+}
+}
+// Sanity check
+if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+parent.removeChild(right);
+}
+}
+for (final GraphNode child: currentNode.children()) {
+if (!visited.contains(child)) {
+rewriteSelfJoin(child, visited);
+}
+}
+}
+
+/**
+ * The self-join rewriting can be applied if:
+ * 1. The path from the StreamStreamJoinNode to the root contains a single 
source node.
+ * 2. The StreamStreamJoinNode has a single parent.
+ * 3. There are no other nodes besides the KStreamJoinWindow  that are 
siblings of the
+ * StreamStreamJoinNode and have smaller build priority.
+ */
+private boolean isSelfJoin(final GraphNode streamJoinNode) {
+final AtomicInteger count = new AtomicInteger();
+countSourceNodes(count, streamJoinNode, new HashSet<>());
+if (count.get() > 1) {
+return false;
+}
+if (streamJoinNode.parentNodes().size() > 1) {
+return false;
+}
+for (final GraphNode parent: streamJoinNode.parentNodes()) {
+for (final GraphNode sibling : parent.children()) {
+if (sibling instanceof ProcessorGraphNode) {
+if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
+continue;
+}
+}
+if (sibling != streamJoinNode
+&& sibling.buildPriority() < 
streamJoinNode.buildPriority()) {
+return false;
+}
+}
+}
+return true;
+}
+
+private void countSourceNodes(
+final AtomicInteger count,
+final GraphNode currentNode,
+final Set visited) {
+
+if (currentNode instanceof StreamSourceNode) {
+count.incrementAndGet();
+}
+
+for (final GraphNode parent: currentNode.parentNodes()) {
+if (!visited.contains(parent)) {

Review Comment:
   Actually, tests fail with the equals so I will use the IdentifyHashMap since 
it seems the assumption is that `GraphNodes` are compared by reference. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r961032133


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final 
GraphNode node) {
 
 // use this method for testing only
 public void buildAndOptimizeTopology() {
-buildAndOptimizeTopology(false);
+buildAndOptimizeTopology(false, false);
 }
 
-public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+public void buildAndOptimizeTopology(
+final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
 mergeDuplicateSourceNodes();
 if (optimizeTopology) {
 LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
 optimizeKTableSourceTopics();
 maybeOptimizeRepartitionOperations();
+if (optimizeSelfJoin) {

Review Comment:
   Discussed offline with @guozhangwang .
   
   If the `map` or other transformation is a sibling of the join node but the 
join node has a single parent, then this means this transformation is a no-op 
and hence the optimization can be applied. For the transformation to not be a 
no-op, the join node must have two or more parents, hence the optimization 
won't be applicable. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r961012010


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final 
GraphNode node) {
 
 // use this method for testing only
 public void buildAndOptimizeTopology() {
-buildAndOptimizeTopology(false);
+buildAndOptimizeTopology(false, false);
 }
 
-public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+public void buildAndOptimizeTopology(
+final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
 mergeDuplicateSourceNodes();
 if (optimizeTopology) {
 LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
 optimizeKTableSourceTopics();
 maybeOptimizeRepartitionOperations();
+if (optimizeSelfJoin) {

Review Comment:
   The logical plan will have only one source since they will get merged as 
they are referring to the same topic. Hence, all the nodes will be children of 
the source node and siblings of the join node. 
   For example,
   Case 2:
   ```
   root -> source
   source -> join-window1, join-window2, stream-join
   ```
   
   and
   Case 3:
   ```
   root -> source
   source -> join-window1, map-values1, join-window2, stream-join
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960956896


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
 }
 }
 
+/**
+ * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+ */
+@SuppressWarnings("unchecked")
+private void rewriteSelfJoin(final GraphNode currentNode, final 
Set visited) {
+visited.add(currentNode);
+if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+((StreamStreamJoinNode) currentNode).setSelfJoin();
+// Remove JoinOtherWindowed node
+final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+GraphNode left = null, right = null;
+for (final GraphNode child: parent.children()) {
+if (child instanceof  ProcessorGraphNode
+&& isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+if (left == null) {
+left = child;
+} else {
+right = child;
+}
+}
+}
+// Sanity check
+if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+parent.removeChild(right);
+}
+}
+for (final GraphNode child: currentNode.children()) {
+if (!visited.contains(child)) {
+rewriteSelfJoin(child, visited);
+}
+}
+}
+
+/**
+ * The self-join rewriting can be applied if:
+ * 1. The path from the StreamStreamJoinNode to the root contains a single 
source node.
+ * 2. The StreamStreamJoinNode has a single parent.
+ * 3. There are no other nodes besides the KStreamJoinWindow  that are 
siblings of the
+ * StreamStreamJoinNode and have smaller build priority.
+ */
+private boolean isSelfJoin(final GraphNode streamJoinNode) {
+final AtomicInteger count = new AtomicInteger();
+countSourceNodes(count, streamJoinNode, new HashSet<>());
+if (count.get() > 1) {
+return false;
+}
+if (streamJoinNode.parentNodes().size() > 1) {
+return false;
+}
+for (final GraphNode parent: streamJoinNode.parentNodes()) {
+for (final GraphNode sibling : parent.children()) {
+if (sibling instanceof ProcessorGraphNode) {
+if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
+continue;
+}
+}
+if (sibling != streamJoinNode
+&& sibling.buildPriority() < 
streamJoinNode.buildPriority()) {
+return false;
+}
+}
+}
+return true;
+}
+
+private void countSourceNodes(
+final AtomicInteger count,
+final GraphNode currentNode,
+final Set visited) {
+
+if (currentNode instanceof StreamSourceNode) {
+count.incrementAndGet();
+}
+
+for (final GraphNode parent: currentNode.parentNodes()) {
+if (!visited.contains(parent)) {

Review Comment:
   Great catch, thank you! I am not sure about the latter so I implemented the 
equals and hashcode



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960845326


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
 }
 }
 
+/**
+ * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+ */
+@SuppressWarnings("unchecked")
+private void rewriteSelfJoin(final GraphNode currentNode, final 
Set visited) {
+visited.add(currentNode);
+if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+((StreamStreamJoinNode) currentNode).setSelfJoin();
+// Remove JoinOtherWindowed node
+final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+GraphNode left = null, right = null;
+for (final GraphNode child: parent.children()) {
+if (child instanceof  ProcessorGraphNode
+&& isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+if (left == null) {
+left = child;
+} else {
+right = child;
+}
+}
+}
+// Sanity check
+if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+parent.removeChild(right);
+}
+}
+for (final GraphNode child: currentNode.children()) {
+if (!visited.contains(child)) {
+rewriteSelfJoin(child, visited);

Review Comment:
   We can have n-way self-joins but they won't get optimized, only the first 
one will get optimized. However, we can have topologies that have more than one 
self-joins, not n-way but independent pairs. And all of them can and should be 
optimized. I added a test case in 
`InternalStreamsBuilderTest:shouldMarkAllStreamStreamJoinsAsSelfJoin`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960845326


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
 }
 }
 
+/**
+ * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+ */
+@SuppressWarnings("unchecked")
+private void rewriteSelfJoin(final GraphNode currentNode, final 
Set visited) {
+visited.add(currentNode);
+if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+((StreamStreamJoinNode) currentNode).setSelfJoin();
+// Remove JoinOtherWindowed node
+final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+GraphNode left = null, right = null;
+for (final GraphNode child: parent.children()) {
+if (child instanceof  ProcessorGraphNode
+&& isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+if (left == null) {
+left = child;
+} else {
+right = child;
+}
+}
+}
+// Sanity check
+if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+parent.removeChild(right);
+}
+}
+for (final GraphNode child: currentNode.children()) {
+if (!visited.contains(child)) {
+rewriteSelfJoin(child, visited);

Review Comment:
   We can have multiple joins but they won't get optimized, only the first one 
will get optimized. However, we can have topologies that have more than one 
self-joins, not n-way but independent pairs. And all of them can and should be 
optimized. I added a test case in 
`InternalStreamsBuilderTest:shouldMarkAllStreamStreamJoinsAsSelfJoin`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960708055


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
 }
 }
 
+/**
+ * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+ */
+@SuppressWarnings("unchecked")
+private void rewriteSelfJoin(final GraphNode currentNode, final 
Set visited) {
+visited.add(currentNode);
+if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+((StreamStreamJoinNode) currentNode).setSelfJoin();
+// Remove JoinOtherWindowed node
+final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+GraphNode left = null, right = null;
+for (final GraphNode child: parent.children()) {
+if (child instanceof  ProcessorGraphNode
+&& isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+if (left == null) {
+left = child;
+} else {
+right = child;
+}
+}
+}
+// Sanity check
+if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+parent.removeChild(right);
+}
+}
+for (final GraphNode child: currentNode.children()) {
+if (!visited.contains(child)) {
+rewriteSelfJoin(child, visited);
+}
+}
+}
+
+/**
+ * The self-join rewriting can be applied if:
+ * 1. The path from the StreamStreamJoinNode to the root contains a single 
source node.
+ * 2. The StreamStreamJoinNode has a single parent.
+ * 3. There are no other nodes besides the KStreamJoinWindow  that are 
siblings of the
+ * StreamStreamJoinNode and have smaller build priority.

Review Comment:
   This is the order in which the nodes in the graph are visited and added to 
the topology. It matters here because the join node can have siblings with 
smaller build priority, hence they will be before it in the topological order 
of the topology (applied before the join). The only acceptable nodes are the 
JoinWindow nodes. If there are others, then it is not a self-join. 
   
   This check covers cases like:
   ```
   stream1 = builder.stream("topic1");
   streams1.mapValues(v -> v);
   stream2 = builder.stream("topic1"); // same topic
   stream1.join(stream2)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960654203


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
 }
 }
 
+/**
+ * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+ */
+@SuppressWarnings("unchecked")
+private void rewriteSelfJoin(final GraphNode currentNode, final 
Set visited) {
+visited.add(currentNode);
+if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+((StreamStreamJoinNode) currentNode).setSelfJoin();
+// Remove JoinOtherWindowed node
+final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+GraphNode left = null, right = null;
+for (final GraphNode child: parent.children()) {

Review Comment:
   No, there can be more than two nodes. That's why I am doing a for loop and I 
am checking to find the nodes that I need to remove with 
`isStreamJoinWindowNode` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960647123


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
 }
 }
 
+/**
+ * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+ */
+@SuppressWarnings("unchecked")
+private void rewriteSelfJoin(final GraphNode currentNode, final 
Set visited) {
+visited.add(currentNode);
+if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+((StreamStreamJoinNode) currentNode).setSelfJoin();
+// Remove JoinOtherWindowed node
+final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+GraphNode left = null, right = null;
+for (final GraphNode child: parent.children()) {
+if (child instanceof  ProcessorGraphNode
+&& isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+if (left == null) {
+left = child;
+} else {
+right = child;
+}
+}
+}
+// Sanity check
+if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+parent.removeChild(right);
+}
+}
+for (final GraphNode child: currentNode.children()) {
+if (!visited.contains(child)) {
+rewriteSelfJoin(child, visited);
+}
+}
+}
+
+/**
+ * The self-join rewriting can be applied if:
+ * 1. The path from the StreamStreamJoinNode to the root contains a single 
source node.
+ * 2. The StreamStreamJoinNode has a single parent.
+ * 3. There are no other nodes besides the KStreamJoinWindow  that are 
siblings of the
+ * StreamStreamJoinNode and have smaller build priority.
+ */
+private boolean isSelfJoin(final GraphNode streamJoinNode) {
+final AtomicInteger count = new AtomicInteger();
+countSourceNodes(count, streamJoinNode, new HashSet<>());
+if (count.get() > 1) {
+return false;
+}
+if (streamJoinNode.parentNodes().size() > 1) {
+return false;
+}
+for (final GraphNode parent: streamJoinNode.parentNodes()) {

Review Comment:
   No Guava in Streams :P



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960474042


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -270,17 +276,36 @@ private void maybeAddNodeForOptimizationMetadata(final 
GraphNode node) {
 
 // use this method for testing only
 public void buildAndOptimizeTopology() {
-buildAndOptimizeTopology(false);
+buildAndOptimizeTopology(null);
 }
 
-public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+public void buildAndOptimizeTopology(final Properties props) {
+// Vicky: Do we need to verify props?
+final List optimizationConfigs;
+if (props == null) {
+optimizationConfigs = new ArrayList<>();
+optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION);
+} else {
+final StreamsConfig config = new StreamsConfig(props);
+optimizationConfigs = 
config.getList(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG);
+}
 
 mergeDuplicateSourceNodes();
-if (optimizeTopology) {
-LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
+if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE)

Review Comment:
   A user can optimize either by providing the config `all` which is the 
`StreamsConfig.OPTIMIZE` which will apply all optimization rules or by 
specifying a list of optimization rules. With your suggestion, an optimization 
will be applied only if `contains(OPTIMIZE)` is true which is not correct. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960457146


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final 
GraphNode node) {
 
 // use this method for testing only
 public void buildAndOptimizeTopology() {
-buildAndOptimizeTopology(false);
+buildAndOptimizeTopology(false, false);
 }
 
-public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+public void buildAndOptimizeTopology(
+final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
 mergeDuplicateSourceNodes();
 if (optimizeTopology) {
 LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
 optimizeKTableSourceTopics();
 maybeOptimizeRepartitionOperations();
+if (optimizeSelfJoin) {

Review Comment:
   Hey @guozhangwang ! 
   Cases 1 and 2 are optimizable and will be optimized by the algorithm. I have 
tests for these cases in `InternalStreamsBuilderTest`.  
   Case 3 is not optimizable and won't be recognized. The reason is that 
processors like `mapValues` or `filter` or `transform` are black-boxes. There 
is no way to know how they change the contents of a stream hence there is no 
way to figure out if the two sides of the join are still the same. 
   Case 4 could be optimizable but I did not consider it. I initially only had 
in-scope cases like 1 and 2. I can add it by adding a special rule to the 
rewriter that would check if the parent of the join is a merge, then it's ok to 
have multiple source nodes as long as they are ancestors of the merge node. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-01 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960457146


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final 
GraphNode node) {
 
 // use this method for testing only
 public void buildAndOptimizeTopology() {
-buildAndOptimizeTopology(false);
+buildAndOptimizeTopology(false, false);
 }
 
-public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+public void buildAndOptimizeTopology(
+final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
 mergeDuplicateSourceNodes();
 if (optimizeTopology) {
 LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
 optimizeKTableSourceTopics();
 maybeOptimizeRepartitionOperations();
+if (optimizeSelfJoin) {

Review Comment:
   Hey @guozhangwang ! 
   Cases 1 and 2 are optimizable and will be optimized by the algorithm. I have 
tests for this cases in `InternalStreamsBuilderTest`.  
   Case 3 is not optimizable and won't be recognized. The reason is that 
processors like `mapValues` or `filter` or `transform` are black-boxes. There 
is no way to know how they change the contents of a stream hence there is no 
way to figure out if the two sides of the join are still the same. 
   Case 4 could be optimizable but I did not consider it. I initially only had 
in-scope cases like 1 and 2. I can add it by adding a special rule to the 
rewriter that would check if the parent of the join is a merge, then it's ok to 
have multiple source nodes as long as they are ancestors of the merge node. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-08-31 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r959392484


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +362,95 @@ private void mergeDuplicateSourceNodes() {
 }
 }
 
+/**
+ * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+ */
+@SuppressWarnings("unchecked")
+private void rewriteSelfJoin(final GraphNode currentNode, final 
Set visited) {
+visited.add(currentNode);
+if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+((StreamStreamJoinNode) currentNode).setSelfJoin();
+// Remove JoinOtherWindowed node
+final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+GraphNode left = null, right = null;
+for (final GraphNode child: parent.children()) {
+if (child instanceof  ProcessorGraphNode
+&& isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+if (left == null) {
+left = child;
+} else {
+right = child;
+}
+}
+}
+// Sanity check
+if (left != null && right != null && left.buildPriority() < 
right.buildPriority()) {
+parent.removeChild(right);
+}
+}
+for (final GraphNode child: currentNode.children()) {
+if (!visited.contains(child)) {
+rewriteSelfJoin(child, visited);
+}
+}
+}
+
+/**
+ * The self-join rewriting can be applied if:
+ * 1. The path from the StreamStreamJoinNode to the root contains a single 
source node.
+ * 2. The StreamStreamJoinNode has a single parent.
+ * 3. There are no other nodes besides the KStreamJoinWindow  that are 
siblings of the
+ * StreamStreamJoinNode and have smaller build priority.
+ */
+private boolean isSelfJoin(final GraphNode streamJoinNode) {
+final AtomicInteger count = new AtomicInteger();
+countSourceNodes(count, streamJoinNode, new HashSet<>());
+if (count.get() > 1) {
+return false;
+}
+if (streamJoinNode.parentNodes().size() > 1) {
+return false;
+}
+for (final GraphNode parent: streamJoinNode.parentNodes()) {
+for (final GraphNode sibling : parent.children()) {
+if (sibling instanceof ProcessorGraphNode) {
+if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
+continue;
+}
+}
+if (sibling != streamJoinNode
+&& sibling.buildPriority() < 
streamJoinNode.buildPriority()) {
+return false;
+}
+}
+}
+return true;
+}
+
+private void countSourceNodes(
+final AtomicInteger count,
+final GraphNode currentNode,
+final Set visited) {
+
+if (currentNode instanceof StreamSourceNode) {
+count.incrementAndGet();
+}
+
+for (final GraphNode parent: currentNode.parentNodes()) {
+if (!visited.contains(parent)) {
+visited.add(parent);
+countSourceNodes(count, parent, visited);
+}
+}
+}
+
+private boolean isStreamJoinWindowNode(final ProcessorGraphNode node) {
+return node.processorParameters() != null

Review Comment:
   In this code example 
   ```
   final KStream stream1 = 
builder.stream(Collections.singleton("t1"), consumed);
   stream1.mapValues(v -> v);
   final KStream stream2 = 
builder.stream(Collections.singleton("t1"), consumed);
   stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, 

JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
   ```
   the `node.processorParameters().processorSupplier()` is null. I added the 
first check just to cover my bases. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-08-31 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r959328394


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final 
GraphNode node) {
 
 // use this method for testing only
 public void buildAndOptimizeTopology() {
-buildAndOptimizeTopology(false);
+buildAndOptimizeTopology(false, false);
 }
 
-public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+public void buildAndOptimizeTopology(
+final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
 mergeDuplicateSourceNodes();
 if (optimizeTopology) {
 LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
 optimizeKTableSourceTopics();
 maybeOptimizeRepartitionOperations();
+if (optimizeSelfJoin) {

Review Comment:
   The main goal of the conditions is to ensure that the join arguments are the 
same and are not altered somewhere along the path from the root to the join in 
a unilateral (only one side) way.
   
   The first condition is needed to make sure that the join has a single source 
ancestor. We don't want to apply the optimization if there is another source 
somewhere along the path from the root to the join node. This path might be 
long and might go through multiple joins etc. so just looking for a single 
parent is not enough. 
   
   The second condition is needed to ensure that no transformation is applied 
to one side of the join and not the other.
   
   The third condition is needed for the same reason because in my tests, I 
have seen topologies where the `ProcessorNodes` are added as siblings to the 
join and not as parents of it. For instance, in this example:
   ```
   final KStream stream1 = 
builder.stream(Collections.singleton("t1"), consumed);
   stream1.mapValues(v -> v);
   final KStream stream2 = 
builder.stream(Collections.singleton("t1"), consumed);
   stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, 

JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
   ```
   the node for the `map` is a sibling of the join node and not a parent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-08-30 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r958321621


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -663,6 +663,12 @@ public class StreamsConfig extends AbstractConfig {
 public static final String TOPOLOGY_OPTIMIZATION_CONFIG = 
"topology.optimization";
 private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration 
telling Kafka Streams if it should optimize the topology, disabled by default";
 
+public static final String SELF_JOIN_OPTIMIZATION_CONFIG = 
"self.join.optimization";

Review Comment:
   I agree with your suggestion in the KIP and will make the changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-08-30 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r958320960


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final 
GraphNode node) {
 
 // use this method for testing only
 public void buildAndOptimizeTopology() {
-buildAndOptimizeTopology(false);
+buildAndOptimizeTopology(false, false);
 }
 
-public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+public void buildAndOptimizeTopology(
+final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
 mergeDuplicateSourceNodes();
 if (optimizeTopology) {
 LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
 optimizeKTableSourceTopics();
 maybeOptimizeRepartitionOperations();
+if (optimizeSelfJoin) {

Review Comment:
   Hey @guozhangwang ! What do you mean by the  "two-pass" mechanism? Do you 
mean that one pass is building the plan and the second is optimizing it? What 
is wrong with doing this? 
   
   The complications that I came across are actually not from this but from the 
fact that the logical plan is not so logical but has a lot of physical plan 
information already baked into it. I don't think it is a good idea to optimize 
the plan while building it (in one pass) as some optimizations cannot be 
identified until after the full plan is built. Moreover, optimizing the plan 
while building allows an optimization rule to only consider "local" information 
(up to the point of what the plan currently contains) and not the whole plan 
holistically which contributes to excluding optimizations that could otherwise 
be applied simply because the optimizer doesn't have all the information yet. 
Moreover, it contributes to "race" conditions between optimizations as applying 
one optimization might exclude another when the second could have been more 
beneficial. 
   
   Regarding the optimization in this PR, it is not possible to identify 
whether it is a self-join until we have seen the entire plan. We need to make 
sure that it is the same source in both arguments of the join and that no 
transformation is applied to one argument and not the other. I am afraid, the 
checks that I am doing on whether a self-join is applicable are unavoidable 
whether we do the optimizations on the fly or afterward. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org