Guido Josquin created KAFKA-14173:
-------------------------------------

             Summary: TopologyTestDriver does not perform left join on two streams when right side is missing
                 Key: KAFKA-14173
                 URL: https://issues.apache.org/jira/browse/KAFKA-14173
             Project: Kafka
          Issue Type: Bug
          Components: streams-test-utils
    Affects Versions: 2.3.1
            Reporter: Guido Josquin

I am trying to test a stream-stream join with `TopologyTestDriver`. My goal is to confirm, without running external services, that my topology performs the following left join correctly.

```
bills
  .leftJoin(payments)(
    {
      case (billValue, null) => billValue
      case (billValue, paymentValue) => (billValue.toInt - paymentValue.toInt).toString
    },
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100))
  )
  .to("debt")
```

In other words, if we see a `bill` and a `payment` for the same key within 100 ms of each other, the payment should be subtracted from the bill. If we do not see a payment, the debt is simply the bill.

Here is the test code.

```
val simpleLeftJoinTopology = new SimpleLeftJoinTopology
val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology)
val serde = Serdes.stringSerde

val bills = driver.createInputTopic("bills", serde.serializer, serde.serializer)
val payments = driver.createInputTopic("payments", serde.serializer, serde.serializer)
val debt = driver.createOutputTopic("debt", serde.deserializer, serde.deserializer)

bills.pipeInput("fred", "100")
bills.pipeInput("george", "20")
payments.pipeInput("fred", "95")

// When in doubt, sleep twice
driver.advanceWallClockTime(Duration.ofMillis(500))
Thread.sleep(500)

val keyValues = debt.readKeyValuesToList()
keyValues should contain theSameElementsAs Seq(
  // This record is present
  new KeyValue[String, String]("fred", "5"),
  // This record is missing
  new KeyValue[String, String]("george", "20")
)
```

Full code available at https://github.com/Oduig/kstreams-left-join-example

It seems that advancing the wall-clock time, or sleeping for that matter, never triggers the join when data arrives only on the left side: the unmatched `george` record is never emitted to `debt`.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
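
To make the expected result unambiguous, the left-join semantics described above can be modeled in plain Scala with no Kafka dependency. This is only a sketch of the intended outcome, not of how Kafka Streams evaluates the join. `LeftJoinModel`, `Event`, and `expectedDebt` are hypothetical names introduced for illustration, the explicit timestamps are an assumption (the test above does not set any), and the window bound is assumed to be inclusive.

```scala
object LeftJoinModel {
  // Hypothetical record type: key, string value, and an event timestamp in ms.
  final case class Event(key: String, value: String, timestampMs: Long)

  // For every bill, emit one debt per payment with the same key whose
  // timestamp lies within windowMs of the bill (bound assumed inclusive).
  // A bill with no such payment is emitted unchanged, as in a left join.
  def expectedDebt(
      bills: Seq[Event],
      payments: Seq[Event],
      windowMs: Long
  ): Seq[(String, String)] =
    bills.flatMap { bill =>
      val matches = payments.filter { p =>
        p.key == bill.key &&
        math.abs(p.timestampMs - bill.timestampMs) <= windowMs
      }
      if (matches.isEmpty) Seq(bill.key -> bill.value)
      else matches.map(p => bill.key -> (bill.value.toInt - p.value.toInt).toString)
    }

  def main(args: Array[String]): Unit = {
    // Same data as the test; all timestamps assumed to be 0 for simplicity.
    val bills = Seq(Event("fred", "100", 0L), Event("george", "20", 0L))
    val payments = Seq(Event("fred", "95", 0L))
    expectedDebt(bills, payments, windowMs = 100L).foreach(println)
  }
}
```

With the data from the test, this model yields `fred -> 5` and `george -> 20`, which is the pair of records the assertion expects on the `debt` topic.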