wcarlson5 commented on code in PR #13855: URL: https://github.com/apache/kafka/pull/13855#discussion_r1238801056
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java: ########## @@ -56,10 +77,60 @@ public void init(final ProcessorContext<K1, VOut> context) { final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); valueGetter.init(context); + internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context); + if (useBuffer) { + if (!valueGetter.isVersioned() && gracePeriod.isPresent()) { + throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join."); + } + + buffer.get().setSerdesIfNull(new SerdeGetter(context)); + //cast doesn't matter, it is just because the processor is deprecated. The context gets converted back with StoreToProcessorContextAdapter.adapt(context) Review Comment: Yeah we can remove ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java: ########## @@ -1256,10 +1261,25 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table, final NamedInternal renamed = new NamedInternal(joinedInternal.name()); final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME); + + Optional<TimeOrderedKeyValueBuffer<K, V, V>> buffer = Optional.empty(); + + if (joined.gracePeriod() != null) { Review Comment: This seems to be the main sticking point of the PR. The thought was that we would need some way to stop using the grace period without losing records, but you bring up a good point. 
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ########## @@ -122,12 +122,12 @@ public Maybe<ValueAndTimestamp<V>> priorValueForBuffered(final K key) { } @Override - public void put(final long time, final Record<K, V> record, final ProcessorRecordContext recordContext) { + public boolean put(final long time, final Record<K, V> record, final ProcessorRecordContext recordContext) { Review Comment: added `shouldReturnIfRecordWasAdded` ########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java: ########## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.LogCaptureAppender; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.TopologyWrapper; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Joined; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.MockApiProcessor; +import org.apache.kafka.test.MockApiProcessorSupplier; +import org.apache.kafka.test.MockValueJoiner; +import org.apache.kafka.test.StreamsTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; + +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +public class KStreamKTableJoinWithGraceTest { + private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0]; + + private final String streamTopic = "streamTopic"; + private final String tableTopic = "tableTopic"; + private TestInputTopic<Integer, String> inputStreamTopic; + private TestInputTopic<Integer, String> 
inputTableTopic; + private final int[] expectedKeys = {0, 1, 2, 3}; + + private MockApiProcessor<Integer, String, Void, Void> processor; + private TopologyTestDriver driver; Review Comment: Okay so I actually tried to set this up and it was NOT simpler. The mocking got way out of hand. Normally I 100 percent agree with you, mocking is way better. Due to your other comment I just ended up moving this whole class to the unit testing class for the normal Join anyways, so this is a bit of a moot point. ########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java: ########## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.LogCaptureAppender; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.TopologyWrapper; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Joined; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.MockApiProcessor; +import org.apache.kafka.test.MockApiProcessorSupplier; +import org.apache.kafka.test.MockValueJoiner; +import org.apache.kafka.test.StreamsTestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; + +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +public class KStreamKTableJoinWithGraceTest { + private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0]; + + private final String streamTopic = "streamTopic"; + private final String tableTopic = "tableTopic"; + private TestInputTopic<Integer, String> inputStreamTopic; + 
private TestInputTopic<Integer, String> inputTableTopic; + private final int[] expectedKeys = {0, 1, 2, 3}; + + private MockApiProcessor<Integer, String, Void, Void> processor; + private TopologyTestDriver driver; + private StreamsBuilder builder; + private final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>(); + + @BeforeEach + public void setUp() { + builder = new StreamsBuilder(); + final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); + driver = new TopologyTestDriver(builder.build(), props); + + } + + private void makeJoin(final Duration grace) { + final KStream<Integer, String> stream; + final KTable<Integer, String> table; + + final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String()); + stream = builder.stream(streamTopic, consumed); + table = builder.table(tableTopic, consumed, Materialized.as( + Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5)))); + stream.join(table, + MockValueJoiner.TOSTRING_JOINER, + Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace) + ).process(supplier); + final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); + driver = new TopologyTestDriver(builder.build(), props); + inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + inputTableTopic = driver.createInputTopic(tableTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + + processor = supplier.theCapturedProcessor(); + } + + @AfterEach + public void cleanup() { + driver.close(); + } + + private void pushToStream(final int messageCount, final String valuePrefix) { + for (int i = 0; i < messageCount; i++) { + inputStreamTopic.pipeInput(expectedKeys[i], valuePrefix + expectedKeys[i], i); + } + } + + private void pushToTable(final int 
messageCount, final String valuePrefix) { + for (int i = 0; i < messageCount; i++) { + inputTableTopic.pipeInput( + expectedKeys[i], + valuePrefix + expectedKeys[i], + 0); + } + } + + private void pushNullValueToTable() { + for (int i = 0; i < 2; i++) { + inputTableTopic.pipeInput(expectedKeys[i], null); + } + } + + @Test + public void shouldFailIfTableIsNotVersioned() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); + final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String())); + + assertThrows(IllegalArgumentException.class, Review Comment: sure, good call. ########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java: ########## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.LogCaptureAppender; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.TopologyWrapper; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Joined; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.MockApiProcessor; +import org.apache.kafka.test.MockApiProcessorSupplier; +import org.apache.kafka.test.MockValueJoiner; +import org.apache.kafka.test.StreamsTestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; + +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +public class KStreamKTableJoinWithGraceTest { + private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0]; + + private final String streamTopic = "streamTopic"; + private final String tableTopic = "tableTopic"; + private TestInputTopic<Integer, String> inputStreamTopic; + 
private TestInputTopic<Integer, String> inputTableTopic; + private final int[] expectedKeys = {0, 1, 2, 3}; + + private MockApiProcessor<Integer, String, Void, Void> processor; + private TopologyTestDriver driver; + private StreamsBuilder builder; + private final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>(); + + @BeforeEach + public void setUp() { + builder = new StreamsBuilder(); + final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); + driver = new TopologyTestDriver(builder.build(), props); + + } + + private void makeJoin(final Duration grace) { + final KStream<Integer, String> stream; + final KTable<Integer, String> table; + + final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String()); + stream = builder.stream(streamTopic, consumed); + table = builder.table(tableTopic, consumed, Materialized.as( + Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5)))); + stream.join(table, + MockValueJoiner.TOSTRING_JOINER, + Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace) + ).process(supplier); + final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); + driver = new TopologyTestDriver(builder.build(), props); + inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + inputTableTopic = driver.createInputTopic(tableTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + + processor = supplier.theCapturedProcessor(); + } + + @AfterEach + public void cleanup() { + driver.close(); + } + + private void pushToStream(final int messageCount, final String valuePrefix) { + for (int i = 0; i < messageCount; i++) { + inputStreamTopic.pipeInput(expectedKeys[i], valuePrefix + expectedKeys[i], i); + } + } + + private void pushToTable(final int 
messageCount, final String valuePrefix) { + for (int i = 0; i < messageCount; i++) { + inputTableTopic.pipeInput( + expectedKeys[i], + valuePrefix + expectedKeys[i], + 0); + } + } + + private void pushNullValueToTable() { + for (int i = 0; i < 2; i++) { + inputTableTopic.pipeInput(expectedKeys[i], null); + } + } + + @Test + public void shouldFailIfTableIsNotVersioned() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); + final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String())); + + assertThrows(IllegalArgumentException.class, + () -> streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one")); + } + + @Test + public void shouldDelayJoinByGracePeriod() { + makeJoin(Duration.ofMillis(2)); + + // push four items to the table. this should not produce any item. + pushToTable(4, "Y"); + processor.checkAndClearProcessResult(EMPTY); + + // push all four items to the primary stream. this should produce two items. + pushToStream(4, "X"); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "X0+Y0", 0), + new KeyValueTimestamp<>(1, "X1+Y1", 1)); + + // push all items to the table. this should not produce any item + pushToTable(4, "YY"); + processor.checkAndClearProcessResult(EMPTY); + + // push all four items to the primary stream. this should produce two items. + pushToStream(4, "X"); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "X0+YY0", 0), + new KeyValueTimestamp<>(1, "X1+YY1", 1)); + + // push all items to the table. 
this should not produce any item + pushToTable(4, "YYY"); + processor.checkAndClearProcessResult(EMPTY); + } + + @Test + public void shouldCreateRepartitionTopicsWithUserProvidedName() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); + final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()), + Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", Duration.ofMinutes(5)))); + final KTable<String, String> tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String()), + Materialized.as(Stores.persistentVersionedKeyValueStore("tableC", Duration.ofMinutes(5)))); + final KStream<String, String> rekeyedStream = streamA.map((k, v) -> new KeyValue<>(v, k)); + + rekeyedStream.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ZERO)).to("out-one"); + rekeyedStream.join(tableC, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "second-join", Duration.ZERO)).to("out-two"); + final Topology topology = builder.build(props); + System.out.println(topology.describe().toString()); + assertEquals(expectedTopologyWithUserProvidedRepartitionTopicNames, topology.describe().toString()); + } + + @Test + public void shouldRequireCopartitionedStreams() { + makeJoin(Duration.ofMillis(9)); + + final Collection<Set<String>> copartitionGroups = + TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); + + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(streamTopic, tableTopic)), copartitionGroups.iterator().next()); + } + + @Test + public void 
shouldNotJoinWithEmptyTableOnStreamUpdates() { + makeJoin(Duration.ofMillis(1)); + // push four items to the primary stream. the table is empty + pushToStream(4, "X"); + processor.checkAndClearProcessResult(EMPTY); + } + + @Test + public void shouldNotJoinOnTableUpdates() { Review Comment: That is a good idea, massively simplified the code. I resolved the other comments that were pretty much the same as this one -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org