[ https://issues.apache.org/jira/browse/KAFKA-4153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485524#comment-15485524 ]
ASF GitHub Bot commented on KAFKA-4153: --------------------------------------- GitHub user eliaslevy opened a pull request: https://github.com/apache/kafka/pull/1846 KAFKA-4153: Fix incorrect KStream-KStream join behavior with asymmetric time window The contribution is my original work and I license the work to the project under the project's open source license. @guozhangwang You can merge this pull request into a Git repository by running: $ git pull https://github.com/eliaslevy/kafka KAFKA-4153 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/1846.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1846 ---- commit a82897f5e4933698c95de1a77ae6ae6f4c721743 Author: Elias Levy <fearsome.lucid...@gmail.com> Date: 2016-09-12T22:27:34Z Swap before & after values for other KStreamKstreamJoin commit 05721ca926321e298fbd3481c96db176dfec9716 Author: Elias Levy <fearsome.lucid...@gmail.com> Date: 2016-09-12T22:28:37Z Add tests for asymetric window stream-stream joins ---- > Incorrect KStream-KStream join behavior with asymmetric time window > ------------------------------------------------------------------- > > Key: KAFKA-4153 > URL: https://issues.apache.org/jira/browse/KAFKA-4153 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.10.0.1 > Reporter: Elias Levy > Assignee: Guozhang Wang > > Using Kafka 0.10.0.1, if joining records in two streams separated by some > time, but only when records from one stream are newer than records from the > other, i.e. doing: > {{stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(10000))}} > One would expect that the following would be equivalent: > {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(10000))}} > Alas, that this is not the case. Instead, this generates the same output as > the first example: > {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(10000))}} > The problem is that the > [{{DefaultJoin}}|https://github.com/apache/kafka/blob/caa9bd0fcd2fab4758791408e2b145532153910e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L692-L697] > implementation in {{KStreamImpl}} fails to reverse the {{before}} and > {{after}} values when creates the {{KStreamKStreamJoin}} for the other > stream, even though is calls {{reverseJoiner}} to reverse the joiner. -- This message was sent by Atlassian JIRA (v6.3.4#6332)