I believe you need to advanceWallClockTime https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/TopologyTestDriver.html#advanceWallClockTime-java.time.Duration-
Regards, Shashwat Pandey On Wed, Apr 3, 2024 at 5:05 PM Chad Preisler <chad.preis...@gmail.com> wrote: > Seems like there is some issue with the TopologyTestDriver. I am able to > run the same stream against Kakfa and I'm getting the output I expect. I'd > appreciate it if someone could confirm that there is an issue with the > TopologyTestDriver. If there is, any suggestions on how to test this type > of join? > > On Wed, Apr 3, 2024 at 2:46 PM Chad Preisler <chad.preis...@gmail.com> > wrote: > > > Hello, > > > > I'm confused about the outerJoin and when records are produced with the > > following code. > > > > Topology buildTopology() { > > var builder = new StreamsBuilder(); > > var leftStream = builder.stream("leftSecondsTopic", > > Consumed.with(Serdes.String(), Serdes.String())); > > var rightStream = builder.stream("rightSecondsTopic", > > Consumed.with(Serdes.String(), Serdes.String())); > > > > leftStream.outerJoin(rightStream, (left, right) -> left + ", " + > > right, > > > > JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(2000))) > > .to("outputTopicSeconds"); > > > > return builder.build(); > > } > > > > Here is the test driver. > > > > @Test > > public void testSecondsJoinDoesNotWork() { > > Properties props = new Properties(); > > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testSeconds"); > > props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, > > Serdes.StringSerde.class); > > props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, > > Serdes.StringSerde.class); > > var app = new KafkaStreamJoinTest(); > > var serializer = new StringSerializer(); > > > > try(var testDriver = new TopologyTestDriver(app.buildTopology(), > > props)) { > > var leftTopic = > > testDriver.createInputTopic("leftSecondsTopic", > > serializer, serializer, Instant.ofEpochMilli(0L), > > Duration.ZERO); > > leftTopic.pipeInput("1", "test string 1", 0L); > > leftTopic.pipeInput("1", "test string 2", 2001L); > > > > var outputTopic = > > testDriver.createOutputTopic("outputTopicSeconds", > > new StringDeserializer(), new StringDeserializer()); > > assertFalse(outputTopic.isEmpty()); > > System.out.println("First join result:"); > > outputTopic.readKeyValuesToList() > > .forEach((keyValue)-> > > System.out.println("Key: " + keyValue.key + " Value: > " > > + keyValue.value)); > > > > assertTrue(outputTopic.isEmpty()); > > > > leftTopic.pipeInput("1", "test string 3", 4002L); > > leftTopic.pipeInput("1", "test string 4", 6004L); > > > > System.out.println("Second join result:"); > > outputTopic.readKeyValuesToList() > > .forEach((keyValue)-> > > System.out.println("Key: " + keyValue.key + " Value: > " > > + keyValue.value)); > > > > } > > } > > > > Here is the output: > > First join result: > > Key: 1 Value: test string 1, null > > Second join result: > > > > I would have expected a join to happen with "test string 2" and "test > > string 3" being output with a null right value. Why didn't that happen? > > > > >