cadonna commented on code in PR #13855: URL: https://github.com/apache/kafka/pull/13855#discussion_r1243468362
########## streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java: ########## @@ -189,7 +221,22 @@ public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde) { */ @Override public Joined<K, V, VO> withName(final String name) { - return new Joined<>(keySerde, valueSerde, otherValueSerde, name); + return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod); + } + + /** + * Set the grace period on the stream side of the join. Records will enter a buffer before being processed. Out of order records in the grace period will be processed in timestamp order. Late records, out of the grace period, will be executed right as they come in, if it is past the table history retention this could result in joins on the wrong version or a null join. Long gaps in stream side arriving records will cause records to be delayed in processing, even resulting in be processed out of the grace period window. Review Comment: Could you please also add a couple of line breaks to the java docs as you did for the javadocs of the `with()` method? 
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java: ########## @@ -56,10 +77,59 @@ public void init(final ProcessorContext<K1, VOut> context) { final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); valueGetter.init(context); + internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context); + if (useBuffer) { + if (!valueGetter.isVersioned() && gracePeriod.isPresent()) { + throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join."); + } + + buffer.get().setSerdesIfNull(new SerdeGetter(context)); + buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) context(), null); + } } @Override public void process(final Record<K1, V1> record) { + internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context()); + updateObservedStreamTime(record.timestamp()); + if (maybeDropRecord(record)) { + return; + } + + if (!useBuffer) { + doJoin(record); + } else { + if (!buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext())) { + doJoin(record); + } + buffer.get().evictWhile(() -> true, this::emit); Review Comment: I think this should be written the following way to avoid an unnecessary range query: ```suggestion if (!buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext())) { doJoin(record); } else { buffer.get().evictWhile(() -> true, this::emit); } ``` If the record is a late record, it will not update the observed stream time. If the observed stream time is not updated, the range query will not return records that need to be evicted, since they were already evicted the last time `evictWhile()` was called. Does this make sense? 
If you agree, could you also please add a test for this? ########## streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java: ########## @@ -203,4 +250,4 @@ public Serde<V> valueSerde() { public Serde<VO> otherValueSerde() { return otherValueSerde; } -} +} Review Comment: nit: Could you remove this change? ########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java: ########## @@ -112,6 +125,74 @@ private void pushNullValueToTable() { } } + + private void makeJoin(final Duration grace) { + final KStream<Integer, String> stream; + final KTable<Integer, String> table; + final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>(); + builder = new StreamsBuilder(); + + final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String()); + stream = builder.stream(streamTopic, consumed); + table = builder.table("tableTopic2", consumed, Materialized.as( + Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5)))); + stream.join(table, + MockValueJoiner.TOSTRING_JOINER, + Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace) + ).process(supplier); + final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); + driver = new TopologyTestDriver(builder.build(), props); + inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + inputTableTopic = driver.createInputTopic("tableTopic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + + processor = supplier.theCapturedProcessor(); + } + + @Test + public void shouldFailIfTableIsNotVersioned() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); + final KStream<String, String> 
streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String())); + + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, + () -> streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one")); + assertThat( + exception.getMessage(), + is("KTable must be versioned to use a grace period in a stream table join.") + ); + } + + @Test + public void shouldDelayJoinByGracePeriod() { + makeJoin(Duration.ofMillis(2)); + + // push four items to the table. this should not produce any item. + pushToTableNonRandom(4, "Y"); + processor.checkAndClearProcessResult(EMPTY); + + // push all four items to the primary stream. this should produce two items. + pushToStream(4, "X"); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "X0+Y0", 0), + new KeyValueTimestamp<>(1, "X1+Y1", 1)); + + // push all items to the table. this should not produce any item + pushToTableNonRandom(4, "YY"); + processor.checkAndClearProcessResult(EMPTY); + + // push all four items to the primary stream. this should produce two items. + pushToStream(4, "X"); + processor.checkAndClearProcessResult( Review Comment: Good call! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org