Changing the code to this... assertTrue(outputTopic.isEmpty()); testDriver.advanceWallClockTime(Duration.ofMillis(2001)); leftTopic.pipeInput("1", "test string 3", 4002L); testDriver.advanceWallClockTime(Duration.ofMillis(2001)); leftTopic.pipeInput("1", "test string 4", 6004L);
Did appear to fix the issue. Output: First join result: Key: 1 Value: test string 1, null Second join result: Key: 1 Value: test string 2, null Key: 1 Value: test string 3, null Still a little strange that it works the first time without advancing the wall clock. On Wed, Apr 3, 2024 at 5:05 PM Shashwat Pandey < shashwat.pandey....@gmail.com> wrote: > I believe you need to advanceWallClockTime > > https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/TopologyTestDriver.html#advanceWallClockTime-java.time.Duration- > > > Regards, > Shashwat Pandey > > > On Wed, Apr 3, 2024 at 5:05 PM Chad Preisler <chad.preis...@gmail.com> > wrote: > > > Seems like there is some issue with the TopologyTestDriver. I am able to > > run the same stream against Kakfa and I'm getting the output I expect. > I'd > > appreciate it if someone could confirm that there is an issue with the > > TopologyTestDriver. If there is, any suggestions on how to test this type > > of join? > > > > On Wed, Apr 3, 2024 at 2:46 PM Chad Preisler <chad.preis...@gmail.com> > > wrote: > > > > > Hello, > > > > > > I'm confused about the outerJoin and when records are produced with the > > > following code. > > > > > > Topology buildTopology() { > > > var builder = new StreamsBuilder(); > > > var leftStream = builder.stream("leftSecondsTopic", > > > Consumed.with(Serdes.String(), Serdes.String())); > > > var rightStream = builder.stream("rightSecondsTopic", > > > Consumed.with(Serdes.String(), Serdes.String())); > > > > > > leftStream.outerJoin(rightStream, (left, right) -> left + ", " > + > > > right, > > > > > > JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(2000))) > > > .to("outputTopicSeconds"); > > > > > > return builder.build(); > > > } > > > > > > Here is the test driver. > > > > > > @Test > > > public void testSecondsJoinDoesNotWork() { > > > Properties props = new Properties(); > > > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testSeconds"); > > > props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, > > > Serdes.StringSerde.class); > > > props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, > > > Serdes.StringSerde.class); > > > var app = new KafkaStreamJoinTest(); > > > var serializer = new StringSerializer(); > > > > > > try(var testDriver = new > TopologyTestDriver(app.buildTopology(), > > > props)) { > > > var leftTopic = > > > testDriver.createInputTopic("leftSecondsTopic", > > > serializer, serializer, Instant.ofEpochMilli(0L), > > > Duration.ZERO); > > > leftTopic.pipeInput("1", "test string 1", 0L); > > > leftTopic.pipeInput("1", "test string 2", 2001L); > > > > > > var outputTopic = > > > testDriver.createOutputTopic("outputTopicSeconds", > > > new StringDeserializer(), new > StringDeserializer()); > > > assertFalse(outputTopic.isEmpty()); > > > System.out.println("First join result:"); > > > outputTopic.readKeyValuesToList() > > > .forEach((keyValue)-> > > > System.out.println("Key: " + keyValue.key + " > Value: > > " > > > + keyValue.value)); > > > > > > assertTrue(outputTopic.isEmpty()); > > > > > > leftTopic.pipeInput("1", "test string 3", 4002L); > > > leftTopic.pipeInput("1", "test string 4", 6004L); > > > > > > System.out.println("Second join result:"); > > > outputTopic.readKeyValuesToList() > > > .forEach((keyValue)-> > > > System.out.println("Key: " + keyValue.key + " > Value: > > " > > > + keyValue.value)); > > > > > > } > > > } > > > > > > Here is the output: > > > First join result: > > > Key: 1 Value: test string 1, null > > > Second join result: > > > > > > I would have expected a join to happen with "test string 2" and "test > > > string 3" being output with a null right value. Why didn't that happen? > > > > > > > > >