I was able to get my test to complete correctly setting the internal
setting and removing the calls to set the wall clock.

props.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
0L);

Thanks Shashwat and Matthias for giving me two solutions.


On Thu, Apr 4, 2024 at 12:23 PM Matthias J. Sax <mj...@apache.org> wrote:

> Yeah, that is some quirk of KS runtime...
>
> There is some internal config (for perf reasons) that delays emitting
> results... An alternative to advancing wall-clock time would be to set
> this internal config to zero, to disable the delay.
>
> Maybe we should disable this config when topology test driver is used
> automatically... It's not the first time it did came up.
>
> I opened a PR for it: https://github.com/apache/kafka/pull/15660
>
>
> -Matthias
>
>
>
> On 4/3/24 3:52 PM, Chad Preisler wrote:
> > Changing the code to this...
> >
> > assertTrue(outputTopic.isEmpty());
> >              testDriver.advanceWallClockTime(Duration.ofMillis(2001));
> >              leftTopic.pipeInput("1", "test string 3", 4002L);
> >              testDriver.advanceWallClockTime(Duration.ofMillis(2001));
> >              leftTopic.pipeInput("1", "test string 4", 6004L);
> >
> > Did appear to fix the issue. Output:
> >
> > First join result:
> > Key: 1 Value: test string 1, null
> > Second join result:
> > Key: 1 Value: test string 2, null
> > Key: 1 Value: test string 3, null
> >
> > Still a little strange that it works the first time without advancing the
> > wall clock.
> >
> > On Wed, Apr 3, 2024 at 5:05 PM Shashwat Pandey <
> > shashwat.pandey....@gmail.com> wrote:
> >
> >> I believe you need to advanceWallClockTime
> >>
> >>
> https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/TopologyTestDriver.html#advanceWallClockTime-java.time.Duration-
> >>
> >>
> >> Regards,
> >> Shashwat Pandey
> >>
> >>
> >> On Wed, Apr 3, 2024 at 5:05 PM Chad Preisler <chad.preis...@gmail.com>
> >> wrote:
> >>
> >>> Seems like there is some issue with the TopologyTestDriver. I am able
> to
> >>> run the same stream against Kakfa and I'm getting the output I expect.
> >> I'd
> >>> appreciate it if someone could confirm that there is an issue with the
> >>> TopologyTestDriver. If there is, any suggestions on how to test this
> type
> >>> of join?
> >>>
> >>> On Wed, Apr 3, 2024 at 2:46 PM Chad Preisler <chad.preis...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hello,
> >>>>
> >>>> I'm confused about the outerJoin and when records are produced with
> the
> >>>> following code.
> >>>>
> >>>> Topology buildTopology() {
> >>>>          var builder = new StreamsBuilder();
> >>>>          var leftStream = builder.stream("leftSecondsTopic",
> >>>> Consumed.with(Serdes.String(), Serdes.String()));
> >>>>          var rightStream = builder.stream("rightSecondsTopic",
> >>>> Consumed.with(Serdes.String(), Serdes.String()));
> >>>>
> >>>>          leftStream.outerJoin(rightStream, (left, right) -> left + ",
> "
> >> +
> >>>> right,
> >>>>
> >>>> JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(2000)))
> >>>>              .to("outputTopicSeconds");
> >>>>
> >>>>          return builder.build();
> >>>>      }
> >>>>
> >>>> Here is the test driver.
> >>>>
> >>>> @Test
> >>>>      public void testSecondsJoinDoesNotWork() {
> >>>>          Properties props = new Properties();
> >>>>          props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> "testSeconds");
> >>>>          props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> >>>> Serdes.StringSerde.class);
> >>>>          props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
> >>>> Serdes.StringSerde.class);
> >>>>          var app = new KafkaStreamJoinTest();
> >>>>          var serializer = new StringSerializer();
> >>>>
> >>>>          try(var testDriver = new
> >> TopologyTestDriver(app.buildTopology(),
> >>>>                  props)) {
> >>>>              var leftTopic =
> >>>> testDriver.createInputTopic("leftSecondsTopic",
> >>>>                      serializer, serializer, Instant.ofEpochMilli(0L),
> >>>> Duration.ZERO);
> >>>>              leftTopic.pipeInput("1", "test string 1", 0L);
> >>>>              leftTopic.pipeInput("1", "test string 2", 2001L);
> >>>>
> >>>>              var outputTopic =
> >>>> testDriver.createOutputTopic("outputTopicSeconds",
> >>>>                      new StringDeserializer(), new
> >> StringDeserializer());
> >>>>              assertFalse(outputTopic.isEmpty());
> >>>>              System.out.println("First join result:");
> >>>>              outputTopic.readKeyValuesToList()
> >>>>                  .forEach((keyValue)->
> >>>>                      System.out.println("Key: " + keyValue.key + "
> >> Value:
> >>> "
> >>>> + keyValue.value));
> >>>>
> >>>>              assertTrue(outputTopic.isEmpty());
> >>>>
> >>>>              leftTopic.pipeInput("1", "test string 3", 4002L);
> >>>>              leftTopic.pipeInput("1", "test string 4", 6004L);
> >>>>
> >>>>              System.out.println("Second join result:");
> >>>>              outputTopic.readKeyValuesToList()
> >>>>                  .forEach((keyValue)->
> >>>>                      System.out.println("Key: " + keyValue.key + "
> >> Value:
> >>> "
> >>>> + keyValue.value));
> >>>>
> >>>>          }
> >>>>      }
> >>>>
> >>>> Here is the output:
> >>>> First join result:
> >>>> Key: 1 Value: test string 1, null
> >>>> Second join result:
> >>>>
> >>>> I would have expected a join to happen with "test string 2" and "test
> >>>> string 3" being output with a null right value. Why didn't that
> happen?
> >>>>
> >>>>
> >>>
> >>
> >
>

Reply via email to