vcrfxia commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1239142854
##########
streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java:
##########

@@ -189,7 +221,22 @@ public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde) {
      */
     @Override
     public Joined<K, V, VO> withName(final String name) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod);
+    }
+
+    /**
+     * Set the grace period on the stream side of the join. Records will enter a buffer before being
+     * processed. Out of order records in the grace period will be processed in timestamp order. Late
+     * records, out of the grace period, will be executed right as they come in, if it is past the table
+     * history retention this could result in joins on the wrong version or a null join. Long gaps in
+     * stream side arriving records will cause records to be delayed in processing, even resulting in
+     * be processed out of the grace period window.

Review Comment:
> if it is past the table history retention this could result in joins on the wrong version or a null join

I think this should be "if it is past the table history retention this could result in a null join" instead? I don't think it's possible to join with the wrong version; the only possible issue is that it joins with a null instead.

> even resulting in be processed out of the grace period window

What does this mean?
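For readers following the thread, here is a minimal sketch of the API being documented here, pieced together from this PR's diffs. The five-argument `Joined.with` overload and the versioned-store requirement are taken from the PR's test code below; the class, topic, and store names are illustrative:

    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Joined;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.Stores;

    public class GraceJoinExample {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();

            final KStream<Integer, String> stream =
                builder.stream("stream-topic", Consumed.with(Serdes.Integer(), Serdes.String()));

            // The table must be backed by a versioned store; otherwise configuring a grace
            // period makes the join throw IllegalArgumentException (see the processor below).
            final KTable<Integer, String> table = builder.table(
                "table-topic",
                Consumed.with(Serdes.Integer(), Serdes.String()),
                Materialized.as(Stores.persistentVersionedKeyValueStore("versioned-store", Duration.ofMinutes(5))));

            // Stream-side records are buffered for up to one minute and joined in timestamp order.
            stream.join(
                table,
                (streamValue, tableValue) -> streamValue + "+" + tableValue,
                Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "grace-join", Duration.ofMinutes(1)));

            builder.build();
        }
    }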
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########

@@ -56,10 +77,59 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        if (useBuffer) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) context(), null);
+        }
     }
 
     @Override
     public void process(final Record<K1, V1> record) {
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context());
+        updateObservedStreamTime(record.timestamp());
+        if (maybeDropRecord(record)) {
+            return;
+        }
+
+        if (!useBuffer) {
+            doJoin(record);
+        } else {
+            if (!buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext())) {
+                doJoin(record);
+            }
+            buffer.get().evictWhile(() -> true, this::emit);
+        }
+    }
+
+    private void emit(final TimeOrderedKeyValueBuffer.Eviction<K1, V1> toEmit) {
+        final Record<K1, V1> record = new Record<>(toEmit.key(), toEmit.value(), toEmit.recordContext().timestamp())
+            .withHeaders(toEmit.recordContext().headers());
+        internalProcessorContext.setRecordContext(toEmit.recordContext());

Review Comment:
Can you unset the record context (i.e., set it back to the original context) after finishing this step, in order to avoid confusion about what context is being used downstream? That's what we do with the suppress buffer, for example: https://github.com/apache/kafka/blob/1dbcb7da9e3625ec2078a82f84542a3127730fef/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java#L207-L215
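Concretely, `emit()` could look something like this. This is a sketch modeled on the linked suppress code: the `prevRecordContext` local and the try/finally are the suggested additions, and `doJoin(record)` stands in for whatever forwarding logic follows the truncated diff above:

    // Inside KStreamKTableJoinProcessor; ProcessorRecordContext is
    // org.apache.kafka.streams.processor.internals.ProcessorRecordContext.
    private void emit(final TimeOrderedKeyValueBuffer.Eviction<K1, V1> toEmit) {
        final Record<K1, V1> record = new Record<>(toEmit.key(), toEmit.value(), toEmit.recordContext().timestamp())
            .withHeaders(toEmit.recordContext().headers());
        // Remember the context of the record currently being processed ...
        final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext();
        internalProcessorContext.setRecordContext(toEmit.recordContext());
        try {
            doJoin(record);  // assumed: the join/forward step that follows in the PR
        } finally {
            // ... and restore it, so downstream processing sees the original context again.
            internalProcessorContext.setRecordContext(prevRecordContext);
        }
    }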
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##########

@@ -112,6 +125,74 @@ private void pushNullValueToTable() {
         }
     }
 
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        builder = new StreamsBuilder();
+
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table("tableTopic2", consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace)
+        ).process(supplier);
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+        inputTableTopic = driver.createInputTopic("tableTopic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+
+        processor = supplier.theCapturedProcessor();
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+
+        final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+            () -> streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"));
+        assertThat(
+            exception.getMessage(),
+            is("KTable must be versioned to use a grace period in a stream table join.")
+        );
+    }
+
+    @Test
+    public void shouldDelayJoinByGracePeriod() {
+        makeJoin(Duration.ofMillis(2));
+
+        // push four items to the table. this should not produce any item.
+        pushToTableNonRandom(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTableNonRandom(4, "YY");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(

Review Comment:
The reason this produces two output records is that the max timestamp seen so far is still 3, which means only the records with timestamps 0 and 1 are emitted (timestamps 2 and 3 are still in the buffer).

Can you add another step to this test which produces a record with a larger timestamp and verifies that the records with timestamps 2 and 3 are emitted? There should be four of them, and they should be emitted in timestamp order, which is different from the offset order they arrived in.
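For example, something along these lines could cover that. This is a sketch only: the timestamp-9 record is an arbitrary choice that advances stream time past the 2ms grace period, and the expected join values are illustrative since they depend on which table versions are valid at timestamps 2 and 3:

    // Suggested extra step: a record with a larger timestamp advances stream time,
    // evicting the four buffered records (keys 2 and 3 from each of the two pushes).
    inputStreamTopic.pipeInput(9, "X9", 9L);
    processor.checkAndClearProcessResult(
        // Emitted in timestamp order (2, 2, 3, 3) rather than the offset order in
        // which they arrived (2, 3, 2, 3). Join values shown are illustrative.
        new KeyValueTimestamp<>(2, "X2+YY2", 2),
        new KeyValueTimestamp<>(2, "X2+YY2", 2),
        new KeyValueTimestamp<>(3, "X3+YY3", 3),
        new KeyValueTimestamp<>(3, "X3+YY3", 3));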