vcrfxia commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1239143225


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##########
@@ -112,6 +125,74 @@ private void pushNullValueToTable() {
         }
     }
 
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = 
new MockApiProcessorSupplier<>();
+        builder = new StreamsBuilder();
+
+        final Consumed<Integer, String> consumed = 
Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table("tableTopic2", consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", 
Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), 
"Grace", grace)
+        ).process(supplier);
+        final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+        inputTableTopic = driver.createInputTopic("tableTopic2", new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+
+        processor = supplier.theCapturedProcessor();
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
+
+        final IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class,
+            () -> streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMillis(6))).to("out-one"));
+        assertThat(
+            exception.getMessage(),
+            is("KTable must be versioned to use a grace period in a stream 
table join.")
+        );
+    }
+
+    @Test
+    public void shouldDelayJoinByGracePeriod() {
+        makeJoin(Duration.ofMillis(2));
+
+        // push four items to the table. this should not produce any item.
+        pushToTableNonRandom(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two 
items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTableNonRandom(4, "YY");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two 
items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(

Review Comment:
   The reason this produces two output records is because the max timestamp 
seen so far is still 3, which means only records with timestamp 0 and 1 are 
emitted (timestamps 2 and 3 are still in the buffer). Can you add another step 
to this test which now produces a record with a larger timestamp and verifies 
that the records with timestamps 2 and 3 are emitted? There should be four of 
them, and they should be emitted in timestamp order which is different from the 
offset order that they arrived in.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to