[ https://issues.apache.org/jira/browse/KAFKA-14173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725356#comment-17725356 ]
Matthias J. Sax commented on KAFKA-14173:
-----------------------------------------

Just discovering this ticket. I guess you would need to use `TestInputTopic#advanceTime` for this case?

Closing the ticket as "not an issue", as the API is there. Feel free to follow up.

> TopologyTestDriver does not use mock wall clock time when sending test records
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-14173
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14173
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams-test-utils
>    Affects Versions: 2.3.1
>            Reporter: Guido Josquin
>            Priority: Minor
>
> I am trying to test a stream-stream join with `TopologyTestDriver`. My goal
> is to confirm that my topology performs the following left join correctly.
> {code:java}
> bills
>   .leftJoin(payments)(
>     {
>       case (billValue, null) => billValue
>       case (billValue, paymentValue) => (billValue.toInt - paymentValue.toInt).toString
>     },
>     JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100))
>   )
>   .to("debt")
> {code}
>
> In other words, if we see a `bill` and a `payment` within 100ms, the payment
> should be subtracted from the bill. If we do not see a payment, the debt is
> simply the bill.
> Here is the test code.
> {code:java}
> val simpleLeftJoinTopology = new SimpleLeftJoinTopology
> val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology)
> val serde = Serdes.stringSerde
> val bills = driver.createInputTopic("bills", serde.serializer, serde.serializer)
> val payments = driver.createInputTopic("payments", serde.serializer, serde.serializer)
> val debt = driver.createOutputTopic("debt", serde.deserializer, serde.deserializer)
> bills.pipeInput("fred", "100")
> bills.pipeInput("george", "20")
> payments.pipeInput("fred", "95")
> // When in doubt, sleep twice
> driver.advanceWallClockTime(Duration.ofMillis(500))
> Thread.sleep(500)
> // Send a new record to cause the previous window to be closed
> payments.pipeInput("percy", "0")
> val keyValues = debt.readKeyValuesToList()
> keyValues should contain theSameElementsAs Seq(
>   // This record is present
>   new KeyValue[String, String]("fred", "5"),
>   // This record is missing
>   new KeyValue[String, String]("george", "20")
> )
> {code}
> Full code available at [https://github.com/Oduig/kstreams-left-join-example]
> It seems that advancing the wall clock time, sleeping, or sending an extra
> record never triggers the join condition when data only arrives on the left
> side. It is possible to circumvent this by passing an explicit event time
> with each test record. (See
> https://stackoverflow.com/questions/73443812/using-kafka-streams-topologytestdriver-how-to-test-left-join-between-two-strea/73540161#73540161)
>
> However, the behavior deviates from a real Kafka broker. With a real broker,
> if we do not send an event time, it uses the wall clock time of the broker
> instead. The behavior under test should be the same:
> `driver.advanceWallClockTime` should provide the default time to be used for
> `TestInputTopic.pipeInput` when no other time is specified.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
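
For reference, a minimal sketch of what the `TestInputTopic#advanceTime` suggestion could look like for the test in the report. It is not code from the ticket. The object and method names (`LeftJoinTimeSketch`, `pipeWithAdvancedTime`) and the fixed start instant are made up for illustration, and the `topology` argument is assumed to be the bills/payments left-join topology described above. Both input topics are created with the same start timestamp and no auto-advance, so record timestamps only move when `advanceTime` is called; `driver.advanceWallClockTime` is not involved.

```scala
import java.time.{Duration, Instant}

import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.streams.{KeyValue, Topology, TopologyTestDriver}

object LeftJoinTimeSketch {

  // `topology` is assumed to be the left-join topology from the report
  // (input topics "bills" and "payments", output topic "debt").
  def pipeWithAdvancedTime(topology: Topology): java.util.List[KeyValue[String, String]] = {
    val driver = new TopologyTestDriver(topology)
    try {
      // Arbitrary fixed instant (an assumption) so both topics share one event-time origin.
      val start = Instant.parse("2023-01-01T00:00:00Z")

      val bills = driver.createInputTopic(
        "bills", new StringSerializer, new StringSerializer, start, Duration.ZERO)
      val payments = driver.createInputTopic(
        "payments", new StringSerializer, new StringSerializer, start, Duration.ZERO)
      val debt = driver.createOutputTopic(
        "debt", new StringDeserializer, new StringDeserializer)

      // These three records all carry the timestamp `start`.
      bills.pipeInput("fred", "100")
      bills.pipeInput("george", "20")
      payments.pipeInput("fred", "95")

      // Advance the event time used for later records on this topic,
      // well past the 100 ms join window.
      payments.advanceTime(Duration.ofMillis(500))
      payments.pipeInput("percy", "0")

      // Read the output before the driver is closed in `finally`.
      debt.readKeyValuesToList()
    } finally {
      driver.close()
    }
  }
}
```

Whether the unmatched `george` record then shows up in `debt` depends on the Kafka Streams version and its left-join emission behavior, so the sketch returns the output list without asserting its exact contents.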