wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1232831833
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +75,63 @@ public void init(final ProcessorContext<K1, VOut> context) {
final StreamsMetricsImpl metrics = (StreamsMetricsImpl)
context.metrics();
droppedRecordsSensor =
droppedRecordsSensor(Thread.currentThread().getName(),
context.taskId().toString(), metrics);
valueGetter.init(context);
+ internalProcessorContext =
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
context);
+ if (buffer.isPresent()) {
+ if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
Review Comment:
If you find it easier to understand, sure. That is fine.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1256,10 +1261,25 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final
KTable<K, VO> table,
final NamedInternal renamed = new NamedInternal(joinedInternal.name());
final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin
? LEFTJOIN_NAME : JOIN_NAME);
+
+ Optional<TimeOrderedKeyValueBuffer<K, V, V>> buffer = Optional.empty();
+
+ if (joined.gracePeriod() != null) {
Review Comment:
I see what you mean, but we actually were going to create the store and leave
it empty for zero duration. The point was to make it easier to change the grace
period if desired, so the store isn't getting created and destroyed. Something
about making it more backward compatible.
##########
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
Review Comment:
Yep, sure.
(Edit: The class it extends also uses JUnit 4, and I can't change this one
without also changing that and all other join integration tests.)
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +75,63 @@ public void init(final ProcessorContext<K1, VOut> context) {
final StreamsMetricsImpl metrics = (StreamsMetricsImpl)
context.metrics();
droppedRecordsSensor =
droppedRecordsSensor(Thread.currentThread().getName(),
context.taskId().toString(), metrics);
valueGetter.init(context);
+ internalProcessorContext =
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
context);
+ if (buffer.isPresent()) {
+ if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+ throw new IllegalArgumentException("KTable must be versioned
to use a grace period in a stream table join.");
+ }
+
+ buffer.get().setSerdesIfNull(new SerdeGetter(context));
+ //cast doesn't matter, it is just because the processor is
deprecated. The context gets converted back with
StoreToProcessorContextAdapter.adapt(context)
+
buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext)
context(), null);
+ }
}
@Override
public void process(final Record<K1, V1> record) {
+ internalProcessorContext =
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
context());
+ updateObservedStreamTime(record.timestamp());
+ if (maybeDropRecord(record)) {
+ return;
+ }
+
+ if (!gracePeriod.isPresent() || !buffer.isPresent()) {
+ doJoin(record);
+ } else {
+ final long deadline = observedStreamTime -
gracePeriod.get().toMillis();
+ if (record.timestamp() <= deadline) {
Review Comment:
That is a good idea. I'm also in favor of having `put` decide.
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+ private final static KeyValueTimestamp<?, ?>[] EMPTY = new
KeyValueTimestamp[0];
+
+ private final String streamTopic = "streamTopic";
+ private final String tableTopic = "tableTopic";
+ private TestInputTopic<Integer, String> inputStreamTopic;
+ private TestInputTopic<Integer, String> inputTableTopic;
+ private final int[] expectedKeys = {0, 1, 2, 3};
+
+ private MockApiProcessor<Integer, String, Void, Void> processor;
+ private TopologyTestDriver driver;
+ private StreamsBuilder builder;
+ private final MockApiProcessorSupplier<Integer, String, Void, Void>
supplier = new MockApiProcessorSupplier<>();
+
+ @Before
+ public void setUp() {
+ builder = new StreamsBuilder();
+ processor = supplier.theCapturedProcessor();
+ }
+
+ private void makeJoin(final Duration grace) {
+ final KStream<Integer, String> stream;
+ final KTable<Integer, String> table;
+
+ final Consumed<Integer, String> consumed =
Consumed.with(Serdes.Integer(), Serdes.String());
+ stream = builder.stream(streamTopic, consumed);
+ table = builder.table(tableTopic, consumed, Materialized.as(
+ Stores.persistentVersionedKeyValueStore("V-grace",
Duration.ofMinutes(5))));
+ stream.join(table,
+ MockValueJoiner.TOSTRING_JOINER,
+ Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(),
"Grace", grace)
+ ).process(supplier);
+ final Properties props =
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+ driver = new TopologyTestDriver(builder.build(), props);
+ inputStreamTopic = driver.createInputTopic(streamTopic, new
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L),
Duration.ZERO);
+ inputTableTopic = driver.createInputTopic(tableTopic, new
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L),
Duration.ZERO);
+ }
+
+ @After
+ public void cleanup() {
+ driver.close();
+ }
+
+ private void pushToStream(final int messageCount, final String
valuePrefix) {
+ for (int i = 0; i < messageCount; i++) {
+ inputStreamTopic.pipeInput(expectedKeys[i], valuePrefix +
expectedKeys[i], i);
+ }
+ }
+
+ private void pushToTable(final int messageCount, final String valuePrefix)
{
+ for (int i = 0; i < messageCount; i++) {
+ inputTableTopic.pipeInput(
+ expectedKeys[i],
+ valuePrefix + expectedKeys[i],
+ 0);
+ }
+ }
+
+ private void pushNullValueToTable() {
+ for (int i = 0; i < 2; i++) {
+ inputTableTopic.pipeInput(expectedKeys[i], null);
+ }
+ }
+
+ @Test
+ public void shouldFailIfGracePeriodIsLongerThanHistoryRetention() {
Review Comment:
Ah this should be removed. It will be added back in the next PR (or the one
after)
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1256,10 +1261,25 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final
KTable<K, VO> table,
final NamedInternal renamed = new NamedInternal(joinedInternal.name());
final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin
? LEFTJOIN_NAME : JOIN_NAME);
+
+ Optional<TimeOrderedKeyValueBuffer<K, V, V>> buffer = Optional.empty();
+
+ if (joined.gracePeriod() != null) {
Review Comment:
So we are going to make a store even when the grace period is zero. This
should make changing the grace period to and from zero smoother. So if someone
sets the grace period to zero, then we can evict everything from the buffer even
after the grace period is changed.
##########
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests all available joins of Kafka Streams DSL.
+ */
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class StreamTableJoinWithGraceIntegrationTest extends
AbstractJoinIntegrationTest {
+
+ private static final String STORE_NAME = "table-store";
+
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
+ private KStream<Long, String> leftStream;
+ private KTable<Long, String> rightTable;
+ private Joined<Long, String, String> joined;
+
+ public StreamTableJoinWithGraceIntegrationTest(final boolean cacheEnabled)
{
+ super(cacheEnabled);
+ }
+
+ @Before
+ public void prepareTopology() throws InterruptedException {
+ super.prepareEnvironment();
+ appID = "stream-table-join-integration-test";
+ builder = new StreamsBuilder();
+ joined = Joined.with(Serdes.Long(), Serdes.String(), Serdes.String(),
"Grace", Duration.ZERO);
Review Comment:
I'll add some. I think I need to rework what is in the integration tests and
the unit tests. Should be fixed.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +75,63 @@ public void init(final ProcessorContext<K1, VOut> context) {
final StreamsMetricsImpl metrics = (StreamsMetricsImpl)
context.metrics();
droppedRecordsSensor =
droppedRecordsSensor(Thread.currentThread().getName(),
context.taskId().toString(), metrics);
valueGetter.init(context);
+ internalProcessorContext =
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
context);
+ if (buffer.isPresent()) {
+ if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+ throw new IllegalArgumentException("KTable must be versioned
to use a grace period in a stream table join.");
+ }
+
+ buffer.get().setSerdesIfNull(new SerdeGetter(context));
+ //cast doesn't matter, it is just because the processor is
deprecated. The context gets converted back with
StoreToProcessorContextAdapter.adapt(context)
+
buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext)
context(), null);
+ }
}
@Override
public void process(final Record<K1, V1> record) {
+ internalProcessorContext =
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
context());
+ updateObservedStreamTime(record.timestamp());
+ if (maybeDropRecord(record)) {
+ return;
+ }
+
+ if (!gracePeriod.isPresent() || !buffer.isPresent()) {
+ doJoin(record);
+ } else {
+ final long deadline = observedStreamTime -
gracePeriod.get().toMillis();
+ if (record.timestamp() <= deadline) {
+ doJoin(record);
+ } else {
+ buffer.get().put(observedStreamTime, record,
internalProcessorContext.recordContext());
+ }
+ buffer.get().evictWhile(() -> buffer.get().minTimestamp() <=
deadline, this::emit);
Review Comment:
It should be the same. So sure, I can go with the simple approach.
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+ private final static KeyValueTimestamp<?, ?>[] EMPTY = new
KeyValueTimestamp[0];
+
+ private final String streamTopic = "streamTopic";
+ private final String tableTopic = "tableTopic";
+ private TestInputTopic<Integer, String> inputStreamTopic;
+ private TestInputTopic<Integer, String> inputTableTopic;
+ private final int[] expectedKeys = {0, 1, 2, 3};
+
+ private MockApiProcessor<Integer, String, Void, Void> processor;
+ private TopologyTestDriver driver;
Review Comment:
I'm going to move the relevant tests to the integration tests and remove it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]