[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r970800187 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KStreamKStreamSelfJoin implements ProcessorSupplier { +private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class); + +private final String windowName; +private final long joinThisBeforeMs; +private final long joinThisAfterMs; +private final long joinOtherBeforeMs; +private final long joinOtherAfterMs; +private final ValueJoinerWithKey joinerThis; + +private final TimeTracker sharedTimeTracker; + +KStreamKStreamSelfJoin( +final String windowName, +final JoinWindowsInternal windows, +final ValueJoinerWithKey joinerThis, +final TimeTracker sharedTimeTracker) { + +this.windowName = windowName; +this.joinThisBeforeMs = windows.beforeMs; +this.joinThisAfterMs = windows.afterMs; +this.joinOtherBeforeMs = windows.afterMs; +this.joinOtherAfterMs = windows.beforeMs; +this.joinerThis = joinerThis; +this.sharedTimeTracker = sharedTimeTracker; +} + +@Override +public Processor get() { +return new KStreamKStreamSelfJoinProcessor(); +} + +private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor { +private WindowStore windowStore; +private Sensor droppedRecordsSensor; + +@Override +public void init(final ProcessorContext context) { +super.init(context); + +final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); +droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); +windowStore = context.getStateStore(windowName); +} + +@SuppressWarnings("unchecked") +@Override +public void process(final Record record) { +if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) { +return; +} + +final long inputRecordTimestamp = record.timestamp(); +long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs); Review Comment: They are not the same: the first uses `joinThisBeforeMs` and the second uses `joinOtherBeforeMs`. This is needed because the inner join fetches rows from the window store over different intervals depending on whether the record is on the left-hand or right-hand side. Since we want the self-join to produce the same output as the inner join, I followed the same logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
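To make the difference concrete, here is a minimal Java sketch of how each side derives its fetch bounds from the same window. It assumes the arithmetic shown in the diff (`timeFrom = max(0, ts - before)`, `timeTo = max(0, ts + after)`, with the other side swapping `beforeMs` and `afterMs`); the class and method names are hypothetical and not part of Kafka Streams.

```java
// Hypothetical sketch: fetch bounds for the two sides of a windowed stream-stream join.
public class JoinFetchRangeSketch {

    /** Inclusive [from, to] timestamp range passed to a window-store fetch. */
    public static final class Range {
        public final long from;
        public final long to;

        Range(final long from, final long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public String toString() {
            return "[" + from + ", " + to + "]";
        }
    }

    // Same arithmetic as the diff: timeFrom = max(0, ts - before), timeTo = max(0, ts + after).
    public static Range fetchRange(final long ts, final long beforeMs, final long afterMs) {
        return new Range(Math.max(0L, ts - beforeMs), Math.max(0L, ts + afterMs));
    }

    public static void main(final String[] args) {
        final long beforeMs = 10L; // illustrative asymmetric window
        final long afterMs = 2L;
        final long ts = 100L;

        // "This" (left-hand) side uses the window as configured.
        System.out.println("this side:  " + fetchRange(ts, beforeMs, afterMs)); // [90, 102]
        // "Other" (right-hand) side swaps before/after, like joinOtherBeforeMs = windows.afterMs.
        System.out.println("other side: " + fetchRange(ts, afterMs, beforeMs)); // [98, 110]
    }
}
```

With an asymmetric window, a record at t=100 probes [90, 102] as the left-hand side but [98, 110] as the right-hand side, so reusing one interval for both probes could change the join result.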
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r970805661 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KStreamKStreamSelfJoin implements ProcessorSupplier { +private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class); + +private final String windowName; +private final long joinThisBeforeMs; +private final long joinThisAfterMs; +private final long joinOtherBeforeMs; +private final long joinOtherAfterMs; +private final ValueJoinerWithKey joinerThis; + +private final TimeTracker sharedTimeTracker; + +KStreamKStreamSelfJoin( +final String windowName, +final JoinWindowsInternal windows, +final ValueJoinerWithKey joinerThis, +final TimeTracker sharedTimeTracker) { + +this.windowName = windowName; +this.joinThisBeforeMs = windows.beforeMs; +this.joinThisAfterMs = windows.afterMs; +this.joinOtherBeforeMs = windows.afterMs; +this.joinOtherAfterMs = windows.beforeMs; +this.joinerThis = joinerThis; +this.sharedTimeTracker = sharedTimeTracker; +} + +@Override +public Processor get() { +return new KStreamKStreamSelfJoinProcessor(); +} + +private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor { +private WindowStore windowStore; +private Sensor droppedRecordsSensor; + +@Override +public void init(final ProcessorContext context) { +super.init(context); + +final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); +droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); +windowStore = context.getStateStore(windowName); +} + +@SuppressWarnings("unchecked") +@Override +public void process(final Record record) { +if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) { +return; +} + +final long inputRecordTimestamp = record.timestamp(); +long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs); +long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs); +boolean emittedJoinWithSelf = false; +final Record selfRecord = record +.withValue(joinerThis.apply(record.key(), record.value(), (V2) record.value())) +.withTimestamp(inputRecordTimestamp); +sharedTimeTracker.advanceStreamTime(inputRecordTimestamp); + +// Join current record with other +try (final WindowStoreIterator iter = windowStore.fetch( +record.key(), timeFrom, timeTo)) { +while (iter.hasNext()) { +final KeyValue otherRecord = iter.next(); +final long otherRecordTimestamp = otherRecord.key; + +// Join this with other +context().forward( +
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r970800187 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KStreamKStreamSelfJoin implements ProcessorSupplier { +private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class); + +private final String windowName; +private final long joinThisBeforeMs; +private final long joinThisAfterMs; +private final long joinOtherBeforeMs; +private final long joinOtherAfterMs; +private final ValueJoinerWithKey joinerThis; + +private final TimeTracker sharedTimeTracker; + +KStreamKStreamSelfJoin( +final String windowName, +final JoinWindowsInternal windows, +final ValueJoinerWithKey joinerThis, +final TimeTracker sharedTimeTracker) { + +this.windowName = windowName; +this.joinThisBeforeMs = windows.beforeMs; +this.joinThisAfterMs = windows.afterMs; +this.joinOtherBeforeMs = windows.afterMs; +this.joinOtherAfterMs = windows.beforeMs; +this.joinerThis = joinerThis; +this.sharedTimeTracker = sharedTimeTracker; +} + +@Override +public Processor get() { +return new KStreamKStreamSelfJoinProcessor(); +} + +private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor { +private WindowStore windowStore; +private Sensor droppedRecordsSensor; + +@Override +public void init(final ProcessorContext context) { +super.init(context); + +final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); +droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); +windowStore = context.getStateStore(windowName); +} + +@SuppressWarnings("unchecked") +@Override +public void process(final Record record) { +if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) { +return; +} + +final long inputRecordTimestamp = record.timestamp(); +long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs); Review Comment: They are not the same: the first uses `joinThisBeforeMs` and the second uses `joinOtherBeforeMs`. This is needed because the inner join (for a reason I have not yet understood, cc @vvcephei) fetches rows from the window store over different intervals depending on whether the record is on the left-hand or right-hand side. Since we want the self-join to produce the same output as the inner join, I followed the same logic.
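Why both intervals are needed can be illustrated with a simplified, in-memory Java sketch of a self-join that probes one buffer twice: once with the new record as the left-hand value (`[ts - before, ts + after]`) and once as the right-hand value with before/after swapped, skipping the self-match the second time. This is a hypothetical illustration, not the `KStreamKStreamSelfJoin` code: it ignores grace periods, retention, and output ordering, and assumes non-negative window bounds and timestamps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Hypothetical sketch: a self-join over ONE buffer instead of two window stores.
public class SingleStoreSelfJoinSketch {

    /** One buffered record; compared by identity so the new record can skip itself. */
    private static final class Entry {
        final String value;

        Entry(final String value) {
            this.value = value;
        }
    }

    private final long beforeMs;
    private final long afterMs;
    private final BinaryOperator<String> joiner; // joiner.apply(leftValue, rightValue)
    // key -> timestamp -> records buffered at that timestamp (stands in for the window store)
    private final Map<String, NavigableMap<Long, List<Entry>>> store = new HashMap<>();

    public SingleStoreSelfJoinSketch(final long beforeMs, final long afterMs,
                                     final BinaryOperator<String> joiner) {
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.joiner = joiner;
    }

    public List<String> process(final String key, final long ts, final String value) {
        final NavigableMap<Long, List<Entry>> byTime =
            store.computeIfAbsent(key, k -> new TreeMap<>());
        final Entry self = new Entry(value);
        byTime.computeIfAbsent(ts, t -> new ArrayList<>()).add(self);

        final List<String> out = new ArrayList<>();
        // Probe 1 ("this" side): new record is the left value, window [ts - before, ts + after].
        // This probe also emits the record joined with itself, exactly once.
        for (final List<Entry> bucket : byTime.subMap(
                Math.max(0L, ts - beforeMs), true, Math.max(0L, ts + afterMs), true).values()) {
            for (final Entry other : bucket) {
                out.add(joiner.apply(value, other.value));
            }
        }
        // Probe 2 ("other" side): new record is the right value, so before/after are swapped.
        for (final List<Entry> bucket : byTime.subMap(
                Math.max(0L, ts - afterMs), true, Math.max(0L, ts + beforeMs), true).values()) {
            for (final Entry other : bucket) {
                if (other != self) { // self-match was already emitted by probe 1
                    out.add(joiner.apply(other.value, value));
                }
            }
        }
        return out;
    }
}
```

For records `a` at t=100 and `b` at t=105 with a 10 ms before/after window, the calls return `[a+a]` and then `[b+a, b+b, a+b]`: the same four pairs an inner join of the stream with itself produces, though not necessarily in the same order.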
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r970792491 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KStreamKStreamSelfJoin implements ProcessorSupplier { +private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class); + +private final String windowName; +private final long joinThisBeforeMs; +private final long joinThisAfterMs; +private final long joinOtherBeforeMs; +private final long joinOtherAfterMs; +private final ValueJoinerWithKey joinerThis; + +private final TimeTracker sharedTimeTracker; + +KStreamKStreamSelfJoin( +final String windowName, +final JoinWindowsInternal windows, +final ValueJoinerWithKey joinerThis, +final TimeTracker sharedTimeTracker) { + +this.windowName = windowName; +this.joinThisBeforeMs = windows.beforeMs; +this.joinThisAfterMs = windows.afterMs; +this.joinOtherBeforeMs = windows.afterMs; +this.joinOtherAfterMs = windows.beforeMs; +this.joinerThis = joinerThis; +this.sharedTimeTracker = sharedTimeTracker; +} + +@Override +public Processor get() { +return new KStreamKStreamSelfJoinProcessor(); +} + +private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor { +private WindowStore windowStore; +private Sensor droppedRecordsSensor; + +@Override +public void init(final ProcessorContext context) { +super.init(context); + +final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); +droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); +windowStore = context.getStateStore(windowName); +} + +@SuppressWarnings("unchecked") +@Override +public void process(final Record record) { +if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) { +return; +} + +final long inputRecordTimestamp = record.timestamp(); +long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs); +long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs); +boolean emittedJoinWithSelf = false; +final Record selfRecord = record +.withValue(joinerThis.apply(record.key(), record.value(), (V2) record.value())) +.withTimestamp(inputRecordTimestamp); +sharedTimeTracker.advanceStreamTime(inputRecordTimestamp); + +// Join current record with other +try (final WindowStoreIterator iter = windowStore.fetch( +record.key(), timeFrom, timeTo)) { +while (iter.hasNext()) { +final KeyValue otherRecord = iter.next(); +final long otherRecordTimestamp = otherRecord.key; + +// Join this with other +context().forward( +
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r969358545 ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -1644,6 +1668,29 @@ private Map getClientCustomProps() { return props; } +public static List verifyTopologyOptimizationConfigs(final String config) { +final List acceptableConfigs = Arrays.asList( Review Comment: Yes, I have been torn about this. I also wanted to make it a static variable, but I am worried that if someone adds a new rewrite, they might miss it, whereas inside the method they will definitely see it. WDYT?
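One way to get both properties is a static constant declared right next to the rule-name constants, so a new rewrite is registered in the same place its name is defined. This Java sketch is hypothetical: the class, the constant values, and the rule that `all` and `none` cannot be combined (inferred only from the test name `shouldThrowExceptionWhenTopologyOptimizationOnAndOff`) are assumptions, and it throws `IllegalArgumentException` to stay dependency-free.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; constant names and values are illustrative only.
public final class TopologyOptimizationConfigSketch {
    public static final String OPTIMIZE = "all";
    public static final String NO_OPTIMIZATION = "none";
    public static final String REUSE_KTABLE_SOURCE_TOPICS = "reuse.ktable.source.topics";
    public static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics";
    public static final String SELF_JOIN = "self.join";

    // Declared directly below the rule names, so adding a rule means editing this block too.
    private static final List<String> ACCEPTABLE_CONFIGS = Collections.unmodifiableList(Arrays.asList(
        OPTIMIZE, NO_OPTIMIZATION, REUSE_KTABLE_SOURCE_TOPICS, MERGE_REPARTITION_TOPICS, SELF_JOIN));

    private TopologyOptimizationConfigSketch() { }

    /** Splits a comma-separated value, trims entries, and rejects unknown or contradictory ones. */
    public static List<String> verifyTopologyOptimizationConfigs(final String config) {
        final List<String> configs = new ArrayList<>();
        for (final String raw : config.split(",")) {
            final String trimmed = raw.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            if (!ACCEPTABLE_CONFIGS.contains(trimmed)) {
                throw new IllegalArgumentException("Unrecognized topology optimization: " + trimmed);
            }
            configs.add(trimmed);
        }
        // Assumed rule: optimization cannot be switched both on and off at once.
        if (configs.contains(OPTIMIZE) && configs.contains(NO_OPTIMIZATION)) {
            throw new IllegalArgumentException("'all' and 'none' cannot both be set: " + config);
        }
        return configs;
    }
}
```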
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r969520183 ## streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java: ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.integration; + +import static java.time.Duration.ofMinutes; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({IntegrationTest.class}) +public class SelfJoinUpgradeIntegrationTest { +public static final String INPUT_TOPIC = "selfjoin-input"; +public static final String OUTPUT_TOPIC = "selfjoin-output"; +private String inputTopic; +private String outputTopic; + +private 
KafkaStreams kafkaStreams; + +public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + +@BeforeClass +public static void startCluster() throws IOException { +CLUSTER.start(); +} + +@AfterClass +public static void closeCluster() { +CLUSTER.stop(); +} + +@Rule +public TestName testName = new TestName(); + +@Before +public void createTopics() throws Exception { +inputTopic = INPUT_TOPIC + safeUniqueTestName(getClass(), testName); +outputTopic = OUTPUT_TOPIC + safeUniqueTestName(getClass(), testName); +CLUSTER.createTopic(inputTopic); +CLUSTER.createTopic(outputTopic); +} + +private Properties props() { +final Properties streamsConfiguration = new Properties(); +final String safeTestName = safeUniqueTestName(getClass(), testName); +streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); +streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); +streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); +streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, + Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, + Serdes.String().getClass()); +streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); +streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); +return streamsConfiguration; +} + +@After +public void shutdown() { +if (kafkaStreams != null) { +kafkaStreams.close(Duration.ofSeconds(30L)); +
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r969484570 ## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ## @@ -1263,6 +1263,37 @@ public void testInvalidSecurityProtocol() { assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); } +@Test +public void shouldThrowExceptionWhenTopologyOptimizationOnAndOff() { Review Comment: I am not sure I follow. What do you mean?
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r969391230 ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -663,6 +663,12 @@ public class StreamsConfig extends AbstractConfig { public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization"; private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default"; +public static final String SELF_JOIN_OPTIMIZATION_CONFIG = "self.join.optimization"; Review Comment: I agree with keeping them organized, so I added the namespace, but the config names are now quite long and I am not sure this is better. For example: `topology.optimization.reuse.ktable.source.topics`?
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r969371482 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java: ## @@ -181,13 +181,22 @@ public KStream join(final KStream lhs, sharedTimeTracker ); +final KStreamKStreamSelfJoin selfJoin = new KStreamKStreamSelfJoin<>( +thisWindowStore.name(), +internalWindows, +joiner, +AbstractStream.reverseJoinerWithKey(joiner), +sharedTimeTracker +); Review Comment: Yes, unfortunately. This code runs before the logical plan is created and optimized, and it is the only place where I have the information needed to create this node. I tried to create it when building the topology after the rewrite, but that was impossible without major refactoring, because there is currently no clear separation between the logical and physical plans.
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r969371482 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java: ## @@ -181,13 +181,22 @@ public KStream join(final KStream lhs, sharedTimeTracker ); +final KStreamKStreamSelfJoin selfJoin = new KStreamKStreamSelfJoin<>( +thisWindowStore.name(), +internalWindows, +joiner, +AbstractStream.reverseJoinerWithKey(joiner), +sharedTimeTracker +); Review Comment: Yes, unfortunately this is the only place where I can get the information needed to create this node.
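The workaround described in this thread (build the self-join processor while the DSL call still has the inputs, and let the optimizer decide later whether to use it) can be sketched roughly as below. Everything here is hypothetical: processors are stood in for by strings, and the "same source" check is a simplification of whatever condition the real rewrite applies.

```java
import java.util.function.Supplier;

// Hypothetical sketch: a join node given BOTH candidate processors up front, because the
// inputs they need (store names, windows, joiner) are only available when join() is called.
public class DeferredJoinChoiceSketch {

    public static final class JoinNode {
        private final Supplier<String> regularJoin;
        private final Supplier<String> selfJoin;
        private boolean isSelfJoin = false; // set later by the optimizer's rewrite pass

        public JoinNode(final Supplier<String> regularJoin, final Supplier<String> selfJoin) {
            this.regularJoin = regularJoin;
            this.selfJoin = selfJoin;
        }

        public void markAsSelfJoin() {
            isSelfJoin = true;
        }

        // Called when the physical topology is built, after all rewrites have run.
        public String buildProcessor() {
            return isSelfJoin ? selfJoin.get() : regularJoin.get();
        }
    }

    // Stand-in for the rewrite: flag the node only if both inputs read the same source.
    public static void rewriteSelfJoin(final JoinNode node, final String leftSource, final String rightSource) {
        if (leftSource.equals(rightSource)) {
            node.markAsSelfJoin();
        }
    }

    public static void main(final String[] args) {
        final JoinNode node = new JoinNode(() -> "KStreamKStreamJoin", () -> "KStreamKStreamSelfJoin");
        rewriteSelfJoin(node, "input-topic", "input-topic");
        System.out.println(node.buildProcessor()); // KStreamKStreamSelfJoin
    }
}
```

In this sketch both suppliers exist from the start, but only the one selected after the rewrite is invoked when the physical processor is built.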
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r969353780 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java: ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KStreamKStreamSelfJoin implements ProcessorSupplier { +private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class); + +private final String windowName; +private final long joinThisBeforeMs; +private final long joinThisAfterMs; +private final long joinOtherBeforeMs; +private final long joinOtherAfterMs; +private final ValueJoinerWithKey joinerThis; +private final ValueJoinerWithKey joinerOther; + +private final TimeTracker sharedTimeTracker; + +KStreamKStreamSelfJoin( +final String windowName, +final JoinWindowsInternal windows, +final ValueJoinerWithKey joinerThis, +final ValueJoinerWithKey joinerOther, +final TimeTracker sharedTimeTracker) { + +this.windowName = windowName; +this.joinThisBeforeMs = windows.beforeMs; +this.joinThisAfterMs = windows.afterMs; +this.joinOtherBeforeMs = windows.afterMs; +this.joinOtherAfterMs = windows.beforeMs; +this.joinerThis = joinerThis; +this.joinerOther = joinerOther; +this.sharedTimeTracker = sharedTimeTracker; +} + +@Override +public Processor get() { +return new KStreamKStreamSelfJoinProcessor(); +} + +private 
class KStreamKStreamSelfJoinProcessor extends StreamStreamJoinProcessor { +private WindowStore windowStore; +private Sensor droppedRecordsSensor; + +@Override +public void init(final ProcessorContext context) { +super.init(context); + +final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); +droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); +windowStore = context.getStateStore(windowName); +} + +@SuppressWarnings("unchecked") +@Override +public void process(final Record record) { +System.out.println("---> Processing record: " + record); +if (skipRecord(record, LOG, droppedRecordsSensor)) { +return; +} + +final long inputRecordTimestamp = record.timestamp(); +long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs); +long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs); +boolean emittedJoinWithSelf = false; + +sharedTimeTracker.advanceStreamTime(inputRecordTimestamp); +System.out.println("> Window store fetch, timeFrom=" + timeFrom + " timeTo=" + timeTo); + +// Join current record with other +System.out.println("> Window store fetch, timeFrom=" + timeFrom + " timeTo=" + timeTo); Review Comment: These will all be removed once I am done with the reviews -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail:
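A minimal sketch of the fetch-range arithmetic in the constructor and `process()` above. The class and method names are hypothetical and not part of Kafka Streams. The diff only shows the "this" side fetch. The sketch assumes the "other" side uses the same formula with the swapped bounds, which is what the constructor's assignments (`joinOtherBeforeMs = windows.afterMs`, `joinOtherAfterMs = windows.beforeMs`) suggest.

```java
/**
 * Hypothetical helper mirroring the window-bound arithmetic in the diff;
 * it is not part of Kafka Streams.
 */
final class SelfJoinWindowBounds {

    /** Time range [from, to] to fetch from the window store. */
    static final class Range {
        final long from;
        final long to;

        Range(final long from, final long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public String toString() {
            return "[" + from + ", " + to + "]";
        }
    }

    // "This" side: look back beforeMs and ahead afterMs, clamped at 0 as in process().
    static Range thisSide(final long ts, final long beforeMs, final long afterMs) {
        return new Range(Math.max(0L, ts - beforeMs), Math.max(0L, ts + afterMs));
    }

    // "Other" side (assumed): same formula with the before/after bounds swapped.
    static Range otherSide(final long ts, final long beforeMs, final long afterMs) {
        return thisSide(ts, afterMs, beforeMs);
    }

    public static void main(final String[] args) {
        // Asymmetric window: 100 ms before, 10 ms after, record at t = 1000.
        System.out.println(thisSide(1000L, 100L, 10L));  // [900, 1010]
        System.out.println(otherSide(1000L, 100L, 10L)); // [990, 1100]
    }
}
```

With an asymmetric window, the two sides fetch different ranges. This is why a single-store self-join still has to track both bound pairs.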
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r969353364 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -305,6 +307,36 @@ public void buildAndOptimizeTopology(final boolean optimizeTopology) { internalTopologyBuilder.validateCopartition(); } +/** + * A user can provide either the config OPTIMIZE which means all optimizations rules will be + * applied or they can provide a list of optimization rules. + */ +private void optimizeTopology(final Properties props) { +final List optimizationConfigs; +if (props == null || !props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) { +optimizationConfigs = new ArrayList<>(); +optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION); +} else { +optimizationConfigs = StreamsConfig.verifyTopologyOptimizationConfigs( +(String) props.get(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)); +} +if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE) +|| optimizationConfigs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS)) { +LOG.debug("Optimizing the Kafka Streams graph for ktable source nodes"); +optimizeKTableSourceTopics(); +} +if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE) +|| optimizationConfigs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS)) { +LOG.debug("Optimizing the Kafka Streams graph for repartition nodes"); +maybeOptimizeRepartitionOperations(); +} +if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE) +|| optimizationConfigs.contains(StreamsConfig.SELF_JOIN)) { +LOG.debug("Optimizing the Kafka Streams graph for self-joins"); +rewriteSelfJoin(root, new IdentityHashMap<>()); Review Comment: Do you mean the order in which we check and apply the rewrites? I kept the existing order and added the self-join rewrite last. Technically, the order shouldn't matter for the self-join rewrite, at least. -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r969351766 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -270,17 +277,12 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) { // use this method for testing only public void buildAndOptimizeTopology() { -buildAndOptimizeTopology(false); +buildAndOptimizeTopology(null); } -public void buildAndOptimizeTopology(final boolean optimizeTopology) { Review Comment: No, it's in the `internals` package, so it's not part of the public API. Right, @vvcephei?
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r969349691 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() { } } +/** + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the + */ +@SuppressWarnings("unchecked") +private void rewriteSelfJoin(final GraphNode currentNode, final Set visited) { +visited.add(currentNode); +if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) { +((StreamStreamJoinNode) currentNode).setSelfJoin(); +// Remove JoinOtherWindowed node +final GraphNode parent = currentNode.parentNodes().stream().findFirst().get(); +GraphNode left = null, right = null; +for (final GraphNode child: parent.children()) { Review Comment: @vvcephei, yes, that's what the code is checking. I updated the comment and added an extra test with two joins under the same source. I also updated the code to make the checks more robust. @jnh5y, there can be at most two `isStreamJoinWindowNode` nodes _per_ join. With N-way joins, there will be only one `isStreamJoinWindowNode` node, and in this case only the first join will be optimized as a self-join. All the tests are in `InternalStreamsBuilderTest`. I tried to cover several join topologies, but I am sure there are more I haven't thought of. If you think any are missing, please let me know and I will add them.
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r962965501 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -356,6 +362,95 @@ private void mergeDuplicateSourceNodes() { } } +/** + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the + */ +@SuppressWarnings("unchecked") +private void rewriteSelfJoin(final GraphNode currentNode, final Set visited) { +visited.add(currentNode); +if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) { +((StreamStreamJoinNode) currentNode).setSelfJoin(); +// Remove JoinOtherWindowed node +final GraphNode parent = currentNode.parentNodes().stream().findFirst().get(); +GraphNode left = null, right = null; +for (final GraphNode child: parent.children()) { +if (child instanceof ProcessorGraphNode +&& isStreamJoinWindowNode((ProcessorGraphNode) child)) { +if (left == null) { +left = child; +} else { +right = child; +} +} +} +// Sanity check +if (left != null && right != null && left.buildPriority() < right.buildPriority()) { +parent.removeChild(right); +} +} +for (final GraphNode child: currentNode.children()) { +if (!visited.contains(child)) { +rewriteSelfJoin(child, visited); +} +} +} + +/** + * The self-join rewriting can be applied if: + * 1. The path from the StreamStreamJoinNode to the root contains a single source node. + * 2. The StreamStreamJoinNode has a single parent. + * 3. There are no other nodes besides the KStreamJoinWindow that are siblings of the + * StreamStreamJoinNode and have smaller build priority. 
+ */ +private boolean isSelfJoin(final GraphNode streamJoinNode) { +final AtomicInteger count = new AtomicInteger(); +countSourceNodes(count, streamJoinNode, new HashSet<>()); +if (count.get() > 1) { +return false; +} +if (streamJoinNode.parentNodes().size() > 1) { +return false; +} +for (final GraphNode parent: streamJoinNode.parentNodes()) { +for (final GraphNode sibling : parent.children()) { +if (sibling instanceof ProcessorGraphNode) { +if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) { +continue; +} +} +if (sibling != streamJoinNode +&& sibling.buildPriority() < streamJoinNode.buildPriority()) { +return false; +} +} +} +return true; +} + +private void countSourceNodes( +final AtomicInteger count, +final GraphNode currentNode, +final Set visited) { + +if (currentNode instanceof StreamSourceNode) { +count.incrementAndGet(); +} + +for (final GraphNode parent: currentNode.parentNodes()) { +if (!visited.contains(parent)) { +visited.add(parent); +countSourceNodes(count, parent, visited); +} +} +} + +private boolean isStreamJoinWindowNode(final ProcessorGraphNode node) { +return node.processorParameters() != null Review Comment: We can apply the self-join optimization in the example above.
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r962781264 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() { } } +/** + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the + */ +@SuppressWarnings("unchecked") +private void rewriteSelfJoin(final GraphNode currentNode, final Set visited) { +visited.add(currentNode); +if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) { +((StreamStreamJoinNode) currentNode).setSelfJoin(); +// Remove JoinOtherWindowed node +final GraphNode parent = currentNode.parentNodes().stream().findFirst().get(); +GraphNode left = null, right = null; +for (final GraphNode child: parent.children()) { +if (child instanceof ProcessorGraphNode +&& isStreamJoinWindowNode((ProcessorGraphNode) child)) { +if (left == null) { +left = child; +} else { +right = child; +} +} +} +// Sanity check +if (left != null && right != null && left.buildPriority() < right.buildPriority()) { +parent.removeChild(right); +} +} +for (final GraphNode child: currentNode.children()) { +if (!visited.contains(child)) { +rewriteSelfJoin(child, visited); +} +} +} + +/** + * The self-join rewriting can be applied if: + * 1. The path from the StreamStreamJoinNode to the root contains a single source node. + * 2. The StreamStreamJoinNode has a single parent. + * 3. There are no other nodes besides the KStreamJoinWindow that are siblings of the + * StreamStreamJoinNode and have smaller build priority. 
+ */ +private boolean isSelfJoin(final GraphNode streamJoinNode) { +final AtomicInteger count = new AtomicInteger(); +countSourceNodes(count, streamJoinNode, new HashSet<>()); +if (count.get() > 1) { +return false; +} +if (streamJoinNode.parentNodes().size() > 1) { +return false; +} +for (final GraphNode parent: streamJoinNode.parentNodes()) { +for (final GraphNode sibling : parent.children()) { +if (sibling instanceof ProcessorGraphNode) { +if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) { +continue; +} +} +if (sibling != streamJoinNode +&& sibling.buildPriority() < streamJoinNode.buildPriority()) { +return false; +} +} +} +return true; +} + +private void countSourceNodes( +final AtomicInteger count, +final GraphNode currentNode, +final Set visited) { + +if (currentNode instanceof StreamSourceNode) { +count.incrementAndGet(); +} + +for (final GraphNode parent: currentNode.parentNodes()) { +if (!visited.contains(parent)) { Review Comment: Actually, the tests fail with the `equals` implementation, so I will use an `IdentityHashMap` instead, since it seems the assumption is that `GraphNode`s are compared by reference.
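A minimal sketch of why the choice of visited set matters. The `Node` class here is hypothetical, not the Streams `GraphNode`. If nodes override `equals`/`hashCode` with value semantics, a `HashSet` treats two distinct but equal nodes as the same node, so a traversal can wrongly skip one. `Collections.newSetFromMap(new IdentityHashMap<>())` is a standard JDK way to get a `Set` that compares by reference.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Objects;
import java.util.Set;

final class IdentityVisitedSet {

    /** Hypothetical node whose equals/hashCode compare by name only (value semantics). */
    static final class Node {
        final String name;

        Node(final String name) {
            this.name = name;
        }

        @Override
        public boolean equals(final Object o) {
            return o instanceof Node && ((Node) o).name.equals(name);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(name);
        }
    }

    /** Adds every node to the given visited set and reports how many entries it holds. */
    static int visit(final Set<Node> visited, final Node... nodes) {
        for (final Node node : nodes) {
            visited.add(node);
        }
        return visited.size();
    }

    /** A Set view backed by IdentityHashMap: membership uses ==, not equals(). */
    static Set<Node> identitySet() {
        return Collections.newSetFromMap(new IdentityHashMap<>());
    }

    public static void main(final String[] args) {
        final Node a = new Node("join-window");
        final Node b = new Node("join-window"); // a different object that is equal by value
        System.out.println(visit(new HashSet<>(), a, b)); // 1: b looks already visited
        System.out.println(visit(identitySet(), a, b));   // 2: both objects are tracked
    }
}
```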
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r961032133 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) { // use this method for testing only public void buildAndOptimizeTopology() { -buildAndOptimizeTopology(false); +buildAndOptimizeTopology(false, false); } -public void buildAndOptimizeTopology(final boolean optimizeTopology) { +public void buildAndOptimizeTopology( +final boolean optimizeTopology, final boolean optimizeSelfJoin) { mergeDuplicateSourceNodes(); if (optimizeTopology) { LOG.debug("Optimizing the Kafka Streams graph for repartition nodes"); optimizeKTableSourceTopics(); maybeOptimizeRepartitionOperations(); +if (optimizeSelfJoin) { Review Comment: Discussed offline with @guozhangwang. If a `map` or other transformation is a sibling of the join node but the join node has a single parent, then the transformation's output does not feed the join. It is therefore a no-op as far as the join is concerned, and the optimization can be applied. For the transformation to affect the join, the join node must have two or more parents, in which case the optimization is not applicable.
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r961012010 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) { // use this method for testing only public void buildAndOptimizeTopology() { -buildAndOptimizeTopology(false); +buildAndOptimizeTopology(false, false); } -public void buildAndOptimizeTopology(final boolean optimizeTopology) { +public void buildAndOptimizeTopology( +final boolean optimizeTopology, final boolean optimizeSelfJoin) { mergeDuplicateSourceNodes(); if (optimizeTopology) { LOG.debug("Optimizing the Kafka Streams graph for repartition nodes"); optimizeKTableSourceTopics(); maybeOptimizeRepartitionOperations(); +if (optimizeSelfJoin) { Review Comment: The logical plan will have only one source node, since the sources get merged when they refer to the same topic. Hence, all the nodes will be children of the source node and siblings of the join node. For example, Case 2:
```
root -> source
source -> join-window1, join-window2, stream-join
```
and Case 3:
```
root -> source
source -> join-window1, map-values1, join-window2, stream-join
```
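A minimal sketch of the three conditions listed in the `isSelfJoin` Javadoc, applied to the Case 2 and Case 3 shapes above. The graph classes are a hypothetical simplification, not the Streams `GraphNode` API. The build priorities are assumed to increase in the order the children are listed. Under the Javadoc's third condition, Case 3 is rejected. Later in the thread, however, the author notes that such a sibling is a no-op when the join has a single parent, so that third check may not survive in the final code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified stand-in for the Streams graph; not the real GraphNode API. */
final class SelfJoinCheck {

    enum Kind { SOURCE, JOIN_WINDOW, STREAM_JOIN, OTHER }

    static final class Node {
        final String name;
        final Kind kind;
        final int buildPriority;
        final List<Node> parents = new ArrayList<>();
        final List<Node> children = new ArrayList<>();

        Node(final String name, final Kind kind, final int buildPriority) {
            this.name = name;
            this.kind = kind;
            this.buildPriority = buildPriority;
        }

        /** Links child under this node and returns the child. */
        Node addChild(final Node child) {
            children.add(child);
            child.parents.add(this);
            return child;
        }
    }

    /** Counts source nodes among node and its ancestors, visiting each ancestor once. */
    static int countSources(final Node node, final Set<Node> visited) {
        int count = node.kind == Kind.SOURCE ? 1 : 0;
        for (final Node parent : node.parents) {
            if (visited.add(parent)) {
                count += countSources(parent, visited);
            }
        }
        return count;
    }

    static boolean isSelfJoin(final Node join) {
        final Set<Node> visited = Collections.newSetFromMap(new IdentityHashMap<>());
        if (countSources(join, visited) > 1) {
            return false; // 1. more than one source node on the way to the root
        }
        if (join.parents.size() > 1) {
            return false; // 2. more than one parent
        }
        for (final Node parent : join.parents) {
            for (final Node sibling : parent.children) {
                if (sibling.kind == Kind.JOIN_WINDOW) {
                    continue;
                }
                if (sibling != join && sibling.buildPriority < join.buildPriority) {
                    return false; // 3. a non-window sibling is built before the join
                }
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        // Case 2: source -> join-window1, join-window2, stream-join
        final Node source2 = new Node("source", Kind.SOURCE, 1);
        source2.addChild(new Node("join-window1", Kind.JOIN_WINDOW, 2));
        source2.addChild(new Node("join-window2", Kind.JOIN_WINDOW, 3));
        final Node join2 = source2.addChild(new Node("stream-join", Kind.STREAM_JOIN, 4));
        System.out.println(isSelfJoin(join2)); // true

        // Case 3: source -> join-window1, map-values1, join-window2, stream-join
        final Node source3 = new Node("source", Kind.SOURCE, 1);
        source3.addChild(new Node("join-window1", Kind.JOIN_WINDOW, 2));
        source3.addChild(new Node("map-values1", Kind.OTHER, 3));
        source3.addChild(new Node("join-window2", Kind.JOIN_WINDOW, 4));
        final Node join3 = source3.addChild(new Node("stream-join", Kind.STREAM_JOIN, 5));
        System.out.println(isSelfJoin(join3)); // false under condition 3
    }
}
```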
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r960956896 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() { } } +/** + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the + */ +@SuppressWarnings("unchecked") +private void rewriteSelfJoin(final GraphNode currentNode, final Set visited) { +visited.add(currentNode); +if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) { +((StreamStreamJoinNode) currentNode).setSelfJoin(); +// Remove JoinOtherWindowed node +final GraphNode parent = currentNode.parentNodes().stream().findFirst().get(); +GraphNode left = null, right = null; +for (final GraphNode child: parent.children()) { +if (child instanceof ProcessorGraphNode +&& isStreamJoinWindowNode((ProcessorGraphNode) child)) { +if (left == null) { +left = child; +} else { +right = child; +} +} +} +// Sanity check +if (left != null && right != null && left.buildPriority() < right.buildPriority()) { +parent.removeChild(right); +} +} +for (final GraphNode child: currentNode.children()) { +if (!visited.contains(child)) { +rewriteSelfJoin(child, visited); +} +} +} + +/** + * The self-join rewriting can be applied if: + * 1. The path from the StreamStreamJoinNode to the root contains a single source node. + * 2. The StreamStreamJoinNode has a single parent. + * 3. There are no other nodes besides the KStreamJoinWindow that are siblings of the + * StreamStreamJoinNode and have smaller build priority. 
+ */ +private boolean isSelfJoin(final GraphNode streamJoinNode) { +final AtomicInteger count = new AtomicInteger(); +countSourceNodes(count, streamJoinNode, new HashSet<>()); +if (count.get() > 1) { +return false; +} +if (streamJoinNode.parentNodes().size() > 1) { +return false; +} +for (final GraphNode parent: streamJoinNode.parentNodes()) { +for (final GraphNode sibling : parent.children()) { +if (sibling instanceof ProcessorGraphNode) { +if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) { +continue; +} +} +if (sibling != streamJoinNode +&& sibling.buildPriority() < streamJoinNode.buildPriority()) { +return false; +} +} +} +return true; +} + +private void countSourceNodes( +final AtomicInteger count, +final GraphNode currentNode, +final Set visited) { + +if (currentNode instanceof StreamSourceNode) { +count.incrementAndGet(); +} + +for (final GraphNode parent: currentNode.parentNodes()) { +if (!visited.contains(parent)) { Review Comment: Great catch, thank you! I am not sure about the latter, so I implemented `equals` and `hashCode`.
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r960845326 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() { } } +/** + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the + */ +@SuppressWarnings("unchecked") +private void rewriteSelfJoin(final GraphNode currentNode, final Set visited) { +visited.add(currentNode); +if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) { +((StreamStreamJoinNode) currentNode).setSelfJoin(); +// Remove JoinOtherWindowed node +final GraphNode parent = currentNode.parentNodes().stream().findFirst().get(); +GraphNode left = null, right = null; +for (final GraphNode child: parent.children()) { +if (child instanceof ProcessorGraphNode +&& isStreamJoinWindowNode((ProcessorGraphNode) child)) { +if (left == null) { +left = child; +} else { +right = child; +} +} +} +// Sanity check +if (left != null && right != null && left.buildPriority() < right.buildPriority()) { +parent.removeChild(right); +} +} +for (final GraphNode child: currentNode.children()) { +if (!visited.contains(child)) { +rewriteSelfJoin(child, visited); Review Comment: We can have n-way self-joins, but they won't be fully optimized: only the first join will be. However, a topology can also contain more than one self-join (independent pairs, not n-way), and all of them can and should be optimized. I added a test case in `InternalStreamsBuilderTest:shouldMarkAllStreamStreamJoinsAsSelfJoin`.
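A minimal sketch of why the recursive walk in `rewriteSelfJoin` reaches every independent self-join. The walk marks the current node when it qualifies and then keeps recursing into unvisited children instead of returning after the first match. The graph classes are hypothetical, and the `Predicate` stands in for the real `isSelfJoin` check.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

final class MarkAllSelfJoins {

    /** Hypothetical graph node; isJoin stands in for "instanceof StreamStreamJoinNode". */
    static final class Node {
        final String name;
        final boolean isJoin;
        final List<Node> children = new ArrayList<>();
        boolean selfJoin; // set by the walk, like setSelfJoin() in the diff

        Node(final String name, final boolean isJoin) {
            this.name = name;
            this.isJoin = isJoin;
        }

        Node add(final Node child) {
            children.add(child);
            return child;
        }
    }

    /** Depth-first walk that does not stop at the first match, so every eligible join is marked. */
    static void rewrite(final Node current, final Set<Node> visited, final Predicate<Node> eligible) {
        visited.add(current);
        if (current.isJoin && eligible.test(current)) {
            current.selfJoin = true;
        }
        for (final Node child : current.children) {
            if (!visited.contains(child)) {
                rewrite(child, visited, eligible);
            }
        }
    }

    /** Runs the walk from root and returns the names of the marked joins in DFS order. */
    static List<String> markedJoins(final Node root, final Predicate<Node> eligible) {
        rewrite(root, Collections.newSetFromMap(new IdentityHashMap<>()), eligible);
        final List<String> marked = new ArrayList<>();
        collect(root, Collections.newSetFromMap(new IdentityHashMap<>()), marked);
        return marked;
    }

    private static void collect(final Node node, final Set<Node> seen, final List<String> out) {
        if (!seen.add(node)) {
            return;
        }
        if (node.selfJoin) {
            out.add(node.name);
        }
        for (final Node child : node.children) {
            collect(child, seen, out);
        }
    }

    public static void main(final String[] args) {
        // Two independent self-join pairs, one per topic, under the same root.
        final Node root = new Node("root", false);
        for (final String topic : new String[] {"topicA", "topicB"}) {
            final Node source = root.add(new Node("source-" + topic, false));
            source.add(new Node("join-window1-" + topic, false));
            source.add(new Node("join-window2-" + topic, false));
            source.add(new Node("stream-join-" + topic, true));
        }
        System.out.println(markedJoins(root, join -> true)); // [stream-join-topicA, stream-join-topicB]
    }
}
```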
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r960708055 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() { } } +/** + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the + */ +@SuppressWarnings("unchecked") +private void rewriteSelfJoin(final GraphNode currentNode, final Set visited) { +visited.add(currentNode); +if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) { +((StreamStreamJoinNode) currentNode).setSelfJoin(); +// Remove JoinOtherWindowed node +final GraphNode parent = currentNode.parentNodes().stream().findFirst().get(); +GraphNode left = null, right = null; +for (final GraphNode child: parent.children()) { +if (child instanceof ProcessorGraphNode +&& isStreamJoinWindowNode((ProcessorGraphNode) child)) { +if (left == null) { +left = child; +} else { +right = child; +} +} +} +// Sanity check +if (left != null && right != null && left.buildPriority() < right.buildPriority()) { +parent.removeChild(right); +} +} +for (final GraphNode child: currentNode.children()) { +if (!visited.contains(child)) { +rewriteSelfJoin(child, visited); +} +} +} + +/** + * The self-join rewriting can be applied if: + * 1. The path from the StreamStreamJoinNode to the root contains a single source node. + * 2. The StreamStreamJoinNode has a single parent. + * 3. There are no other nodes besides the KStreamJoinWindow that are siblings of the + * StreamStreamJoinNode and have smaller build priority. Review Comment: This is the order in which the nodes in the graph are visited and added to the topology. It matters here because the join node can have siblings with smaller build priority, hence they will be before it in the topological order of the topology (applied before the join). The only acceptable nodes are the JoinWindow nodes. 
If there are others, then it is not a self-join. This check covers cases like:
```
stream1 = builder.stream("topic1");
stream1.mapValues(v -> v);
stream2 = builder.stream("topic1"); // same topic
stream1.join(stream2);
```
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r960654203 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() { } } +/** + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the + */ +@SuppressWarnings("unchecked") +private void rewriteSelfJoin(final GraphNode currentNode, final Set visited) { +visited.add(currentNode); +if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) { +((StreamStreamJoinNode) currentNode).setSelfJoin(); +// Remove JoinOtherWindowed node +final GraphNode parent = currentNode.parentNodes().stream().findFirst().get(); +GraphNode left = null, right = null; +for (final GraphNode child: parent.children()) { Review Comment: No, there can be more than two child nodes. That's why I loop over all the children and use `isStreamJoinWindowNode` to find the nodes that I need to remove.
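A minimal sketch of the sibling scan described above. The `Node` class is hypothetical, and its `joinWindow` flag stands in for `isStreamJoinWindowNode(...)`. As in the diff's loop, the first join-window node found becomes `left` and every later one overwrites `right`. So if there were more than two window nodes, `right` would end up as the last one. The sketch returns a new list rather than mutating the graph.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RemoveSecondJoinWindow {

    /** Hypothetical sibling node; joinWindow stands in for isStreamJoinWindowNode(...). */
    static final class Node {
        final String name;
        final boolean joinWindow;
        final int buildPriority;

        Node(final String name, final boolean joinWindow, final int buildPriority) {
            this.name = name;
            this.joinWindow = joinWindow;
            this.buildPriority = buildPriority;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    /** Scans all children, not just two, and drops the later-built join-window node. */
    static List<Node> dropOtherWindow(final List<Node> children) {
        Node left = null;
        Node right = null;
        for (final Node child : children) {
            if (child.joinWindow) {
                if (left == null) {
                    left = child;
                } else {
                    right = child;
                }
            }
        }
        final List<Node> result = new ArrayList<>(children);
        // Same sanity check as the diff: both nodes exist and "right" is built after "left".
        if (left != null && right != null && left.buildPriority < right.buildPriority) {
            result.remove(right); // Node does not override equals, so removal is by identity
        }
        return result;
    }

    public static void main(final String[] args) {
        final List<Node> siblings = Arrays.asList(
            new Node("join-window1", true, 2),
            new Node("map-values1", false, 3),
            new Node("join-window2", true, 4),
            new Node("stream-join", false, 5));
        System.out.println(dropOtherWindow(siblings)); // [join-window1, map-values1, stream-join]
    }
}
```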
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r960647123 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() { } } +/** + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the + */ +@SuppressWarnings("unchecked") +private void rewriteSelfJoin(final GraphNode currentNode, final Set visited) { +visited.add(currentNode); +if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) { +((StreamStreamJoinNode) currentNode).setSelfJoin(); +// Remove JoinOtherWindowed node +final GraphNode parent = currentNode.parentNodes().stream().findFirst().get(); +GraphNode left = null, right = null; +for (final GraphNode child: parent.children()) { +if (child instanceof ProcessorGraphNode +&& isStreamJoinWindowNode((ProcessorGraphNode) child)) { +if (left == null) { +left = child; +} else { +right = child; +} +} +} +// Sanity check +if (left != null && right != null && left.buildPriority() < right.buildPriority()) { +parent.removeChild(right); +} +} +for (final GraphNode child: currentNode.children()) { +if (!visited.contains(child)) { +rewriteSelfJoin(child, visited); +} +} +} + +/** + * The self-join rewriting can be applied if: + * 1. The path from the StreamStreamJoinNode to the root contains a single source node. + * 2. The StreamStreamJoinNode has a single parent. + * 3. There are no other nodes besides the KStreamJoinWindow that are siblings of the + * StreamStreamJoinNode and have smaller build priority. 
+ */ +private boolean isSelfJoin(final GraphNode streamJoinNode) { +final AtomicInteger count = new AtomicInteger(); +countSourceNodes(count, streamJoinNode, new HashSet<>()); +if (count.get() > 1) { +return false; +} +if (streamJoinNode.parentNodes().size() > 1) { +return false; +} +for (final GraphNode parent: streamJoinNode.parentNodes()) { Review Comment: No Guava in Streams :P
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r960474042 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -270,17 +276,36 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) { // use this method for testing only public void buildAndOptimizeTopology() { -buildAndOptimizeTopology(false); +buildAndOptimizeTopology(null); } -public void buildAndOptimizeTopology(final boolean optimizeTopology) { +public void buildAndOptimizeTopology(final Properties props) { +// Vicky: Do we need to verify props? +final List optimizationConfigs; +if (props == null) { +optimizationConfigs = new ArrayList<>(); +optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION); +} else { +final StreamsConfig config = new StreamsConfig(props); +optimizationConfigs = config.getList(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG); +} mergeDuplicateSourceNodes(); -if (optimizeTopology) { -LOG.debug("Optimizing the Kafka Streams graph for repartition nodes"); +if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE) Review Comment: A user can enable optimizations in one of two ways. The first is to set the config to `all` (`StreamsConfig.OPTIMIZE`), which applies all optimization rules. The second is to specify a list of individual optimization rules. With your suggestion, an optimization would be applied only if `contains(OPTIMIZE)` is true, which is not correct, because rules listed individually would be skipped. WDYT?
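A minimal sketch of the rule-selection logic the comment describes. A rule runs if `all` was requested *or* if the rule itself was listed. Only `all` for `StreamsConfig.OPTIMIZE` comes from the thread. The other string values, including the self-join rule's name, are illustrative assumptions, so check `StreamsConfig` for the real constants. The comma-splitting `parse` helper is likewise hypothetical. It treats a missing value as `NO_OPTIMIZATION`, as the diff does when `props` is null.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class OptimizationRules {

    // Illustrative values for this sketch only; check StreamsConfig for the real constants.
    static final String OPTIMIZE = "all";
    static final String NO_OPTIMIZATION = "none";
    static final String REUSE_KTABLE_SOURCE_TOPICS = "reuse.ktable.source.topics";
    static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics";
    static final String SELF_JOIN = "self.join"; // hypothetical name

    /** Splits a comma-separated config value; a missing value means no optimization. */
    static List<String> parse(final String value) {
        if (value == null || value.trim().isEmpty()) {
            return Collections.singletonList(NO_OPTIMIZATION);
        }
        final List<String> configs = new ArrayList<>();
        for (final String part : value.split(",")) {
            final String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                configs.add(trimmed);
            }
        }
        return configs;
    }

    /** A rule runs if "all" was requested or if the rule itself was listed. */
    static boolean isEnabled(final List<String> configs, final String rule) {
        return configs.contains(OPTIMIZE) || configs.contains(rule);
    }

    public static void main(final String[] args) {
        final List<String> listed = parse("merge.repartition.topics, self.join");
        System.out.println(isEnabled(listed, MERGE_REPARTITION_TOPICS));   // true
        System.out.println(isEnabled(listed, REUSE_KTABLE_SOURCE_TOPICS)); // false
        System.out.println(isEnabled(parse(OPTIMIZE), SELF_JOIN));         // true
        // Checking only contains(OPTIMIZE) would wrongly skip the listed rules:
        System.out.println(listed.contains(OPTIMIZE));                     // false
    }
}
```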
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r960457146 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) { // use this method for testing only public void buildAndOptimizeTopology() { -buildAndOptimizeTopology(false); +buildAndOptimizeTopology(false, false); } -public void buildAndOptimizeTopology(final boolean optimizeTopology) { +public void buildAndOptimizeTopology( +final boolean optimizeTopology, final boolean optimizeSelfJoin) { mergeDuplicateSourceNodes(); if (optimizeTopology) { LOG.debug("Optimizing the Kafka Streams graph for repartition nodes"); optimizeKTableSourceTopics(); maybeOptimizeRepartitionOperations(); +if (optimizeSelfJoin) { Review Comment: Hey @guozhangwang! Cases 1 and 2 are optimizable and will be optimized by the algorithm; I have tests for these cases in `InternalStreamsBuilderTest`. Case 3 is not optimizable and won't be recognized. The reason is that processors like `mapValues`, `filter`, or `transform` are black boxes: there is no way to know how they change the contents of a stream, and hence no way to tell whether the two sides of the join are still the same. Case 4 could be optimizable, but I did not consider it; initially, I only had cases like 1 and 2 in scope. I can support it by adding a special rule to the rewriter: if the parent of the join is a merge node, it's OK to have multiple source nodes as long as they are ancestors of the merge node.
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r959392484

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ##
@@ -356,6 +362,95 @@ private void mergeDuplicateSourceNodes() {
         }
     }
+    /**
+     * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+     */
+    @SuppressWarnings("unchecked")
+    private void rewriteSelfJoin(final GraphNode currentNode, final Set<GraphNode> visited) {
+        visited.add(currentNode);
+        if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) {
+            ((StreamStreamJoinNode) currentNode).setSelfJoin();
+            // Remove JoinOtherWindowed node
+            final GraphNode parent = currentNode.parentNodes().stream().findFirst().get();
+            GraphNode left = null, right = null;
+            for (final GraphNode child: parent.children()) {
+                if (child instanceof ProcessorGraphNode
+                    && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+                    if (left == null) {
+                        left = child;
+                    } else {
+                        right = child;
+                    }
+                }
+            }
+            // Sanity check
+            if (left != null && right != null && left.buildPriority() < right.buildPriority()) {
+                parent.removeChild(right);
+            }
+        }
+        for (final GraphNode child: currentNode.children()) {
+            if (!visited.contains(child)) {
+                rewriteSelfJoin(child, visited);
+            }
+        }
+    }
+
+    /**
+     * The self-join rewriting can be applied if:
+     * 1. The path from the StreamStreamJoinNode to the root contains a single source node.
+     * 2. The StreamStreamJoinNode has a single parent.
+     * 3. There are no other nodes besides the KStreamJoinWindow that are siblings of the
+     * StreamStreamJoinNode and have smaller build priority.
+     */
+    private boolean isSelfJoin(final GraphNode streamJoinNode) {
+        final AtomicInteger count = new AtomicInteger();
+        countSourceNodes(count, streamJoinNode, new HashSet<>());
+        if (count.get() > 1) {
+            return false;
+        }
+        if (streamJoinNode.parentNodes().size() > 1) {
+            return false;
+        }
+        for (final GraphNode parent: streamJoinNode.parentNodes()) {
+            for (final GraphNode sibling : parent.children()) {
+                if (sibling instanceof ProcessorGraphNode) {
+                    if (isStreamJoinWindowNode((ProcessorGraphNode) sibling)) {
+                        continue;
+                    }
+                }
+                if (sibling != streamJoinNode
+                    && sibling.buildPriority() < streamJoinNode.buildPriority()) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private void countSourceNodes(
+        final AtomicInteger count,
+        final GraphNode currentNode,
+        final Set<GraphNode> visited) {
+
+        if (currentNode instanceof StreamSourceNode) {
+            count.incrementAndGet();
+        }
+
+        for (final GraphNode parent: currentNode.parentNodes()) {
+            if (!visited.contains(parent)) {
+                visited.add(parent);
+                countSourceNodes(count, parent, visited);
+            }
+        }
+    }
+
+    private boolean isStreamJoinWindowNode(final ProcessorGraphNode node) {
+        return node.processorParameters() != null

Review Comment:
In this code example:
```
final KStream stream1 = builder.stream(Collections.singleton("t1"), consumed);
stream1.mapValues(v -> v);
final KStream stream2 = builder.stream(Collections.singleton("t1"), consumed);
stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
```
`node.processorParameters().processorSupplier()` is null. I added the first check (`node.processorParameters() != null`) just to cover my bases.
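To make the shape of `rewriteSelfJoin` easier to follow, here is a toy version of the same traversal: a depth-first walk with a visited set that, at a qualifying join, keeps the first join-window sibling and drops the second one when the first has the lower build priority. This is a sketch under stated assumptions, not the PR's code: `Node` and `Kind` are hypothetical stand-ins for `GraphNode` and the `instanceof` checks, and the real `isSelfJoin` is replaced by a `Predicate<Node>` parameter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical toy model; Node and Kind stand in for GraphNode and its subclasses.
final class SelfJoinRewriteSketch {

    enum Kind { SOURCE, JOIN_WINDOW, JOIN, OTHER }

    static final class Node {
        final String name;
        final Kind kind;
        final int buildPriority;
        final List<Node> parents = new ArrayList<>();
        final List<Node> children = new ArrayList<>();
        boolean selfJoin; // set by the rewrite, like setSelfJoin() in the diff

        Node(final String name, final Kind kind, final int buildPriority, final Node... parents) {
            this.name = name;
            this.kind = kind;
            this.buildPriority = buildPriority;
            for (final Node parent : parents) {
                this.parents.add(parent);
                parent.children.add(this); // children keep insertion order
            }
        }
    }

    // Depth-first walk with a visited set, shaped like rewriteSelfJoin.
    static void rewrite(final Node current, final Set<Node> visited, final Predicate<Node> isSelfJoin) {
        visited.add(current);
        if (current.kind == Kind.JOIN && isSelfJoin.test(current)) {
            current.selfJoin = true;
            final Node parent = current.parents.get(0);
            Node left = null;
            Node right = null;
            for (final Node child : parent.children) {
                if (child.kind == Kind.JOIN_WINDOW) {
                    if (left == null) {
                        left = child;
                    } else {
                        right = child;
                    }
                }
            }
            // A self-join needs only one window node; drop the one built later.
            if (left != null && right != null && left.buildPriority < right.buildPriority) {
                parent.children.remove(right);
            }
        }
        // Iterate over a copy: a deeper call may remove a node from this children list.
        for (final Node child : new ArrayList<>(current.children)) {
            if (!visited.contains(child)) {
                rewrite(child, visited, isSelfJoin);
            }
        }
    }
}
```

Copying the children list before recursing matters here because visiting the join removes a window node from its parent's list while the parent's frame is still iterating.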
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r959328394

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ##
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {

Review Comment:
The main goal of the conditions is to ensure that both join arguments are the same and are not altered unilaterally (on only one side) anywhere along the path from the root to the join.

The first condition ensures that the join has a single source ancestor. We don't want to apply the optimization if there is another source somewhere along the path from the root to the join node. This path might be long and might pass through multiple joins, etc., so just checking for a single parent is not enough.

The second condition ensures that no transformation is applied to one side of the join and not the other.

The third condition is needed for the same reason: in my tests, I have seen topologies where the `ProcessorNodes` are added as siblings of the join rather than as its parents. For instance, in this example:
```
final KStream stream1 = builder.stream(Collections.singleton("t1"), consumed);
stream1.mapValues(v -> v);
final KStream stream2 = builder.stream(Collections.singleton("t1"), consumed);
stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
```
the node for `mapValues` is a sibling of the join node, not a parent.
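The three conditions can be exercised on a toy graph. This is a hypothetical sketch, not the PR's code: `Node`, `Kind`, and the integer `buildPriority` are illustrative, and the two reads of `t1` in the example are assumed to have already been collapsed into one source node (presumably what `mergeDuplicateSourceNodes` does in the real builder). Under those assumptions, the `mapValues` sibling from the example is what makes condition 3 fail.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model of the three isSelfJoin conditions; not the PR's code.
final class SelfJoinCheckSketch {

    enum Kind { SOURCE, JOIN_WINDOW, JOIN, OTHER }

    static final class Node {
        final Kind kind;
        final int buildPriority;
        final List<Node> parents = new ArrayList<>();
        final List<Node> children = new ArrayList<>();

        Node(final Kind kind, final int buildPriority, final Node... parents) {
            this.kind = kind;
            this.buildPriority = buildPriority;
            for (final Node parent : parents) {
                this.parents.add(parent);
                parent.children.add(this);
            }
        }
    }

    static boolean isSelfJoin(final Node join) {
        // Condition 1: at most one source node among all ancestors of the join.
        if (countSources(join, new HashSet<>()) > 1) {
            return false;
        }
        // Condition 2: the join has a single parent.
        if (join.parents.size() > 1) {
            return false;
        }
        // Condition 3: no sibling other than a join-window node was built before the join.
        for (final Node parent : join.parents) {
            for (final Node sibling : parent.children) {
                if (sibling == join || sibling.kind == Kind.JOIN_WINDOW) {
                    continue;
                }
                if (sibling.buildPriority < join.buildPriority) {
                    return false;
                }
            }
        }
        return true;
    }

    // Counts source nodes on all paths from `node` to the roots, visiting each node once.
    static int countSources(final Node node, final Set<Node> visited) {
        int count = node.kind == Kind.SOURCE ? 1 : 0;
        for (final Node parent : node.parents) {
            if (visited.add(parent)) {
                count += countSources(parent, visited);
            }
        }
        return count;
    }
}
```

A plain self-join (one source, two window siblings, one join) qualifies; adding an `OTHER` sibling with a lower build priority, or giving the join two distinct sources, makes `isSelfJoin` return `false`.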
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r958321621

## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ##
@@ -663,6 +663,12 @@ public class StreamsConfig extends AbstractConfig {
     public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
     private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
+    public static final String SELF_JOIN_OPTIMIZATION_CONFIG = "self.join.optimization";

Review Comment:
I agree with your suggestion in the KIP and will make the changes.
[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join
vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r958320960

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ##
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {

Review Comment:
Hey @guozhangwang! What do you mean by the "two-pass" mechanism? Do you mean that one pass builds the plan and the second optimizes it? What is wrong with doing this? The complications I came across actually don't come from this, but from the fact that the logical plan is not very logical: it already has a lot of physical-plan information baked into it.

I don't think it is a good idea to optimize the plan while building it (in one pass), as some optimizations cannot be identified until the full plan is built. Optimizing while building also lets an optimization rule consider only "local" information (whatever the plan contains at that point) rather than the whole plan holistically, which can exclude optimizations that could otherwise be applied simply because the optimizer doesn't have all the information yet. Moreover, it introduces "race" conditions between optimizations: applying one optimization might exclude another that would have been more beneficial.

Regarding the optimization in this PR, it is not possible to identify whether a join is a self-join until we have seen the entire plan. We need to make sure that the same source feeds both arguments of the join and that no transformation is applied to one argument and not the other. I am afraid the checks I am doing to decide whether the self-join optimization applies are unavoidable, whether we optimize on the fly or afterward.
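The build-then-optimize structure argued for here can be sketched generically. `Node`, `optimize`, and `markSharedNodes` are hypothetical names, not Kafka Streams internals. The example rule marks a node as shared when more than one downstream node reads from it; that property is only known once every child has been attached, so a rule evaluated when the first child is added would see a single child and decide wrongly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical two-pass optimizer: build the whole plan first, then run rules over it.
final class TwoPassOptimizerSketch {

    static final class Node {
        final String name;
        final List<Node> children = new ArrayList<>();
        boolean shared; // set by a whole-plan rule

        Node(final String name) {
            this.name = name;
        }

        Node addChild(final Node child) {
            children.add(child);
            return child;
        }
    }

    // Pass 2: each rule sees the finished plan, and rules run in a fixed order.
    static void optimize(final List<Node> plan, final List<Consumer<List<Node>>> rules) {
        for (final Consumer<List<Node>> rule : rules) {
            rule.accept(plan);
        }
    }

    // Example whole-plan rule: a node is "shared" if more than one child reads from it.
    static void markSharedNodes(final List<Node> plan) {
        for (final Node node : plan) {
            node.shared = node.children.size() > 1;
        }
    }
}
```

Running rules in a fixed order over the complete plan also gives one place to decide precedence when two rewrites would conflict.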