[jira] [Updated] (KAFKA-14862) Outer stream-stream join does not output all results with multiple input partitions

2023-04-24 Thread Matthias J. Sax (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-14862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-14862:

Affects Version/s: 3.1.0

> Outer stream-stream join does not output all results with multiple input 
> partitions
> ---
>
>                 Key: KAFKA-14862
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14862
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.1.0
>            Reporter: Bruno Cadonna
>            Assignee: Matthias J. Sax
>            Priority: Major
>             Fix For: 3.5.0, 3.4.1
>
>
> If I run the following Streams app once with two input topics that each have 
> one partition and once with two input topics that each have two partitions, 
> I get different results.
>   
> {code:java}
> final KStream<String, String> leftSide = builder.stream(leftSideTopic);
> final KStream<String, String> rightSide = builder.stream(rightSideTopic);
> final KStream<String, String> leftAndRight = leftSide.outerJoin(
>     rightSide,
>     (leftValue, rightValue) ->
>         (rightValue == null) ? leftValue + "/NOTPRESENT" : leftValue + "/" + rightValue,
>     JoinWindows.ofTimeDifferenceAndGrace(
>         Duration.ofSeconds(20),
>         Duration.ofSeconds(10)),
>     StreamJoined.with(
>         Serdes.String(), /* key */
>         Serdes.String(), /* left value */
>         Serdes.String()  /* right value */
>     ));
> leftAndRight.print(Printed.toSysOut());
> {code}
> To reproduce, produce the following batch of records twice, with an interval 
> greater than window + grace period (i.e. > 30 seconds) between the two 
> batches:
> {code}
> (0, 0)
> (1, 1)
> (2, 2)
> (3, 3)
> (4, 4)
> (5, 5)
> (6, 6)
> (7, 7)
> (8, 8)
> (9, 9)
> {code}
> With input topics with 1 partition I get:
> {code}
> [KSTREAM-PROCESSVALUES-08]: 0, 0/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 1, 1/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 2, 2/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 3, 3/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 4, 4/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 5, 5/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 6, 6/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 7, 7/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 8, 8/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 9, 9/NOTPRESENT
> {code}
> With input topics with 2 partitions I get:
> {code}
> [KSTREAM-PROCESSVALUES-08]: 1, 1/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 3, 3/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 4, 4/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 7, 7/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 8, 8/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 9, 9/NOTPRESENT
> {code}
> I would expect to get the same set of records, maybe in a different order due 
> to the partitioning.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14862) Outer stream-stream join does not output all results with multiple input partitions

2023-03-28 Thread Bruno Cadonna (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-14862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bruno Cadonna updated KAFKA-14862:
--
Description: 
If I run the following Streams app once with two input topics that each have 
one partition and once with two input topics that each have two partitions, 
I get different results.
  
{code:java}
final KStream<String, String> leftSide = builder.stream(leftSideTopic);
final KStream<String, String> rightSide = builder.stream(rightSideTopic);

final KStream<String, String> leftAndRight = leftSide.outerJoin(
    rightSide,
    (leftValue, rightValue) ->
        (rightValue == null) ? leftValue + "/NOTPRESENT" : leftValue + "/" + rightValue,
    JoinWindows.ofTimeDifferenceAndGrace(
        Duration.ofSeconds(20),
        Duration.ofSeconds(10)),
    StreamJoined.with(
        Serdes.String(), /* key */
        Serdes.String(), /* left value */
        Serdes.String()  /* right value */
    ));
leftAndRight.print(Printed.toSysOut());
{code}

To reproduce, produce the following batch of records twice, with an interval 
greater than window + grace period (i.e. > 30 seconds) between the two 
batches:
{code}
(0, 0)
(1, 1)
(2, 2)
(3, 3)
(4, 4)
(5, 5)
(6, 6)
(7, 7)
(8, 8)
(9, 9)
{code}
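
As a quick check of the timing requirement above, here is a minimal sketch that derives the 30 second threshold from the window and grace values used in the join. The class and method names are hypothetical and not part of Kafka Streams; only java.time is used.

```java
import java.time.Duration;

// Hypothetical helper, not part of Kafka Streams.
final class BatchGap {
    // Values copied from the JoinWindows.ofTimeDifferenceAndGrace(...) call in the report.
    static final Duration TIME_DIFFERENCE = Duration.ofSeconds(20);
    static final Duration GRACE = Duration.ofSeconds(10);

    // The pause the report asks to exceed: window + grace period.
    static Duration minimumGap() {
        return TIME_DIFFERENCE.plus(GRACE);
    }

    // True only if the pause between the two batches is strictly greater than window + grace.
    static boolean isLongEnough(Duration pauseBetweenBatches) {
        return pauseBetweenBatches.compareTo(minimumGap()) > 0;
    }

    public static void main(String[] args) {
        System.out.println(minimumGap().getSeconds());             // prints 30
        System.out.println(isLongEnough(Duration.ofSeconds(30)));  // prints false
        System.out.println(isLongEnough(Duration.ofSeconds(31)));  // prints true
    }
}
```

A pause of exactly 30 seconds does not satisfy the "> 30 seconds" condition, so the second batch should be sent slightly later than that.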

With input topics with 1 partition I get:
{code}
[KSTREAM-PROCESSVALUES-08]: 0, 0/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 1, 1/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 2, 2/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 3, 3/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 4, 4/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 5, 5/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 6, 6/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 7, 7/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 8, 8/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 9, 9/NOTPRESENT
{code}

With input topics with 2 partitions I get:
{code}
[KSTREAM-PROCESSVALUES-08]: 1, 1/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 3, 3/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 4, 4/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 7, 7/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 8, 8/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 9, 9/NOTPRESENT
{code}

I would expect to get the same set of records, maybe in a different order due 
to the partitioning.
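
To make the difference between the two runs explicit, the following small sketch compares the keys printed above for both runs. It is plain Java with a hypothetical class name and does not use any Kafka API; the key lists are copied from the two outputs.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Kafka Streams.
final class MissingJoinResults {
    // Returns the keys that are in the expected output but not in the observed output, sorted.
    static Set<Integer> missingKeys(List<Integer> expected, List<Integer> observed) {
        Set<Integer> missing = new TreeSet<>(expected);
        missing.removeAll(observed);
        return missing;
    }

    public static void main(String[] args) {
        // Keys printed with 1 partition per input topic.
        List<Integer> onePartition = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        // Keys printed with 2 partitions per input topic.
        List<Integer> twoPartitions = List.of(1, 3, 4, 7, 8, 9);

        System.out.println(missingKeys(onePartition, twoPartitions)); // prints [0, 2, 5, 6]
    }
}
```

In the run with two partitions, the left-only results for keys 0, 2, 5 and 6 are the ones that do not show up.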


> Outer stream-stream join does not output all results with multiple input 
> partitions
> ---
>
>                 Key: KAFKA-14862
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14862
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Bruno Cadonna
>            Priority: Major
>
> If I execute the following Streams app once with two input