Hello,

I'm confused about when outerJoin produces records for unmatched input in
the following code.

Topology buildTopology() {
    var builder = new StreamsBuilder();
    var leftStream = builder.stream("leftSecondsTopic",
            Consumed.with(Serdes.String(), Serdes.String()));
    var rightStream = builder.stream("rightSecondsTopic",
            Consumed.with(Serdes.String(), Serdes.String()));

    // Outer join with a 2000 ms window and no grace period
    leftStream.outerJoin(rightStream,
                    (left, right) -> left + ", " + right,
                    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(2000)))
            .to("outputTopicSeconds");

    return builder.build();
}

Here is the test, which uses TopologyTestDriver.

@Test
public void testSecondsJoinDoesNotWork() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testSeconds");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
    var app = new KafkaStreamJoinTest();
    var serializer = new StringSerializer();

    try (var testDriver = new TopologyTestDriver(app.buildTopology(), props)) {
        var leftTopic = testDriver.createInputTopic("leftSecondsTopic",
                serializer, serializer, Instant.ofEpochMilli(0L), Duration.ZERO);
        // Only the left topic receives records; timestamps are just over 2000 ms apart
        leftTopic.pipeInput("1", "test string 1", 0L);
        leftTopic.pipeInput("1", "test string 2", 2001L);

        var outputTopic = testDriver.createOutputTopic("outputTopicSeconds",
                new StringDeserializer(), new StringDeserializer());
        assertFalse(outputTopic.isEmpty());
        System.out.println("First join result:");
        outputTopic.readKeyValuesToList()
                .forEach(keyValue ->
                        System.out.println("Key: " + keyValue.key + " Value: " + keyValue.value));

        assertTrue(outputTopic.isEmpty());

        leftTopic.pipeInput("1", "test string 3", 4002L);
        leftTopic.pipeInput("1", "test string 4", 6004L);

        System.out.println("Second join result:");
        outputTopic.readKeyValuesToList()
                .forEach(keyValue ->
                        System.out.println("Key: " + keyValue.key + " Value: " + keyValue.value));
    }
}

Here is the output:
First join result:
Key: 1 Value: test string 1, null
Second join result:

I would have expected "test string 2" and "test string 3" to be emitted as
well, each with a null right value, once the later records at 4002 and 6004
moved stream time past their 2000 ms join windows. Why didn't that happen?
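
To make my expectation concrete, here is a small standalone sketch of the arithmetic I had in mind. It does not use Kafka Streams at all. The class OuterJoinExpectation and the helper expectedUnmatched are hypothetical names of my own, and the rule it encodes (an unmatched left record becomes eligible once stream time is strictly greater than its timestamp plus the window size) is my assumption about how the join behaves, not something I have confirmed.

```java
import java.util.ArrayList;
import java.util.List;

public class OuterJoinExpectation {

    // Hypothetical helper, not a Kafka Streams API.
    // Given ascending record timestamps on one side of the join and the
    // window size, returns the timestamps of the records I would expect to
    // be emitted as unmatched, assuming a record is emitted once stream
    // time is strictly greater than (timestamp + windowMs).
    static List<Long> expectedUnmatched(long[] timestamps, long windowMs) {
        List<Long> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        int next = 0; // index of the oldest record not yet emitted
        for (long ts : timestamps) {
            // Stream time is the highest timestamp seen so far.
            streamTime = Math.max(streamTime, ts);
            while (next < timestamps.length
                    && timestamps[next] + windowMs < streamTime) {
                emitted.add(timestamps[next]);
                next++;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 2001L, 4002L, 6004L};
        // Under the assumption above this prints [0, 2001, 4002]
        System.out.println(expectedUnmatched(ts, 2000L));
    }
}
```

Under that assumption, the records at 0, 2001 and 4002 ("test string 1" through "test string 3") would all be emitted with a null right value, but the test only shows the first one.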
