There seems to be an issue with the TopologyTestDriver. When I run the same
topology against Kafka, I get the output I expect. I'd appreciate it if
someone could confirm whether this is a problem with the
TopologyTestDriver. If it is, are there any suggestions on how to test this
type of join?
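
For reference, here is the window arithmetic behind the output I expected,
as a plain-Java sketch. It is not Kafka Streams code: the class and method
names are made up for illustration, and it assumes that an unmatched record
becomes eligible for emission once stream time is strictly greater than
its timestamp plus the window size plus the grace period.

```java
import java.time.Duration;

// Hypothetical helper, not part of Kafka Streams: models only the
// "has the join window closed?" check under the assumption stated above.
final class OuterJoinWindowSketch {

    // True once stream time has moved strictly past recordTs + window + grace.
    static boolean windowClosed(long recordTs, long streamTime,
                                Duration window, Duration grace) {
        return streamTime > recordTs + window.toMillis() + grace.toMillis();
    }

    public static void main(String[] args) {
        Duration window = Duration.ofMillis(2000); // same window as the topology
        Duration grace = Duration.ZERO;            // ofTimeDifferenceWithNoGrace
        long[] timestamps = {0L, 2001L, 4002L, 6004L}; // timestamps piped in the test

        long streamTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            // Stream time is the highest timestamp seen so far.
            streamTime = Math.max(streamTime, ts);
            for (long earlier : timestamps) {
                if (earlier < ts && windowClosed(earlier, streamTime, window, grace)) {
                    System.out.println("after input at " + ts
                            + ": window closed for record at " + earlier);
                }
            }
        }
    }
}
```

Under that assumption, the records at 2001 and 4002 are past their windows
by the time the records at 4002 and 6004 arrive, so stream time alone does
not explain the missing output. One possibility, which I have not confirmed,
is that emission of unmatched records is also throttled on wall-clock time,
which the TopologyTestDriver only moves forward when
advanceWallClockTime(Duration) is called.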

On Wed, Apr 3, 2024 at 2:46 PM Chad Preisler <chad.preis...@gmail.com>
wrote:

> Hello,
>
> I'm confused about the outerJoin and when records are produced with the
> following code.
>
> Topology buildTopology() {
>     var builder = new StreamsBuilder();
>     var leftStream = builder.stream("leftSecondsTopic",
>             Consumed.with(Serdes.String(), Serdes.String()));
>     var rightStream = builder.stream("rightSecondsTopic",
>             Consumed.with(Serdes.String(), Serdes.String()));
>
>     leftStream.outerJoin(rightStream,
>                     (left, right) -> left + ", " + right,
>                     JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(2000)))
>             .to("outputTopicSeconds");
>
>     return builder.build();
> }
>
> Here is the test driver.
>
> @Test
> public void testSecondsJoinDoesNotWork() {
>     Properties props = new Properties();
>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testSeconds");
>     props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
>     props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
>     var app = new KafkaStreamJoinTest();
>     var serializer = new StringSerializer();
>
>     try (var testDriver = new TopologyTestDriver(app.buildTopology(), props)) {
>         var leftTopic = testDriver.createInputTopic("leftSecondsTopic",
>                 serializer, serializer, Instant.ofEpochMilli(0L), Duration.ZERO);
>         leftTopic.pipeInput("1", "test string 1", 0L);
>         leftTopic.pipeInput("1", "test string 2", 2001L);
>
>         var outputTopic = testDriver.createOutputTopic("outputTopicSeconds",
>                 new StringDeserializer(), new StringDeserializer());
>         assertFalse(outputTopic.isEmpty());
>         System.out.println("First join result:");
>         outputTopic.readKeyValuesToList()
>                 .forEach(keyValue ->
>                         System.out.println("Key: " + keyValue.key + " Value: " + keyValue.value));
>
>         assertTrue(outputTopic.isEmpty());
>
>         leftTopic.pipeInput("1", "test string 3", 4002L);
>         leftTopic.pipeInput("1", "test string 4", 6004L);
>
>         System.out.println("Second join result:");
>         outputTopic.readKeyValuesToList()
>                 .forEach(keyValue ->
>                         System.out.println("Key: " + keyValue.key + " Value: " + keyValue.value));
>     }
> }
>
> Here is the output:
> First join result:
> Key: 1 Value: test string 1, null
> Second join result:
>
> I would have expected "test string 2" and "test string 3" to each be
> output with a null right value once their join windows had closed. Why
> didn't that happen?
>
>
