I believe you need to call advanceWallClockTime on the TopologyTestDriver:
https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/TopologyTestDriver.html#advanceWallClockTime-java.time.Duration-
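
Here is a rough sketch of what I mean, not a verified fix. It reuses your
topology and topic names; the class name WallClockJoinSketch and the
2-second step are just placeholders I picked. My assumption is that recent
Kafka Streams versions throttle the emission of unmatched left/outer join
results on wall-clock time, and the test driver's mocked wall clock only
moves when you advance it yourself. Advancing it does not emit anything on
its own, so the next piped record is what should flush the closed windows.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;

// Hypothetical class name; the topology is copied from the original post.
public class WallClockJoinSketch {

    static Topology buildTopology() {
        var builder = new StreamsBuilder();
        var leftStream = builder.stream("leftSecondsTopic",
                Consumed.with(Serdes.String(), Serdes.String()));
        var rightStream = builder.stream("rightSecondsTopic",
                Consumed.with(Serdes.String(), Serdes.String()));
        leftStream.outerJoin(rightStream, (left, right) -> left + ", " + right,
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(2000)))
            .to("outputTopicSeconds");
        return builder.build();
    }

    // Pipes the four left-side records and returns every value written to the output topic.
    static List<String> run() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testSeconds");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        var serializer = new StringSerializer();

        try (var testDriver = new TopologyTestDriver(buildTopology(), props)) {
            var leftTopic = testDriver.createInputTopic("leftSecondsTopic", serializer, serializer);
            var outputTopic = testDriver.createOutputTopic("outputTopicSeconds",
                    new StringDeserializer(), new StringDeserializer());

            long[] timestamps = {0L, 2001L, 4002L, 6004L};
            for (int i = 0; i < timestamps.length; i++) {
                leftTopic.pipeInput("1", "test string " + (i + 1), timestamps[i]);
                // Assumption: moving the mocked wall clock past the emit throttle
                // lets the next record flush windows that have already closed.
                testDriver.advanceWallClockTime(Duration.ofSeconds(2));
            }
            return outputTopic.readValuesToList();
        }
    }

    public static void main(String[] args) {
        run().forEach(value -> System.out.println("Value: " + value));
    }
}
```

If the assumption holds, "test string 2" and "test string 3" should then
show up with a null right value, the same way "test string 1" already does.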


Regards,
Shashwat Pandey


On Wed, Apr 3, 2024 at 5:05 PM Chad Preisler <chad.preis...@gmail.com>
wrote:

> Seems like there is some issue with the TopologyTestDriver. I am able to
> run the same stream against Kafka and I'm getting the output I expect. I'd
> appreciate it if someone could confirm that there is an issue with the
> TopologyTestDriver. If there is, any suggestions on how to test this type
> of join?
>
> On Wed, Apr 3, 2024 at 2:46 PM Chad Preisler <chad.preis...@gmail.com>
> wrote:
>
> > Hello,
> >
> > I'm confused about the outerJoin and when records are produced with the
> > following code.
> >
> > Topology buildTopology() {
> >         var builder = new StreamsBuilder();
> >         var leftStream = builder.stream("leftSecondsTopic",
> > Consumed.with(Serdes.String(), Serdes.String()));
> >         var rightStream = builder.stream("rightSecondsTopic",
> > Consumed.with(Serdes.String(), Serdes.String()));
> >
> >         leftStream.outerJoin(rightStream, (left, right) -> left + ", " + right,
> >                 JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(2000)))
> >             .to("outputTopicSeconds");
> >
> >         return builder.build();
> >     }
> >
> > Here is the test driver.
> >
> > @Test
> >     public void testSecondsJoinDoesNotWork() {
> >         Properties props = new Properties();
> >         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testSeconds");
> >         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> > Serdes.StringSerde.class);
> >         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
> > Serdes.StringSerde.class);
> >         var app = new KafkaStreamJoinTest();
> >         var serializer = new StringSerializer();
> >
> >         try(var testDriver = new TopologyTestDriver(app.buildTopology(),
> >                 props)) {
> >             var leftTopic =
> > testDriver.createInputTopic("leftSecondsTopic",
> >                     serializer, serializer, Instant.ofEpochMilli(0L),
> > Duration.ZERO);
> >             leftTopic.pipeInput("1", "test string 1", 0L);
> >             leftTopic.pipeInput("1", "test string 2", 2001L);
> >
> >             var outputTopic =
> > testDriver.createOutputTopic("outputTopicSeconds",
> >                     new StringDeserializer(), new StringDeserializer());
> >             assertFalse(outputTopic.isEmpty());
> >             System.out.println("First join result:");
> >             outputTopic.readKeyValuesToList()
> >                 .forEach((keyValue)->
> >                     System.out.println("Key: " + keyValue.key + " Value: " + keyValue.value));
> >
> >             assertTrue(outputTopic.isEmpty());
> >
> >             leftTopic.pipeInput("1", "test string 3", 4002L);
> >             leftTopic.pipeInput("1", "test string 4", 6004L);
> >
> >             System.out.println("Second join result:");
> >             outputTopic.readKeyValuesToList()
> >                 .forEach((keyValue)->
> >                     System.out.println("Key: " + keyValue.key + " Value: " + keyValue.value));
> >
> >         }
> >     }
> >
> > Here is the output:
> > First join result:
> > Key: 1 Value: test string 1, null
> > Second join result:
> >
> > I would have expected a join to happen with "test string 2" and "test
> > string 3" being output with a null right value. Why didn't that happen?
> >
> >
>
