[
https://issues.apache.org/jira/browse/KAFKA-14862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bruno Cadonna updated KAFKA-14862:
--
Description:
If I run the following Streams app once with two input topics that each have
one partition, and then again with input topics that each have two partitions,
I get different results.
{code:java}
// Type parameters are assumed from the String serdes passed to StreamJoined below.
final KStream<String, String> leftSide = builder.stream(leftSideTopic);
final KStream<String, String> rightSide = builder.stream(rightSideTopic);
final KStream<String, String> leftAndRight = leftSide.outerJoin(
    rightSide,
    // Mark left records that found no right-side match within the window.
    (leftValue, rightValue) ->
        (rightValue == null) ? leftValue + "/NOTPRESENT" : leftValue + "/" + rightValue,
    JoinWindows.ofTimeDifferenceAndGrace(
        Duration.ofSeconds(20),
        Duration.ofSeconds(10)),
    StreamJoined.with(
        Serdes.String(), /* key */
        Serdes.String(), /* left value */
        Serdes.String()  /* right value */
    ));
leftAndRight.print(Printed.toSysOut());
{code}
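To make the expected output format concrete, here is a minimal sketch of the joiner expression on its own, outside Kafka Streams. The class name `JoinerSketch` and the use of `java.util.function.BiFunction` in place of the Streams `ValueJoiner` are assumptions made for illustration; only the lambda body is taken from the app above.

```java
import java.util.function.BiFunction;

final class JoinerSketch {
    // Same expression as the joiner lambda in the report, typed for String values.
    // BiFunction stands in for the Kafka Streams ValueJoiner (assumption for this sketch).
    static final BiFunction<String, String, String> JOINER =
        (leftValue, rightValue) ->
            (rightValue == null) ? leftValue + "/NOTPRESENT" : leftValue + "/" + rightValue;

    public static void main(String[] args) {
        // Left record without a matching right record.
        System.out.println(JOINER.apply("0", null)); // prints 0/NOTPRESENT
        // Left and right record joined within the window.
        System.out.println(JOINER.apply("0", "0"));  // prints 0/0
    }
}
```

Each `N/NOTPRESENT` line in the outputs below therefore corresponds to a left-side record for which the join saw no right-side value.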
To reproduce, produce the following batch of records twice, with an interval
greater than window + grace period (i.e. > 30 seconds) between the two
batches:
{code}
(0, 0)
(1, 1)
(2, 2)
(3, 3)
(4, 4)
(5, 5)
(6, 6)
(7, 7)
(8, 8)
(9, 9)
{code}
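The 30-second threshold mentioned above is the sum of the two durations passed to `JoinWindows.ofTimeDifferenceAndGrace` in the app. A small sketch of that arithmetic, with `JoinTimingSketch` and `minimumGap` as hypothetical names introduced here:

```java
import java.time.Duration;

final class JoinTimingSketch {
    // The gap between the two batches must exceed time difference + grace period.
    static Duration minimumGap(Duration timeDifference, Duration grace) {
        return timeDifference.plus(grace);
    }

    public static void main(String[] args) {
        // Values taken from the JoinWindows configuration in the app.
        Duration gap = minimumGap(Duration.ofSeconds(20), Duration.ofSeconds(10));
        System.out.println(gap.getSeconds()); // prints 30
    }
}
```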
With input topics with 1 partition I get:
{code}
[KSTREAM-PROCESSVALUES-08]: 0, 0/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 1, 1/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 2, 2/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 3, 3/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 4, 4/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 5, 5/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 6, 6/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 7, 7/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 8, 8/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 9, 9/NOTPRESENT
{code}
With input topics with 2 partitions I get:
{code}
[KSTREAM-PROCESSVALUES-08]: 1, 1/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 3, 3/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 4, 4/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 7, 7/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 8, 8/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 9, 9/NOTPRESENT
{code}
I would expect to get the same set of records, maybe in a different order due
to the partitioning.
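For reference, the difference between the two outputs listed above can be computed directly from the keys. This is only a sketch over the reported keys; `MissingKeysSketch` and `missing` are hypothetical names and nothing here touches Kafka.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class MissingKeysSketch {
    // Returns the keys present in `expected` but absent from `actual`, in ascending order.
    static Set<Integer> missing(List<Integer> expected, List<Integer> actual) {
        Set<Integer> result = new TreeSet<>(expected);
        result.removeAll(actual);
        return result;
    }

    public static void main(String[] args) {
        // Keys printed with 1-partition input topics.
        List<Integer> onePartition = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        // Keys printed with 2-partition input topics.
        List<Integer> twoPartitions = List.of(1, 3, 4, 7, 8, 9);
        System.out.println(missing(onePartition, twoPartitions)); // prints [0, 2, 5, 6]
    }
}
```

With two partitions, the unmatched results for keys 0, 2, 5, and 6 are the ones that never appear.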
> Outer stream-stream join does not output all results with multiple input
> partitions
> ---
>
> Key: KAFKA-14862
> URL: https://issues.apache.org/jira/browse/KAFKA-14862
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Bruno Cadonna
> Priority: Major