Yeah, that is some quirk of KS runtime...

There is some internal config (for perf reasons) that delays emitting results... An alternative to advancing wall-clock time would be to set this internal config to zero, to disable the delay.

Maybe we should disable this config when topology test driver is used automatically... It's not the first time it did came up.

I opened a PR for it: https://github.com/apache/kafka/pull/15660


-Matthias



On 4/3/24 3:52 PM, Chad Preisler wrote:
Changing the code to this...

assertTrue(outputTopic.isEmpty());
             testDriver.advanceWallClockTime(Duration.ofMillis(2001));
             leftTopic.pipeInput("1", "test string 3", 4002L);
             testDriver.advanceWallClockTime(Duration.ofMillis(2001));
             leftTopic.pipeInput("1", "test string 4", 6004L);

Did appear to fix the issue. Output:

First join result:
Key: 1 Value: test string 1, null
Second join result:
Key: 1 Value: test string 2, null
Key: 1 Value: test string 3, null

Still a little strange that it works the first time without advancing the
wall clock.

On Wed, Apr 3, 2024 at 5:05 PM Shashwat Pandey <
shashwat.pandey....@gmail.com> wrote:

I believe you need to advanceWallClockTime

https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/TopologyTestDriver.html#advanceWallClockTime-java.time.Duration-


Regards,
Shashwat Pandey


On Wed, Apr 3, 2024 at 5:05 PM Chad Preisler <chad.preis...@gmail.com>
wrote:

Seems like there is some issue with the TopologyTestDriver. I am able to
run the same stream against Kakfa and I'm getting the output I expect.
I'd
appreciate it if someone could confirm that there is an issue with the
TopologyTestDriver. If there is, any suggestions on how to test this type
of join?

On Wed, Apr 3, 2024 at 2:46 PM Chad Preisler <chad.preis...@gmail.com>
wrote:

Hello,

I'm confused about the outerJoin and when records are produced with the
following code.

Topology buildTopology() {
         var builder = new StreamsBuilder();
         var leftStream = builder.stream("leftSecondsTopic",
Consumed.with(Serdes.String(), Serdes.String()));
         var rightStream = builder.stream("rightSecondsTopic",
Consumed.with(Serdes.String(), Serdes.String()));

         leftStream.outerJoin(rightStream, (left, right) -> left + ", "
+
right,

JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(2000)))
             .to("outputTopicSeconds");

         return builder.build();
     }

Here is the test driver.

@Test
     public void testSecondsJoinDoesNotWork() {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testSeconds");
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
         var app = new KafkaStreamJoinTest();
         var serializer = new StringSerializer();

         try(var testDriver = new
TopologyTestDriver(app.buildTopology(),
                 props)) {
             var leftTopic =
testDriver.createInputTopic("leftSecondsTopic",
                     serializer, serializer, Instant.ofEpochMilli(0L),
Duration.ZERO);
             leftTopic.pipeInput("1", "test string 1", 0L);
             leftTopic.pipeInput("1", "test string 2", 2001L);

             var outputTopic =
testDriver.createOutputTopic("outputTopicSeconds",
                     new StringDeserializer(), new
StringDeserializer());
             assertFalse(outputTopic.isEmpty());
             System.out.println("First join result:");
             outputTopic.readKeyValuesToList()
                 .forEach((keyValue)->
                     System.out.println("Key: " + keyValue.key + "
Value:
"
+ keyValue.value));

             assertTrue(outputTopic.isEmpty());

             leftTopic.pipeInput("1", "test string 3", 4002L);
             leftTopic.pipeInput("1", "test string 4", 6004L);

             System.out.println("Second join result:");
             outputTopic.readKeyValuesToList()
                 .forEach((keyValue)->
                     System.out.println("Key: " + keyValue.key + "
Value:
"
+ keyValue.value));

         }
     }

Here is the output:
First join result:
Key: 1 Value: test string 1, null
Second join result:

I would have expected a join to happen with "test string 2" and "test
string 3" being output with a null right value. Why didn't that happen?





Reply via email to