I was able to get my test to complete correctly setting the internal setting and removing the calls to set the wall clock.
props.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L); Thanks Shashwat and Matthias for giving me two solutions. On Thu, Apr 4, 2024 at 12:23 PM Matthias J. Sax <mj...@apache.org> wrote: > Yeah, that is some quirk of KS runtime... > > There is some internal config (for perf reasons) that delays emitting > results... An alternative to advancing wall-clock time would be to set > this internal config to zero, to disable the delay. > > Maybe we should disable this config when topology test driver is used > automatically... It's not the first time it did came up. > > I opened a PR for it: https://github.com/apache/kafka/pull/15660 > > > -Matthias > > > > On 4/3/24 3:52 PM, Chad Preisler wrote: > > Changing the code to this... > > > > assertTrue(outputTopic.isEmpty()); > > testDriver.advanceWallClockTime(Duration.ofMillis(2001)); > > leftTopic.pipeInput("1", "test string 3", 4002L); > > testDriver.advanceWallClockTime(Duration.ofMillis(2001)); > > leftTopic.pipeInput("1", "test string 4", 6004L); > > > > Did appear to fix the issue. Output: > > > > First join result: > > Key: 1 Value: test string 1, null > > Second join result: > > Key: 1 Value: test string 2, null > > Key: 1 Value: test string 3, null > > > > Still a little strange that it works the first time without advancing the > > wall clock. > > > > On Wed, Apr 3, 2024 at 5:05 PM Shashwat Pandey < > > shashwat.pandey....@gmail.com> wrote: > > > >> I believe you need to advanceWallClockTime > >> > >> > https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/TopologyTestDriver.html#advanceWallClockTime-java.time.Duration- > >> > >> > >> Regards, > >> Shashwat Pandey > >> > >> > >> On Wed, Apr 3, 2024 at 5:05 PM Chad Preisler <chad.preis...@gmail.com> > >> wrote: > >> > >>> Seems like there is some issue with the TopologyTestDriver. I am able > to > >>> run the same stream against Kakfa and I'm getting the output I expect. > >> I'd > >>> appreciate it if someone could confirm that there is an issue with the > >>> TopologyTestDriver. If there is, any suggestions on how to test this > type > >>> of join? > >>> > >>> On Wed, Apr 3, 2024 at 2:46 PM Chad Preisler <chad.preis...@gmail.com> > >>> wrote: > >>> > >>>> Hello, > >>>> > >>>> I'm confused about the outerJoin and when records are produced with > the > >>>> following code. > >>>> > >>>> Topology buildTopology() { > >>>> var builder = new StreamsBuilder(); > >>>> var leftStream = builder.stream("leftSecondsTopic", > >>>> Consumed.with(Serdes.String(), Serdes.String())); > >>>> var rightStream = builder.stream("rightSecondsTopic", > >>>> Consumed.with(Serdes.String(), Serdes.String())); > >>>> > >>>> leftStream.outerJoin(rightStream, (left, right) -> left + ", > " > >> + > >>>> right, > >>>> > >>>> JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(2000))) > >>>> .to("outputTopicSeconds"); > >>>> > >>>> return builder.build(); > >>>> } > >>>> > >>>> Here is the test driver. > >>>> > >>>> @Test > >>>> public void testSecondsJoinDoesNotWork() { > >>>> Properties props = new Properties(); > >>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, > "testSeconds"); > >>>> props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, > >>>> Serdes.StringSerde.class); > >>>> props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, > >>>> Serdes.StringSerde.class); > >>>> var app = new KafkaStreamJoinTest(); > >>>> var serializer = new StringSerializer(); > >>>> > >>>> try(var testDriver = new > >> TopologyTestDriver(app.buildTopology(), > >>>> props)) { > >>>> var leftTopic = > >>>> testDriver.createInputTopic("leftSecondsTopic", > >>>> serializer, serializer, Instant.ofEpochMilli(0L), > >>>> Duration.ZERO); > >>>> leftTopic.pipeInput("1", "test string 1", 0L); > >>>> leftTopic.pipeInput("1", "test string 2", 2001L); > >>>> > >>>> var outputTopic = > >>>> testDriver.createOutputTopic("outputTopicSeconds", > >>>> new StringDeserializer(), new > >> StringDeserializer()); > >>>> assertFalse(outputTopic.isEmpty()); > >>>> System.out.println("First join result:"); > >>>> outputTopic.readKeyValuesToList() > >>>> .forEach((keyValue)-> > >>>> System.out.println("Key: " + keyValue.key + " > >> Value: > >>> " > >>>> + keyValue.value)); > >>>> > >>>> assertTrue(outputTopic.isEmpty()); > >>>> > >>>> leftTopic.pipeInput("1", "test string 3", 4002L); > >>>> leftTopic.pipeInput("1", "test string 4", 6004L); > >>>> > >>>> System.out.println("Second join result:"); > >>>> outputTopic.readKeyValuesToList() > >>>> .forEach((keyValue)-> > >>>> System.out.println("Key: " + keyValue.key + " > >> Value: > >>> " > >>>> + keyValue.value)); > >>>> > >>>> } > >>>> } > >>>> > >>>> Here is the output: > >>>> First join result: > >>>> Key: 1 Value: test string 1, null > >>>> Second join result: > >>>> > >>>> I would have expected a join to happen with "test string 2" and "test > >>>> string 3" being output with a null right value. Why didn't that > happen? > >>>> > >>>> > >>> > >> > > >