Re: outerJoin confusion
I was able to get my test to complete correctly by setting the internal config and removing the calls that advance the wall clock:

    props.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L);

Thanks, Shashwat and Matthias, for giving me two solutions.

On Thu, Apr 4, 2024 at 12:23 PM Matthias J. Sax wrote:
> Yeah, that is some quirk of KS runtime...
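Chad's one-line fix can be packaged as a small helper so every outer-join test gets the same driver settings. This is a minimal sketch that assumes kafka-streams is on the test classpath. The class `OuterJoinTestProps` and its `create` method are hypothetical names, and the serde settings are copied from the original test. `InternalConfig` is not part of the public API, so treat this as test-only configuration that may change between releases.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

// Hypothetical helper: properties for a TopologyTestDriver running an outer join.
public final class OuterJoinTestProps {

    private OuterJoinTestProps() {
    }

    public static Properties create(String applicationId) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        // Internal, test-only setting: an emit interval of 0 disables the
        // wall-clock delay on outer-join results, so the test does not need
        // to call advanceWallClockTime().
        props.put(StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L);
        return props;
    }
}
```

Usage: `new TopologyTestDriver(app.buildTopology(), OuterJoinTestProps.create("testSeconds"))`, with the `advanceWallClockTime` calls removed from the test.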
Re: outerJoin confusion
Yeah, that is a quirk of the Kafka Streams runtime. There is an internal config (for performance reasons) that delays emitting outer-join results. An alternative to advancing wall-clock time is to set this internal config to zero, which disables the delay.

Maybe we should disable this config automatically when the TopologyTestDriver is used; it's not the first time this has come up. I opened a PR for it: https://github.com/apache/kafka/pull/15660

-Matthias

On 4/3/24 3:52 PM, Chad Preisler wrote:
> Changing the code to this...
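The delay Matthias describes also explains why the first result shows up without advancing the wall clock. Below is a hypothetical, simplified model, not Kafka Streams' actual code: an emission pass for pending outer-join results may run at most once per emit interval of wall-clock time, and (as an assumption) the first pass is allowed immediately. The names are made up, and the 1000 ms interval is an assumed value; the thread does not state the default. In each scenario the three calls stand for the emission attempts when the records at 2001, 4002 and 6004 arrive.

```java
// Hypothetical, simplified model of a wall-clock emit throttle.
// This is NOT Kafka Streams' implementation; names and values are assumptions.
public class EmitThrottleModel {

    private final long emitIntervalMs;
    private long nextTimeToEmitMs;

    EmitThrottleModel(long emitIntervalMs, long startWallClockMs) {
        this.emitIntervalMs = emitIntervalMs;
        // Assumption: the first emission pass is allowed right away.
        this.nextTimeToEmitMs = startWallClockMs;
    }

    // Returns true if an emission pass may run at the given wall-clock time.
    boolean tryEmit(long wallClockMs) {
        if (wallClockMs < nextTimeToEmitMs) {
            return false; // throttled: pending results stay buffered
        }
        nextTimeToEmitMs = wallClockMs + emitIntervalMs;
        return true;
    }

    public static void main(String[] args) {
        // Original test: the wall clock never moves, so only the first pass runs.
        var frozen = new EmitThrottleModel(1000L, 0L);
        System.out.println(frozen.tryEmit(0L) + " " + frozen.tryEmit(0L) + " " + frozen.tryEmit(0L));
        // prints "true false false"

        // Wall clock advanced by 2001 ms before each later record: every pass runs.
        var advancing = new EmitThrottleModel(1000L, 0L);
        System.out.println(advancing.tryEmit(0L) + " " + advancing.tryEmit(2001L) + " " + advancing.tryEmit(4002L));
        // prints "true true true"

        // Interval set to 0: no throttling even with a frozen clock.
        var disabled = new EmitThrottleModel(0L, 0L);
        System.out.println(disabled.tryEmit(0L) + " " + disabled.tryEmit(0L) + " " + disabled.tryEmit(0L));
        // prints "true true true"
    }
}
```

Under this model, the frozen-clock case matches the original output ("test string 1, null" and then nothing), while both workarounds from the thread let the later results through.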
Re: outerJoin confusion
Changing the code to this:

    assertTrue(outputTopic.isEmpty());
    testDriver.advanceWallClockTime(Duration.ofMillis(2001));
    leftTopic.pipeInput("1", "test string 3", 4002L);
    testDriver.advanceWallClockTime(Duration.ofMillis(2001));
    leftTopic.pipeInput("1", "test string 4", 6004L);

did appear to fix the issue. Output:

    First join result:
    Key: 1 Value: test string 1, null
    Second join result:
    Key: 1 Value: test string 2, null
    Key: 1 Value: test string 3, null

It's still a little strange that it works the first time without advancing the wall clock.

On Wed, Apr 3, 2024 at 5:05 PM Shashwat Pandey <shashwat.pandey@gmail.com> wrote:
> I believe you need to advanceWallClockTime
Re: outerJoin confusion
I believe you need to call advanceWallClockTime:

https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/TopologyTestDriver.html#advanceWallClockTime-java.time.Duration-

Regards,
Shashwat Pandey

On Wed, Apr 3, 2024 at 5:05 PM Chad Preisler wrote:
> Seems like there is some issue with the TopologyTestDriver.
Re: outerJoin confusion
Seems like there is some issue with the TopologyTestDriver. I am able to run the same stream against Kafka and I'm getting the output I expect. I'd appreciate it if someone could confirm whether there is an issue with the TopologyTestDriver. If there is, does anyone have suggestions on how to test this type of join?

On Wed, Apr 3, 2024 at 2:46 PM Chad Preisler wrote:
> I'm confused about the outerJoin and when records are produced with the following code.
outerJoin confusion
Hello,

I'm confused about the outerJoin and when records are produced with the following code.

    Topology buildTopology() {
        var builder = new StreamsBuilder();
        var leftStream = builder.stream("leftSecondsTopic",
                Consumed.with(Serdes.String(), Serdes.String()));
        var rightStream = builder.stream("rightSecondsTopic",
                Consumed.with(Serdes.String(), Serdes.String()));

        leftStream.outerJoin(rightStream, (left, right) -> left + ", " + right,
                        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(2000)))
                .to("outputTopicSeconds");

        return builder.build();
    }

Here is the test driver.

    @Test
    public void testSecondsJoinDoesNotWork() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testSeconds");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        var app = new KafkaStreamJoinTest();
        var serializer = new StringSerializer();

        try (var testDriver = new TopologyTestDriver(app.buildTopology(), props)) {
            var leftTopic = testDriver.createInputTopic("leftSecondsTopic",
                    serializer, serializer, Instant.ofEpochMilli(0L), Duration.ZERO);
            leftTopic.pipeInput("1", "test string 1", 0L);
            leftTopic.pipeInput("1", "test string 2", 2001L);

            var outputTopic = testDriver.createOutputTopic("outputTopicSeconds",
                    new StringDeserializer(), new StringDeserializer());
            assertFalse(outputTopic.isEmpty());
            System.out.println("First join result:");
            outputTopic.readKeyValuesToList()
                    .forEach((keyValue) ->
                            System.out.println("Key: " + keyValue.key + " Value: " + keyValue.value));

            assertTrue(outputTopic.isEmpty());

            leftTopic.pipeInput("1", "test string 3", 4002L);
            leftTopic.pipeInput("1", "test string 4", 6004L);

            System.out.println("Second join result:");
            outputTopic.readKeyValuesToList()
                    .forEach((keyValue) ->
                            System.out.println("Key: " + keyValue.key + " Value: " + keyValue.value));
        }
    }

Here is the output:

    First join result:
    Key: 1 Value: test string 1, null
    Second join result:

I would have expected "test string 2" and "test string 3" to be output as join results with a null right value. Why didn't that happen?
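In stream-time terms the expectation above is reasonable, which the following hypothetical model illustrates. It ignores any wall-clock delay and is not Kafka Streams' implementation; the class and method names are made up. It assumes that a left record with no right-side match is emitted as (left, null) once stream time moves strictly past its timestamp plus the 2000 ms window plus the zero grace period. With these timestamps the closing points (2000, 4001, 6002) never coincide with an observed stream time, so the strict-versus-inclusive boundary does not affect the result.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stream-time model of a left/outer join with no right-side records.
// It ignores the wall-clock emit delay and is NOT Kafka Streams' implementation.
public class OuterJoinWindowModel {

    private static final class Pending {
        final String value;
        final long timestamp;

        Pending(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final long afterMs;
    private final long graceMs;
    private final List<Pending> pending = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;

    OuterJoinWindowModel(long afterMs, long graceMs) {
        this.afterMs = afterMs;
        this.graceMs = graceMs;
    }

    // Processes one left record; returns the (left, null) results whose windows closed.
    List<String> processLeft(String value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        List<String> emitted = new ArrayList<>();
        Iterator<Pending> it = pending.iterator();
        while (it.hasNext()) {
            Pending p = it.next();
            // Assumed rule: the window closes once stream time is strictly past ts + after + grace.
            if (p.timestamp + afterMs + graceMs < streamTime) {
                emitted.add(p.value + ", null");
                it.remove();
            }
        }
        pending.add(new Pending(value, timestamp));
        return emitted;
    }

    public static void main(String[] args) {
        var model = new OuterJoinWindowModel(2000L, 0L);
        System.out.println(model.processLeft("test string 1", 0L));    // []
        System.out.println(model.processLeft("test string 2", 2001L)); // [test string 1, null]
        System.out.println(model.processLeft("test string 3", 4002L)); // [test string 2, null]
        System.out.println(model.processLeft("test string 4", 6004L)); // [test string 3, null]
    }
}
```

"test string 4" stays pending in this model because no later record advances stream time past 8004, which is consistent with it not appearing in any of the outputs in this thread.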