vcrfxia commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1239142854


##########
streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java:
##########
@@ -189,7 +221,22 @@ public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde) {
      */
     @Override
     public Joined<K, V, VO> withName(final String name) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod);
+    }
+
+    /**
+     * Set the grace period on the stream side of the join. Records will enter a buffer
+     * before being processed. Out of order records in the grace period will be processed in
+     * timestamp order. Late records, out of the grace period, will be executed right as they
+     * come in, if it is past the table history retention this could result in joins on the
+     * wrong version or a null join. Long gaps in stream side arriving records will cause
+     * records to be delayed in processing, even resulting in be processed out of the grace
+     * period window.

Review Comment:
   > if it is past the table history retention this could result in joins on the wrong version or a null join
   
   I think this should be "if it is past the table history retention this could result in a null join" instead? I don't think it's possible to join with the wrong version; the only possible issue is that it joins with a null instead.
   
   > even resulting in be processed out of the grace period window
   
   What does this mean?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +77,59 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        if (useBuffer) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) context(), null);
+        }
     }
 
     @Override
     public void process(final Record<K1, V1> record) {
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context());
+        updateObservedStreamTime(record.timestamp());
+        if (maybeDropRecord(record)) {
+            return;
+        }
+
+        if (!useBuffer) {
+            doJoin(record);
+        } else {
+            if (!buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext())) {
+                doJoin(record);
+            }
+            buffer.get().evictWhile(() -> true, this::emit);
+        }
+    }
+
+    private void emit(final TimeOrderedKeyValueBuffer.Eviction<K1, V1> toEmit) {
+        final Record<K1, V1> record = new Record<>(toEmit.key(), toEmit.value(), toEmit.recordContext().timestamp())
+            .withHeaders(toEmit.recordContext().headers());
+        internalProcessorContext.setRecordContext(toEmit.recordContext());

Review Comment:
   Can you unset the record context (i.e., set it back to the original context) after finishing this step, in order to avoid confusion about what context is being used downstream? That's what we do with the suppress buffer, for example: https://github.com/apache/kafka/blob/1dbcb7da9e3625ec2078a82f84542a3127730fef/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java#L207-L215
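   
   For illustration, a minimal sketch of that save-and-restore pattern (`previousContext` is a hypothetical local, and `doJoin(record)` stands in for however this processor forwards the joined result):
   
   ```java
   // Remember the ProcessorRecordContext that was active before this eviction step.
   final ProcessorRecordContext previousContext = internalProcessorContext.recordContext();
   internalProcessorContext.setRecordContext(toEmit.recordContext());
   try {
       doJoin(record); // forward the joined result under the evicted record's context
   } finally {
       // Restore the original context so downstream processing is unaffected.
       internalProcessorContext.setRecordContext(previousContext);
   }
   ```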



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##########
@@ -112,6 +125,74 @@ private void pushNullValueToTable() {
         }
     }
 
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        builder = new StreamsBuilder();
+
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table("tableTopic2", consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace)
+        ).process(supplier);
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+        inputTableTopic = driver.createInputTopic("tableTopic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+
+        processor = supplier.theCapturedProcessor();
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+
+        final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+            () -> streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"));
+        assertThat(
+            exception.getMessage(),
+            is("KTable must be versioned to use a grace period in a stream table join.")
+        );
+    }
+
+    @Test
+    public void shouldDelayJoinByGracePeriod() {
+        makeJoin(Duration.ofMillis(2));
+
+        // push four items to the table. this should not produce any item.
+        pushToTableNonRandom(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTableNonRandom(4, "YY");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(

Review Comment:
   The reason this produces two output records is that the max timestamp seen so far is still 3, which means only the records with timestamps 0 and 1 are emitted (the records with timestamps 2 and 3 are still in the buffer). Can you add another step to this test which pipes a record with a larger timestamp and verifies that the records with timestamps 2 and 3 are emitted? There should be four of them, and they should be emitted in timestamp order, which is different from the offset order they arrived in.
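   
   A minimal sketch of such a step (the piped record and the expected joined values are assumptions for illustration: with a 2 ms grace period, stream time 7 should evict everything buffered at timestamps up to 5, assuming the "YY" table update overwrote "Y" at the same timestamps):
   
   ```java
   // Hypothetical extra step: one stream record with a larger timestamp
   // advances stream time past the grace period and flushes the buffer.
   inputStreamTopic.pipeInput(3, "X3", 7L);
   // The four buffered records come out in timestamp order (2, 2, 3, 3),
   // which differs from the offset order in which they arrived.
   processor.checkAndClearProcessResult(
       new KeyValueTimestamp<>(2, "X2+YY2", 2),
       new KeyValueTimestamp<>(2, "X2+YY2", 2),
       new KeyValueTimestamp<>(3, "X3+YY3", 3),
       new KeyValueTimestamp<>(3, "X3+YY3", 3));
   ```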


