Guido Josquin created KAFKA-14173:
-------------------------------------
Summary: TopologyTestDriver does not perform left join on two
streams when right side is missing
Key: KAFKA-14173
URL: https://issues.apache.org/jira/browse/KAFKA-14173
Project: Kafka
Issue Type: Bug
Components: streams-test-utils
Affects Versions: 2.3.1
Reporter: Guido Josquin
I am trying to test a stream-stream join with `TopologyTestDriver`. My goal is
to confirm, without running external services, that my topology performs the
following left join correctly.
```
bills
.leftJoin(payments)(
{
case (billValue, null) => billValue
case (billValue, paymentValue) => (billValue.toInt -
paymentValue.toInt).toString
},
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100))
)
.to("debt")
```
In other words, if we see a `bill` and a `payment` within 100ms, the payment
should be subtracted from the bill. If we do not see a payment, the debt is
simply the bill.
Here is the test code.
```
val simpleLeftJoinTopology = new SimpleLeftJoinTopology
val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology)
val serde = Serdes.stringSerde
val bills = driver.createInputTopic("bills", serde.serializer, serde.serializer)
val payments = driver.createInputTopic("payments", serde.serializer,
serde.serializer)
val debt = driver.createOutputTopic("debt", serde.deserializer,
serde.deserializer)
bills.pipeInput("fred", "100")
bills.pipeInput("george", "20")
payments.pipeInput("fred", "95")
// When in doubt, sleep twice
driver.advanceWallClockTime(Duration.ofMillis(500))
Thread.sleep(500)
val keyValues = debt.readKeyValuesToList()
keyValues should contain theSameElementsAs Seq(
// This record is present
new KeyValue[String, String]("fred", "5"),
// This record is missing
new KeyValue[String, String]("george", "20")
)
```
Full code available at https://github.com/Oduig/kstreams-left-join-example
Is seems that advancing the wall clock time, or sleeping for that matter, never
triggers the join condition when data only arrives on the left side.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)