wcarlson5 commented on code in PR #13855: URL: https://github.com/apache/kafka/pull/13855#discussion_r1244739834
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java: ########## @@ -56,10 +77,59 @@ public void init(final ProcessorContext<K1, VOut> context) { final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); valueGetter.init(context); + internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context); + if (useBuffer) { + if (!valueGetter.isVersioned() && gracePeriod.isPresent()) { + throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join."); + } + + buffer.get().setSerdesIfNull(new SerdeGetter(context)); + buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) context(), null); + } } @Override public void process(final Record<K1, V1> record) { + internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context()); + updateObservedStreamTime(record.timestamp()); + if (maybeDropRecord(record)) { + return; + } + + if (!useBuffer) { + doJoin(record); + } else { + if (!buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext())) { + doJoin(record); + } + buffer.get().evictWhile(() -> true, this::emit); Review Comment: Sure that is just fine with me. 
I'll add a test adding records out of the grace period to ########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java: ########## @@ -112,6 +125,74 @@ private void pushNullValueToTable() { } } + + private void makeJoin(final Duration grace) { + final KStream<Integer, String> stream; + final KTable<Integer, String> table; + final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>(); + builder = new StreamsBuilder(); + + final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String()); + stream = builder.stream(streamTopic, consumed); + table = builder.table("tableTopic2", consumed, Materialized.as( + Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5)))); + stream.join(table, + MockValueJoiner.TOSTRING_JOINER, + Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace) + ).process(supplier); + final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); + driver = new TopologyTestDriver(builder.build(), props); + inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + inputTableTopic = driver.createInputTopic("tableTopic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + + processor = supplier.theCapturedProcessor(); + } + + @Test + public void shouldFailIfTableIsNotVersioned() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); + final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String())); + + final IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, + () -> streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one")); + assertThat( + exception.getMessage(), + is("KTable must be versioned to use a grace period in a stream table join.") + ); + } + + @Test + public void shouldDelayJoinByGracePeriod() { + makeJoin(Duration.ofMillis(2)); + + // push four items to the table. this should not produce any item. + pushToTableNonRandom(4, "Y"); + processor.checkAndClearProcessResult(EMPTY); + + // push all four items to the primary stream. this should produce two items. + pushToStream(4, "X"); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "X0+Y0", 0), + new KeyValueTimestamp<>(1, "X1+Y1", 1)); + + // push all items to the table. this should not produce any item + pushToTableNonRandom(4, "YY"); + processor.checkAndClearProcessResult(EMPTY); + + // push all four items to the primary stream. this should produce two items. + pushToStream(4, "X"); + processor.checkAndClearProcessResult( Review Comment: I added something to flush the buffer at the end of this test. Works just fine :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org