[ https://issues.apache.org/jira/browse/KAFKA-14862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax reassigned KAFKA-14862:
---------------------------------------

    Assignee: Matthias J. Sax

> Outer stream-stream join does not output all results with multiple input partitions
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-14862
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14862
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Bruno Cadonna
>            Assignee: Matthias J. Sax
>            Priority: Major
>
> If I execute the following Streams app once with two input topics each with 1
> partition and then with input topics each with 2 partitions, I get
> different results.
>
> {code:java}
> final KStream<String, String> leftSide = builder.stream(leftSideTopic);
> final KStream<String, String> rightSide = builder.stream(rightSideTopic);
> final KStream<String, String> leftAndRight = leftSide.outerJoin(
>     rightSide,
>     (leftValue, rightValue) ->
>         (rightValue == null) ? leftValue + "/NOTPRESENT" : leftValue + "/" + rightValue,
>     JoinWindows.ofTimeDifferenceAndGrace(
>         Duration.ofSeconds(20),
>         Duration.ofSeconds(10)),
>     StreamJoined.with(
>         Serdes.String(), /* key */
>         Serdes.String(), /* left value */
>         Serdes.String()  /* right value */
>     ));
> leftAndRight.print(Printed.toSysOut());
> {code}
> To reproduce, produce the following batch of records twice, with an interval
> greater than window + grace period (i.e.
> more than 30 seconds) in between the two batches:
> {code}
> (0, 0)
> (1, 1)
> (2, 2)
> (3, 3)
> (4, 4)
> (5, 5)
> (6, 6)
> (7, 7)
> (8, 8)
> (9, 9)
> {code}
> With input topics with 1 partition I get:
> {code}
> [KSTREAM-PROCESSVALUES-0000000008]: 0, 0/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 1, 1/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 2, 2/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 3, 3/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 4, 4/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 5, 5/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 6, 6/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 7, 7/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 8, 8/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 9, 9/NOTPRESENT
> {code}
> With input topics with 2 partitions I get:
> {code}
> [KSTREAM-PROCESSVALUES-0000000008]: 1, 1/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 3, 3/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 4, 4/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 7, 7/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 8, 8/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 9, 9/NOTPRESENT
> {code}
> I would expect to get the same set of records, possibly in a different order
> due to the partitioning.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
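The reporter's expectation can be checked against a simplified, hypothetical model of the outer-join emission rule. This is not Kafka Streams code and does not claim to reproduce the bug or its root cause. It only models when an unmatched left record should be emitted as "value/NOTPRESENT": once the stream time seen by its task exceeds the record timestamp + time difference + grace (20 s + 10 s = 30 s here). Assumptions, all illustrative: only the left topic receives the records (the right side stays empty, consistent with every result being NOTPRESENT); String.hashCode() stands in for Kafka's default partitioner, which actually hashes the serialized key with murmur2; the window closes on a strict "greater than"; and the class, record and method names are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical, simplified model of the expected outer-join results from the report.
 * NOT Kafka Streams code: it only models when an unmatched left record is emitted.
 */
final class OuterJoinExpectation {
    // Mirrors JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(20), Duration.ofSeconds(10)).
    static final long TIME_DIFFERENCE_MS = 20_000L;
    static final long GRACE_MS = 10_000L;

    record Rec(String key, String value, long timestampMs) {}

    /** Reproducer input: keys 0..9 at t ~ 0 s, then the same keys again at t ~ 35 s. */
    static List<Rec> reproducerInput() {
        List<Rec> input = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            input.add(new Rec(String.valueOf(i), String.valueOf(i), i));            // first batch
        }
        for (int i = 0; i < 10; i++) {
            input.add(new Rec(String.valueOf(i), String.valueOf(i), 35_000L + i));   // second batch
        }
        return input;
    }

    /** One task per partition; returns the "key, value/NOTPRESENT" results each task should emit. */
    static List<String> simulate(List<Rec> input, int numPartitions) {
        // Assumption: String.hashCode() stands in for Kafka's murmur2-based default partitioner.
        List<List<Rec>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (Rec r : input) {
            partitions.get(Math.floorMod(r.key().hashCode(), numPartitions)).add(r);
        }

        List<String> emitted = new ArrayList<>();
        for (List<Rec> partition : partitions) {
            long streamTime = Long.MIN_VALUE;      // stream time tracked per task
            List<Rec> pending = new ArrayList<>(); // unmatched left records waiting for window close
            for (Rec r : partition) {
                streamTime = Math.max(streamTime, r.timestampMs());
                // Emit every pending record whose window + grace period has closed.
                Iterator<Rec> it = pending.iterator();
                while (it.hasNext()) {
                    Rec old = it.next();
                    if (streamTime > old.timestampMs() + TIME_DIFFERENCE_MS + GRACE_MS) {
                        emitted.add(old.key() + ", " + old.value() + "/NOTPRESENT");
                        it.remove();
                    }
                }
                pending.add(r);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        Set<String> onePartition = new TreeSet<>(simulate(reproducerInput(), 1));
        Set<String> twoPartitions = new TreeSet<>(simulate(reproducerInput(), 2));
        for (String line : onePartition) {
            System.out.println(line);
        }
        System.out.println("same result set: " + onePartition.equals(twoPartitions));
    }
}
```

Under this model, each key's second-batch record lands on the same partition as its first-batch record, so every task's stream time advances past 30 s and all ten first-batch records are emitted with either 1 or 2 partitions. The second batch stays pending because no later record advances stream time, which matches the 1-partition output in the report.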