cadonna commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1230539039


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1256,10 +1261,25 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final 
KTable<K, VO> table,
         final NamedInternal renamed = new NamedInternal(joinedInternal.name());
 
         final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin 
? LEFTJOIN_NAME : JOIN_NAME);
+
+        Optional<TimeOrderedKeyValueBuffer<K, V, V>> buffer = Optional.empty();
+
+        if (joined.gracePeriod() != null) {

Review Comment:
   Shouldn't this be
   ```suggestion
           if (joined.gracePeriod() != null && joined.gracePeriod() > 0) {
   ```
   In the KIP discussion we said that a grace period of zero is equal to no 
grace period.
   
   Note, some conditions later on need to be changed if you agree with this 
change, like for example `gracePeriod.isPresent() ^ buffer.isPresent()`.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +75,63 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
         droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+        if (buffer.isPresent()) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {

Review Comment:
   Why do you check for `gracePeriod.isPresent()` again? You have already done 
that in the constructor. 
   
   I think a boolean `useBuffer` that is set in the constructor would make the 
code more readable.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +75,63 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
         droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+        if (buffer.isPresent()) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            //cast doesn't matter, it is just because the processor is 
deprecated. The context gets converted back with 
StoreToProcessorContextAdapter.adapt(context)
+            
buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) 
context(), null);
+        }
     }
 
     @Override
     public void process(final Record<K1, V1> record) {
+        internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context());
+        updateObservedStreamTime(record.timestamp());
+        if (maybeDropRecord(record)) {
+            return;
+        }
+
+        if (!gracePeriod.isPresent() || !buffer.isPresent()) {
+            doJoin(record);
+        } else {
+            final long deadline = observedStreamTime - 
gracePeriod.get().toMillis();
+            if (record.timestamp() <= deadline) {

Review Comment:
   Why here `record.timestamp() <= deadline` and in 
`RocksDBTimeOrderedKeyValueBuffer` at line 127 `wrapped().observedStreamTime - 
gracePeriod > record.timestamp()`?
   
   I am wondering, if it would be simpler to remove the condition in 
`RocksDBTimeOrderedKeyValueBuffer` at line 127 and only check late records 
here. In the end, you do not get any return value from the `put()` in 
`RocksDBTimeOrderedKeyValueBuffer`. So it would silently not add the record to 
the buffer. Such a behavior might become hard to debug in case of a bug and it 
would also be quite error-prone because you have the condition for late records 
in two places instead of just one.
   
   I am even in favor of let `RocksDBTimeOrderedKeyValueBuffer#put()` return 
whether the record was added to the buffer or not. In such a way, you can move 
the logic for eviction and late records to the buffer instead of having it half 
here and half there making maintaining the code harder.



##########
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests all available joins of Kafka Streams DSL.
+ */
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class StreamTableJoinWithGraceIntegrationTest extends 
AbstractJoinIntegrationTest {
+
+    private static final String STORE_NAME = "table-store";
+
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    private KStream<Long, String> leftStream;
+    private KTable<Long, String> rightTable;
+    private Joined<Long, String, String> joined;
+
+    public StreamTableJoinWithGraceIntegrationTest(final boolean cacheEnabled) 
{
+        super(cacheEnabled);
+    }
+
+    @Before
+    public void prepareTopology() throws InterruptedException {
+        super.prepareEnvironment();
+        appID = "stream-table-join-integration-test";
+        builder = new StreamsBuilder();
+        joined = Joined.with(Serdes.Long(), Serdes.String(), Serdes.String(), 
"Grace", Duration.ZERO);

Review Comment:
   Why do you have only tests with grace period set to zero. I thought a grace 
period of zero is equivalent to no grace period. Basically, you are not testing 
the new code if you look at it from the outside.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new 
KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;
+    private StreamsBuilder builder;
+    private final MockApiProcessorSupplier<Integer, String, Void, Void> 
supplier = new MockApiProcessorSupplier<>();
+
+    @Before
+    public void setUp() {
+        builder = new StreamsBuilder();
+        processor = supplier.theCapturedProcessor();
+    }
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+
+        final Consumed<Integer, String> consumed = 
Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table(tableTopic, consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", 
Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), 
"Grace", grace)
+        ).process(supplier);
+        final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+        inputTableTopic = driver.createInputTopic(tableTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+    }
+
+    @After
+    public void cleanup() {
+        driver.close();
+    }
+
+    private void pushToStream(final int messageCount, final String 
valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputStreamTopic.pipeInput(expectedKeys[i], valuePrefix + 
expectedKeys[i], i);
+        }
+    }
+
+    private void pushToTable(final int messageCount, final String valuePrefix) 
{
+        for (int i = 0; i < messageCount; i++) {
+            inputTableTopic.pipeInput(
+                expectedKeys[i],
+                valuePrefix + expectedKeys[i],
+                0);
+        }
+    }
+
+    private void pushNullValueToTable() {
+        for (int i = 0; i < 2; i++) {
+            inputTableTopic.pipeInput(expectedKeys[i], null);
+        }
+    }
+
+    @Test
+    public void shouldFailIfGracePeriodIsLongerThanHistoryRetention() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", 
Duration.ofMinutes(5))));
+
+        streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMinutes(6))).to("out-one");
+        builder.build(props);
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
+
+        assertThrows(IllegalArgumentException.class,
+            () -> streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMillis(6))).to("out-one"));
+    }
+
+    @Test
+    public void shouldDelayJoinByGracePeriod() {
+        makeJoin(Duration.ofMillis(2));
+
+        // push four items to the table. this should not produce any item.
+        pushToTable(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two 
items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTable(4, "YY");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce four 
items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0+YY0", 0),
+            new KeyValueTimestamp<>(1, "X1+YY1", 1),
+            new KeyValueTimestamp<>(2, "X2+YY2", 2),
+            new KeyValueTimestamp<>(3, "X3+YY3", 3));

Review Comment:
   nit:
   ```suggestion
           processor.checkAndClearProcessResult(
               new KeyValueTimestamp<>(0, "X0+YY0", 0),
               new KeyValueTimestamp<>(1, "X1+YY1", 1),
               new KeyValueTimestamp<>(2, "X2+YY2", 2),
               new KeyValueTimestamp<>(3, "X3+YY3", 3));
   ```



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +75,63 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
         droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+        if (buffer.isPresent()) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            //cast doesn't matter, it is just because the processor is 
deprecated. The context gets converted back with 
StoreToProcessorContextAdapter.adapt(context)
+            
buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) 
context(), null);
+        }
     }
 
     @Override
     public void process(final Record<K1, V1> record) {
+        internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context());
+        updateObservedStreamTime(record.timestamp());
+        if (maybeDropRecord(record)) {
+            return;
+        }
+
+        if (!gracePeriod.isPresent() || !buffer.isPresent()) {

Review Comment:
   Again, why do you verify both? In the constructor you ensure the invariant 
`gracePeriod.isPresent() == buffer.isPresent()`. A boolean `useBuffer` would 
make the code better readable.  



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;

Review Comment:
   Also here, please use JUnit 5.
   I also forgot to point that out for `RocksDBTimeOrderedKeyValueBufferTest`. 
You can to open a separate PR for that.



##########
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;

Review Comment:
   Could you please use JUnit 5 for new integration tests?
   You can find an example in `RestoreIntegrationTest`.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +75,63 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
         droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+        if (buffer.isPresent()) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            //cast doesn't matter, it is just because the processor is 
deprecated. The context gets converted back with 
StoreToProcessorContextAdapter.adapt(context)
+            
buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) 
context(), null);
+        }
     }
 
     @Override
     public void process(final Record<K1, V1> record) {
+        internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context());
+        updateObservedStreamTime(record.timestamp());
+        if (maybeDropRecord(record)) {
+            return;
+        }
+
+        if (!gracePeriod.isPresent() || !buffer.isPresent()) {
+            doJoin(record);
+        } else {
+            final long deadline = observedStreamTime - 
gracePeriod.get().toMillis();
+            if (record.timestamp() <= deadline) {
+                doJoin(record);
+            } else {
+                buffer.get().put(observedStreamTime, record, 
internalProcessorContext.recordContext());
+            }
+            buffer.get().evictWhile(() -> buffer.get().minTimestamp() <= 
deadline, this::emit);

Review Comment:
   I am wondering if it might not be simpler to use as predicate just `() -> 
true` instead of `() -> buffer.get().minTimestamp() <= deadline` and leave the 
conditions for evicting to the buffer. Otherwise, we have half of the logic for 
evicting here and the other half in the buffer. That seems error-prone and hard 
to maintain and debug to me.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new 
KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;

Review Comment:
   Why do you need the topology test driver for unit tests?



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new 
KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;
+    private StreamsBuilder builder;
+    private final MockApiProcessorSupplier<Integer, String, Void, Void> 
supplier = new MockApiProcessorSupplier<>();
+
+    @Before
+    public void setUp() {
+        builder = new StreamsBuilder();
+        processor = supplier.theCapturedProcessor();
+    }
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+
+        final Consumed<Integer, String> consumed = 
Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table(tableTopic, consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", 
Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), 
"Grace", grace)
+        ).process(supplier);
+        final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+        inputTableTopic = driver.createInputTopic(tableTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+    }
+
+    @After
+    public void cleanup() {
+        driver.close();
+    }
+
+    private void pushToStream(final int messageCount, final String 
valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputStreamTopic.pipeInput(expectedKeys[i], valuePrefix + 
expectedKeys[i], i);
+        }
+    }
+
+    private void pushToTable(final int messageCount, final String valuePrefix) 
{
+        for (int i = 0; i < messageCount; i++) {
+            inputTableTopic.pipeInput(
+                expectedKeys[i],
+                valuePrefix + expectedKeys[i],
+                0);
+        }
+    }
+
+    private void pushNullValueToTable() {
+        for (int i = 0; i < 2; i++) {
+            inputTableTopic.pipeInput(expectedKeys[i], null);
+        }
+    }
+
+    @Test
+    public void shouldFailIfGracePeriodIsLongerThanHistoryRetention() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", 
Duration.ofMinutes(5))));
+
+        streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMinutes(6))).to("out-one");
+        builder.build(props);
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
+
+        assertThrows(IllegalArgumentException.class,
+            () -> streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMillis(6))).to("out-one"));
+    }
+
+    @Test
+    public void shouldDelayJoinByGracePeriod() {
+        makeJoin(Duration.ofMillis(2));
+
+        // push four items to the table. this should not produce any item.
+        pushToTable(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two 
items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1));

Review Comment:
   nit:
   ```suggestion
           processor.checkAndClearProcessResult(
               new KeyValueTimestamp<>(0, "X0+Y0", 0),
               new KeyValueTimestamp<>(1, "X1+Y1", 1));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new 
KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;
+    private StreamsBuilder builder;
+    private final MockApiProcessorSupplier<Integer, String, Void, Void> 
supplier = new MockApiProcessorSupplier<>();
+
+    @Before
+    public void setUp() {
+        builder = new StreamsBuilder();
+        processor = supplier.theCapturedProcessor();
+    }
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+
+        final Consumed<Integer, String> consumed = 
Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table(tableTopic, consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", 
Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), 
"Grace", grace)
+        ).process(supplier);
+        final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+        inputTableTopic = driver.createInputTopic(tableTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+    }
+
+    @After
+    public void cleanup() {
+        driver.close();
+    }
+
+    private void pushToStream(final int messageCount, final String 
valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputStreamTopic.pipeInput(expectedKeys[i], valuePrefix + 
expectedKeys[i], i);
+        }
+    }
+
+    private void pushToTable(final int messageCount, final String valuePrefix) 
{
+        for (int i = 0; i < messageCount; i++) {
+            inputTableTopic.pipeInput(
+                expectedKeys[i],
+                valuePrefix + expectedKeys[i],
+                0);
+        }
+    }
+
+    private void pushNullValueToTable() {
+        for (int i = 0; i < 2; i++) {
+            inputTableTopic.pipeInput(expectedKeys[i], null);
+        }
+    }
+
+    @Test
+    public void shouldFailIfGracePeriodIsLongerThanHistoryRetention() {

Review Comment:
   I am missing assertions in this test. You should call the `init()` and 
`process()` methods directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to