[ https://issues.apache.org/jira/browse/FLINK-8158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16267990#comment-16267990 ]
Hequn Cheng commented on FLINK-8158:
------------------------------------

[~fhueske] [~xccui] Hi, I am also confused about the data caching logic. I think the window join should cache records whose rowtime is greater than the watermark, since these records may still be joined later.
!screenshot-1xxx.png!

> Rowtime window inner join emits late data
> -----------------------------------------
>
>                 Key: FLINK-8158
>                 URL: https://issues.apache.org/jira/browse/FLINK-8158
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>         Attachments: screenshot-1xxx.png
>
>
> When executing the join, the join operator needs to make sure that no late
> data is emitted. Currently, this is achieved by holding back watermarks.
> However, the window border is not handled correctly. For the SQL below:
> {quote}
> val sqlQuery =
>   """
>     SELECT t2.key, t2.id, t1.id
>     FROM T1 as t1 join T2 as t2 ON
>       t1.key = t2.key AND
>       t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
>         t2.rt + INTERVAL '1' SECOND
>   """.stripMargin
> val data1 = new mutable.MutableList[(String, String, Long)]
> // for boundary test
> data1.+=(("A", "LEFT1", 6000L))
> val data2 = new mutable.MutableList[(String, String, Long)]
> data2.+=(("A", "RIGHT1", 6000L))
> {quote}
> The join will output a watermark with timestamp 1000, but if the left input
> then receives another record ("A", "LEFT1", 1000L), the join will output a
> record with timestamp 1000, which equals the previously emitted watermark.
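
To make the boundary case concrete, here is a minimal Scala sketch of the arithmetic described in the issue. It is not Flink's operator code: the object name WindowJoinBoundarySketch and the helpers heldBackWatermark, inWindow, and isLate are hypothetical. It assumes the input watermark is 6000 and that the operator holds the watermark back by the larger of the two window bounds (5 seconds here), which matches the output watermark of 1000 reported above. It also relies on the usual watermark contract that a record is late when its timestamp is less than or equal to the watermark.

```scala
object WindowJoinBoundarySketch {
  // Window bounds from the query: t1.rt BETWEEN t2.rt - 5 s AND t2.rt + 1 s
  val lowerBoundMs: Long = -5000L
  val upperBoundMs: Long = 1000L

  // Assumption: the output watermark is the input watermark minus the larger bound.
  def heldBackWatermark(inputWatermark: Long): Long =
    inputWatermark - math.max(math.abs(lowerBoundMs), math.abs(upperBoundMs))

  // Join predicate on rowtime: is the left row inside the right row's window?
  def inWindow(leftRt: Long, rightRt: Long): Boolean =
    leftRt >= rightRt + lowerBoundMs && leftRt <= rightRt + upperBoundMs

  // A record is late if its timestamp is not strictly greater than the watermark.
  def isLate(timestamp: Long, watermark: Long): Boolean =
    timestamp <= watermark

  def main(args: Array[String]): Unit = {
    val outputWatermark = heldBackWatermark(6000L)
    // The second left record ("A", "LEFT1", 1000L) against ("A", "RIGHT1", 6000L)
    val joins = inWindow(1000L, 6000L)
    println(s"output watermark: $outputWatermark")
    println(s"left row at 1000 joins right row at 6000: $joins")
    println(s"joined record with timestamp 1000 is late: ${isLate(1000L, outputWatermark)}")
  }
}
```

Under these assumptions the left row at 1000 sits exactly on the lower window border, so it still satisfies the join predicate while its timestamp is no longer ahead of the emitted watermark. Holding the watermark back by one additional millisecond would be one way to avoid this in the sketch; whether that is the right fix for the operator is a separate question.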