Changing the code to this...

assertTrue(outputTopic.isEmpty());
            testDriver.advanceWallClockTime(Duration.ofMillis(2001));
            leftTopic.pipeInput("1", "test string 3", 4002L);
            testDriver.advanceWallClockTime(Duration.ofMillis(2001));
            leftTopic.pipeInput("1", "test string 4", 6004L);

Did appear to fix the issue. Output:

First join result:
Key: 1 Value: test string 1, null
Second join result:
Key: 1 Value: test string 2, null
Key: 1 Value: test string 3, null

Still a little strange that it works the first time without advancing the
wall clock.

On Wed, Apr 3, 2024 at 5:05 PM Shashwat Pandey <
shashwat.pandey....@gmail.com> wrote:

> I believe you need to advanceWallClockTime
>
> https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/TopologyTestDriver.html#advanceWallClockTime-java.time.Duration-
>
>
> Regards,
> Shashwat Pandey
>
>
> On Wed, Apr 3, 2024 at 5:05 PM Chad Preisler <chad.preis...@gmail.com>
> wrote:
>
> > Seems like there is some issue with the TopologyTestDriver. I am able to
> > run the same stream against Kakfa and I'm getting the output I expect.
> I'd
> > appreciate it if someone could confirm that there is an issue with the
> > TopologyTestDriver. If there is, any suggestions on how to test this type
> > of join?
> >
> > On Wed, Apr 3, 2024 at 2:46 PM Chad Preisler <chad.preis...@gmail.com>
> > wrote:
> >
> > > Hello,
> > >
> > > I'm confused about the outerJoin and when records are produced with the
> > > following code.
> > >
> > > Topology buildTopology() {
> > >         var builder = new StreamsBuilder();
> > >         var leftStream = builder.stream("leftSecondsTopic",
> > > Consumed.with(Serdes.String(), Serdes.String()));
> > >         var rightStream = builder.stream("rightSecondsTopic",
> > > Consumed.with(Serdes.String(), Serdes.String()));
> > >
> > >         leftStream.outerJoin(rightStream, (left, right) -> left + ", "
> +
> > > right,
> > >
> > > JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(2000)))
> > >             .to("outputTopicSeconds");
> > >
> > >         return builder.build();
> > >     }
> > >
> > > Here is the test driver.
> > >
> > > @Test
> > >     public void testSecondsJoinDoesNotWork() {
> > >         Properties props = new Properties();
> > >         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testSeconds");
> > >         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> > > Serdes.StringSerde.class);
> > >         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
> > > Serdes.StringSerde.class);
> > >         var app = new KafkaStreamJoinTest();
> > >         var serializer = new StringSerializer();
> > >
> > >         try(var testDriver = new
> TopologyTestDriver(app.buildTopology(),
> > >                 props)) {
> > >             var leftTopic =
> > > testDriver.createInputTopic("leftSecondsTopic",
> > >                     serializer, serializer, Instant.ofEpochMilli(0L),
> > > Duration.ZERO);
> > >             leftTopic.pipeInput("1", "test string 1", 0L);
> > >             leftTopic.pipeInput("1", "test string 2", 2001L);
> > >
> > >             var outputTopic =
> > > testDriver.createOutputTopic("outputTopicSeconds",
> > >                     new StringDeserializer(), new
> StringDeserializer());
> > >             assertFalse(outputTopic.isEmpty());
> > >             System.out.println("First join result:");
> > >             outputTopic.readKeyValuesToList()
> > >                 .forEach((keyValue)->
> > >                     System.out.println("Key: " + keyValue.key + "
> Value:
> > "
> > > + keyValue.value));
> > >
> > >             assertTrue(outputTopic.isEmpty());
> > >
> > >             leftTopic.pipeInput("1", "test string 3", 4002L);
> > >             leftTopic.pipeInput("1", "test string 4", 6004L);
> > >
> > >             System.out.println("Second join result:");
> > >             outputTopic.readKeyValuesToList()
> > >                 .forEach((keyValue)->
> > >                     System.out.println("Key: " + keyValue.key + "
> Value:
> > "
> > > + keyValue.value));
> > >
> > >         }
> > >     }
> > >
> > > Here is the output:
> > > First join result:
> > > Key: 1 Value: test string 1, null
> > > Second join result:
> > >
> > > I would have expected a join to happen with "test string 2" and "test
> > > string 3" being output with a null right value. Why didn't that happen?
> > >
> > >
> >
>

Reply via email to