cadonna commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1243468362


##########
streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java:
##########
@@ -189,7 +221,22 @@ public Joined<K, V, VO> withOtherValueSerde(final 
Serde<VO> otherValueSerde) {
      */
     @Override
     public Joined<K, V, VO> withName(final String name) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, 
gracePeriod);
+    }
+
+    /**
+     * Set the grace period on the stream side of the join. Records will enter 
a buffer before being processed. Out of order records in the grace period will 
be processed in timestamp order. Late records, out of the grace period, will be 
executed right as they come in, if it is past the table history retention this 
could result in joins on the wrong version or a null join. Long gaps in stream 
side arriving records will cause records to be delayed in processing, even 
resulting in be processed out of the grace period window.

Review Comment:
   Could you please also add a couple of line breaks to the javadocs, as you 
did for the javadocs of the `with()` method?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +77,59 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
         droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+        if (useBuffer) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            
buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) 
context(), null);
+        }
     }
 
     @Override
     public void process(final Record<K1, V1> record) {
+        internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context());
+        updateObservedStreamTime(record.timestamp());
+        if (maybeDropRecord(record)) {
+            return;
+        }
+
+        if (!useBuffer) {
+            doJoin(record);
+        } else {
+            if (!buffer.get().put(observedStreamTime, record, 
internalProcessorContext.recordContext())) {
+                doJoin(record);
+            }
+            buffer.get().evictWhile(() -> true, this::emit);

Review Comment:
   I think this should be written as follows to avoid an unnecessary range query:
   ```suggestion
               if (!buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext())) {
                   doJoin(record);
               } else {
                   buffer.get().evictWhile(() -> true, this::emit);
               }
   ```
   If the record is a late record, it will not update the observed stream time. 
If the observed stream time is not updated, the range query will not return 
any records that need to be evicted, since they were already evicted the last 
time `evictWhile()` was called.
   Does this make sense?
   
   If you agree, could you please also add a test for this?
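
To illustrate the reasoning, here is a minimal, self-contained sketch. It does not use the real buffer from this PR: `GraceBufferSketch`, its `put()`/`evict()`/`process()` methods, and the `evictionScans` counter are hypothetical stand-ins, and the lateness rule (timestamp older than observed stream time minus grace) is an assumption made for the example. It only shows that a late record leaves the observed stream time unchanged, so an eviction pass right after it finds nothing to emit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a time-ordered buffer with a grace period (not Kafka's API).
final class GraceBufferSketch {
    private final long graceMs;
    private final TreeMap<Long, List<String>> byTimestamp = new TreeMap<>();
    private long observedStreamTime = -1L; // no record seen yet
    public int evictionScans = 0;          // counts simulated range queries

    public GraceBufferSketch(final long graceMs) {
        this.graceMs = graceMs;
    }

    // Returns false for a late record (assumed rule: older than stream time minus grace).
    public boolean put(final long timestamp, final String value) {
        if (timestamp < observedStreamTime - graceMs) {
            return false;
        }
        byTimestamp.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
        return true;
    }

    // Simulated range query: removes and returns everything whose grace period has elapsed.
    public List<String> evict() {
        evictionScans++;
        final Map<Long, List<String>> expired =
            byTimestamp.headMap(observedStreamTime - graceMs, true);
        final List<String> evicted = new ArrayList<>();
        for (final List<String> values : expired.values()) {
            evicted.addAll(values);
        }
        expired.clear(); // the view is backed by the TreeMap, so this removes the entries
        return evicted;
    }

    // Mirrors the suggested control flow: evict only when the record was buffered.
    public List<String> process(final long timestamp, final String value) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        final List<String> joined = new ArrayList<>();
        if (!put(timestamp, value)) {
            joined.add(value);       // late record: joined immediately, no scan
        } else {
            joined.addAll(evict());  // stream time may have advanced, so scan
        }
        return joined;
    }

    public static void main(final String[] args) {
        final GraceBufferSketch buffer = new GraceBufferSketch(2L);
        for (long ts = 0; ts <= 3; ts++) {
            System.out.println(ts + " -> " + buffer.process(ts, "v" + ts));
        }
        System.out.println("late -> " + buffer.process(0L, "late"));
        System.out.println("scans = " + buffer.evictionScans);
    }
}
```

With a grace period of 2 ms, the in-order records at timestamps 0 to 3 each trigger one scan, while the late record at timestamp 0 is joined directly without one. Calling `evict()` manually after the late record returns an empty list, which is the range query the suggestion avoids.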



##########
streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java:
##########
@@ -203,4 +250,4 @@ public Serde<V> valueSerde() {
     public Serde<VO> otherValueSerde() {
         return otherValueSerde;
     }
-}
+}

Review Comment:
   nit: Could you remove this change?



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##########
@@ -112,6 +125,74 @@ private void pushNullValueToTable() {
         }
     }
 
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = 
new MockApiProcessorSupplier<>();
+        builder = new StreamsBuilder();
+
+        final Consumed<Integer, String> consumed = 
Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table("tableTopic2", consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", 
Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), 
"Grace", grace)
+        ).process(supplier);
+        final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+        inputTableTopic = driver.createInputTopic("tableTopic2", new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+
+        processor = supplier.theCapturedProcessor();
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
+
+        final IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class,
+            () -> streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMillis(6))).to("out-one"));
+        assertThat(
+            exception.getMessage(),
+            is("KTable must be versioned to use a grace period in a stream 
table join.")
+        );
+    }
+
+    @Test
+    public void shouldDelayJoinByGracePeriod() {
+        makeJoin(Duration.ofMillis(2));
+
+        // push four items to the table. this should not produce any item.
+        pushToTableNonRandom(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two 
items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTableNonRandom(4, "YY");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two 
items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(

Review Comment:
   Good call!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
