Re: outerJoin confusion

2024-04-05 Thread Chad Preisler
I was able to get my test to pass by setting the internal config to zero
and removing the calls that advance the wall clock.

props.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L);

Thanks Shashwat and Matthias for giving me two solutions.



Re: outerJoin confusion

2024-04-04 Thread Matthias J. Sax

Yeah, that is a quirk of the Kafka Streams (KS) runtime...

There is an internal config (for perf reasons) that delays emitting
results... An alternative to advancing wall-clock time would be to set
this internal config to zero, to disable the delay.


Maybe we should disable this config automatically when the topology test
driver is used... It's not the first time this has come up.


I opened a PR for it: https://github.com/apache/kafka/pull/15660


-Matthias
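
The delay described above can be pictured with a small stand-alone model. This is only a sketch, not the Kafka Streams implementation: the class OuterJoinEmitModel and all of its members are hypothetical names, and it rests on two assumptions that the thread does not confirm, namely that the throttle's "next time to emit" starts at zero (so the first attempt always passes) and that the default interval is 1000 ms. Only left-side records without a match are modelled, as in the test from the thread.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical, simplified model of the throttled emit of unmatched outer-join records. */
final class OuterJoinEmitModel {
    private static final class Pending {
        final String value;
        final long timestampMs;
        Pending(String value, long timestampMs) {
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    private final long windowMs;        // join window size (2000 in the thread)
    private final long emitIntervalMs;  // stands in for the internal emit-interval config
    private final List<Pending> pending = new ArrayList<>();
    private long wallClockMs = 0L;      // mock wall clock; only moves when advanced explicitly
    private long nextTimeToEmitMs = 0L; // assumption: starts at 0, so the first attempt passes
    private long streamTimeMs = 0L;     // highest record timestamp seen so far

    OuterJoinEmitModel(long windowMs, long emitIntervalMs) {
        this.windowMs = windowMs;
        this.emitIntervalMs = emitIntervalMs;
    }

    void advanceWallClockTime(long ms) {
        wallClockMs += ms;
    }

    /** Pipes an unmatched left record and returns the "value, null" results emitted by it. */
    List<String> pipeLeft(String value, long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        List<String> emitted = new ArrayList<>();
        // Only attempt an emit if some buffered record's window has already closed.
        boolean anyClosed = pending.stream()
                .anyMatch(p -> p.timestampMs + windowMs < streamTimeMs);
        // The throttle compares against wall-clock time, not stream time.
        if (anyClosed && wallClockMs >= nextTimeToEmitMs) {
            nextTimeToEmitMs = wallClockMs + emitIntervalMs;
            Iterator<Pending> it = pending.iterator();
            while (it.hasNext()) {
                Pending p = it.next();
                if (p.timestampMs + windowMs < streamTimeMs) {
                    emitted.add(p.value + ", null");
                    it.remove();
                }
            }
        }
        pending.add(new Pending(value, timestampMs));
        return emitted;
    }

    public static void main(String[] args) {
        // Same inputs as the test in the thread, wall clock never advanced.
        OuterJoinEmitModel model = new OuterJoinEmitModel(2000L, 1000L);
        System.out.println(model.pipeLeft("test string 1", 0L));
        System.out.println(model.pipeLeft("test string 2", 2001L));
        System.out.println(model.pipeLeft("test string 3", 4002L));
        System.out.println(model.pipeLeft("test string 4", 6004L));
    }
}
```

Under these assumptions the model behaves like the test in the thread: the first unmatched result is emitted without touching the wall clock, later ones are held back until the wall clock moves past the interval, and an interval of zero removes the delay entirely.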




Re: outerJoin confusion

2024-04-03 Thread Chad Preisler
Changing the code to this...

assertTrue(outputTopic.isEmpty());
testDriver.advanceWallClockTime(Duration.ofMillis(2001));
leftTopic.pipeInput("1", "test string 3", 4002L);
testDriver.advanceWallClockTime(Duration.ofMillis(2001));
leftTopic.pipeInput("1", "test string 4", 6004L);

Did appear to fix the issue. Output:

First join result:
Key: 1 Value: test string 1, null
Second join result:
Key: 1 Value: test string 2, null
Key: 1 Value: test string 3, null

Still a little strange that it works the first time without advancing the
wall clock.



Re: outerJoin confusion

2024-04-03 Thread Shashwat Pandey
I believe you need to advanceWallClockTime
https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/TopologyTestDriver.html#advanceWallClockTime-java.time.Duration-


Regards,
Shashwat Pandey




Re: outerJoin confusion

2024-04-03 Thread Chad Preisler
Seems like there is some issue with the TopologyTestDriver. I am able to
run the same stream against Kafka and I'm getting the output I expect. I'd
appreciate it if someone could confirm that there is an issue with the
TopologyTestDriver. If there is, any suggestions on how to test this type
of join?



outerJoin confusion

2024-04-03 Thread Chad Preisler
Hello,

I'm confused about the outerJoin and when records are produced with the
following code.

Topology buildTopology() {
    var builder = new StreamsBuilder();
    var leftStream = builder.stream("leftSecondsTopic",
            Consumed.with(Serdes.String(), Serdes.String()));
    var rightStream = builder.stream("rightSecondsTopic",
            Consumed.with(Serdes.String(), Serdes.String()));

    leftStream.outerJoin(rightStream, (left, right) -> left + ", " + right,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(2000)))
        .to("outputTopicSeconds");

    return builder.build();
}

Here is the test driver.

@Test
public void testSecondsJoinDoesNotWork() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testSeconds");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
            Serdes.StringSerde.class);
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
            Serdes.StringSerde.class);
    var app = new KafkaStreamJoinTest();
    var serializer = new StringSerializer();

    try (var testDriver = new TopologyTestDriver(app.buildTopology(), props)) {
        var leftTopic = testDriver.createInputTopic("leftSecondsTopic",
                serializer, serializer, Instant.ofEpochMilli(0L), Duration.ZERO);
        leftTopic.pipeInput("1", "test string 1", 0L);
        leftTopic.pipeInput("1", "test string 2", 2001L);

        var outputTopic = testDriver.createOutputTopic("outputTopicSeconds",
                new StringDeserializer(), new StringDeserializer());
        assertFalse(outputTopic.isEmpty());
        System.out.println("First join result:");
        outputTopic.readKeyValuesToList()
            .forEach((keyValue) ->
                System.out.println("Key: " + keyValue.key + " Value: " + keyValue.value));

        assertTrue(outputTopic.isEmpty());

        leftTopic.pipeInput("1", "test string 3", 4002L);
        leftTopic.pipeInput("1", "test string 4", 6004L);

        System.out.println("Second join result:");
        outputTopic.readKeyValuesToList()
            .forEach((keyValue) ->
                System.out.println("Key: " + keyValue.key + " Value: " + keyValue.value));
    }
}

Here is the output:
First join result:
Key: 1 Value: test string 1, null
Second join result:

I would have expected a join to happen with "test string 2" and "test
string 3" being output with a null right value. Why didn't that happen?