[
https://issues.apache.org/jira/browse/KAFKA-4153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15491329#comment-15491329
]
Guozhang Wang commented on KAFKA-4153:
--------------------------------------
According to its Javadoc, the {{JoinWindows}} definition should be reversed.
For the {{before}} function, for example, the Javadoc states: {{if the
timestamp of a record from the secondary stream is earlier than or equal to the
timestamp of a record from the first stream}}. This is indeed a bug that should
be fixed; we can review the PR and merge it.
As for defining asymmetric {{JoinWindows}}, I think it is better to complement
the current {{before}} and {{after}} functions with static constructors, since
these define the "length" of the window as final values that should not be
overridden later.
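
A minimal sketch of the static-constructor idea, assuming a hypothetical
{{FixedJoinWindow}} class (this is not the Kafka Streams {{JoinWindows}} API;
the class name, the {{of}} factory, and the field names are illustrative only).
The bounds are final, and {{before}} / {{after}} return a new object instead of
overriding values on the existing one:

```java
// Hypothetical immutable window; names are illustrative, not the Kafka API.
final class FixedJoinWindow {
    final long beforeMs; // final: cannot be overridden after construction
    final long afterMs;

    private FixedJoinWindow(long beforeMs, long afterMs) {
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
    }

    // Static constructor: both bounds are fixed when the window is created.
    static FixedJoinWindow of(long beforeMs, long afterMs) {
        return new FixedJoinWindow(beforeMs, afterMs);
    }

    // Each call returns a new window and leaves the receiver untouched.
    FixedJoinWindow before(long ms) {
        return new FixedJoinWindow(ms, afterMs);
    }

    FixedJoinWindow after(long ms) {
        return new FixedJoinWindow(beforeMs, ms);
    }

    public static void main(String[] args) {
        FixedJoinWindow w = FixedJoinWindow.of(0L, 10000L);
        FixedJoinWindow v = w.before(5000L);
        System.out.println(w.beforeMs + " " + w.afterMs);
        System.out.println(v.beforeMs + " " + v.afterMs);
    }
}
```
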
> Incorrect KStream-KStream join behavior with asymmetric time window
> -------------------------------------------------------------------
>
> Key: KAFKA-4153
> URL: https://issues.apache.org/jira/browse/KAFKA-4153
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.0.1
> Reporter: Elias Levy
> Assignee: Guozhang Wang
>
> Using Kafka 0.10.0.1, suppose one joins records in two streams separated by
> some time, but only when records from one stream are newer than records from
> the other, i.e.:
> {{stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(10000))}}
> One would expect that the following would be equivalent:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(10000))}}
> Alas, this is not the case. Instead, this generates the same output as
> the first example:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(10000))}}
> The problem is that the
> [{{DefaultJoin}}|https://github.com/apache/kafka/blob/caa9bd0fcd2fab4758791408e2b145532153910e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L692-L697]
> implementation in {{KStreamImpl}} fails to reverse the {{before}} and
> {{after}} values when it creates the {{KStreamKStreamJoin}} for the other
> stream, even though it calls {{reverseJoiner}} to reverse the joiner.
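
The reversal described in the issue can be illustrated with a small standalone
model. This is a sketch under stated assumptions, not the {{KStreamImpl}} code:
{{WindowSketch}}, {{accepts}}, and {{reversed}} are hypothetical names, and the
window is assumed to accept a record from the other stream when its timestamp
lies between {{thisTs - before}} and {{thisTs + after}}, inclusive.

```java
// Hypothetical model of an asymmetric join window; not the Kafka Streams class.
final class WindowSketch {
    final long beforeMs; // how far the other record may trail behind this one
    final long afterMs;  // how far the other record may run ahead of this one

    WindowSketch(long beforeMs, long afterMs) {
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
    }

    // True if a record at otherTs on the other stream joins a record at thisTs.
    boolean accepts(long thisTs, long otherTs) {
        return otherTs >= thisTs - beforeMs && otherTs <= thisTs + afterMs;
    }

    // The same window seen from the other stream: before and after trade places.
    WindowSketch reversed() {
        return new WindowSketch(afterMs, beforeMs);
    }

    public static void main(String[] args) {
        WindowSketch w = new WindowSketch(0L, 10000L); // models after(10000)
        long t1 = 50000L; // record on stream1
        long t2 = 55000L; // record on stream2, 5 seconds later
        System.out.println(w.accepts(t1, t2));            // true: stream1 side
        System.out.println(w.reversed().accepts(t2, t1)); // true: stream2 side, reversed
        System.out.println(w.accepts(t2, t1));            // false: stream2 side, not reversed
    }
}
```

In this model, using the unreversed window for the other stream rejects a pair
that the primary side accepts, which matches the mismatch reported above.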