[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16094649#comment-16094649
 ] 

Fabian Hueske commented on FLINK-6233:
--------------------------------------

Hi, 

let me answer your questions:

# That is correct. Imagine we are joining two tables with a 1 hour window. We 
have two rows, left with timestamp 13:15 and right with timestamp 14:00. At the 
time when we join both rows, the watermark will be around 14:00, but we 
might want to use the timestamp of the left input for a windowed aggregation. 
Since the watermark is already past 13:15, the left timestamp would be 
late.
# By holding back I mean that the operator has to keep track of the lowest 
possible timestamp that it might emit in the future. It must not emit 
watermarks that are later than this lower-bound timestamp. This 
requires some coordination effort.
# During the translation of the plan, we would automatically inject an operator 
that sets a timestamp field from the Row into the StreamRecord. This happens 
transparently and the user would not be aware of that.
# You are right. We can only use timestamp fields that have been valid 
timestamps before (valid meaning that they were aligned with the 
watermarks). By holding back the watermarks for all valid timestamps, we 
guarantee that the watermarks stay aligned with all previously valid timestamps 
and that each of them can be used later. This might induce unnecessary latency 
if a timestamp field is earlier than the others but never used. We should be able 
to detect these cases and adjust the watermarks appropriately.
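
To make points 1 and 2 concrete, here is a minimal sketch in plain Python. It 
is not Flink code, and the helper names (t, is_late, held_back_watermark) are 
made up for illustration. It assumes both input watermarks are exactly at 
14:00 and that the join window bounds how far a joined row's timestamp can lag 
behind the input watermarks.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(hours=1)  # join window from the example above


def t(hhmm):
    """Build a timestamp on an arbitrary fixed day from 'HH:MM'."""
    return datetime.strptime("2000-01-01 " + hhmm, "%Y-%m-%d %H:%M")


def is_late(timestamp, watermark):
    # A watermark W asserts that no more records with timestamp <= W arrive,
    # so a record with such a timestamp counts as late.
    return timestamp <= watermark


def held_back_watermark(left_wm, right_wm, window=WINDOW):
    # Assumption: a row joined in the future may still carry a timestamp as
    # old as (lowest input watermark - window). The operator therefore emits
    # no watermark later than this lower bound.
    return min(left_wm, right_wm) - window


left_ts = t("13:15")
input_wm = t("14:00")

# Forwarding the input watermark unchanged makes the left timestamp late.
print(is_late(left_ts, input_wm))

# Holding the watermark back by the window keeps the left timestamp usable.
output_wm = held_back_watermark(input_wm, input_wm)
print(output_wm.strftime("%H:%M"), is_late(left_ts, output_wm))
```

With the held-back watermark at 13:00, a downstream windowed aggregation can 
still use the 13:15 timestamp. The price is that results are emitted up to one 
window length later.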

Let me know if you have further questions,
Fabian

> Support rowtime inner equi-join between two streams in the SQL API
> ------------------------------------------------------------------
>
>                 Key: FLINK-6233
>                 URL: https://issues.apache.org/jira/browse/FLINK-6233
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: hongyuhong
>            Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on row time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * Only inner joins are supported
> * The ON clause should include an equi-join condition
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL 
> '1' HOUR}} can only use a rowtime that is a system attribute. The time 
> condition only supports bounded time ranges like {{o.rowtime BETWEEN s.rowtime 
> - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not unbounded 
> ones like {{o.rowtime < s.rowtime}}, and should include the rowtime 
> attributes of both streams. {{o.rowtime between rowtime () and rowtime () + 
> 1}} should also not be supported.
> A row-time stream join will not be able to handle late data, because this 
> would mean inserting a row into a sorted order and shifting all other 
> computations. This would be too expensive to maintain. Therefore, we will 
> throw an error if a user tries to use a row-time stream join with late data 
> handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 
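
The bounded time condition from the restrictions above can be read as a pair 
of offsets around s.rowtime. The following is a minimal sketch in plain Python, 
not Flink or Calcite code; TimeBound and its methods are hypothetical names, 
and timestamps are assumed to be epoch milliseconds. It illustrates why an 
unbounded condition is rejected: without both bounds there is no point at 
which a buffered row can no longer find a join partner.

```python
from dataclasses import dataclass
from typing import Optional

HOUR_MS = 60 * 60 * 1000


@dataclass
class TimeBound:
    # Models: o.rowtime BETWEEN s.rowtime + lower_ms AND s.rowtime + upper_ms
    # None means that side of the range is unbounded.
    lower_ms: Optional[int]
    upper_ms: Optional[int]

    def is_bounded(self):
        return self.lower_ms is not None and self.upper_ms is not None

    def matches(self, o_rowtime, s_rowtime):
        # Reject unbounded ranges up front, as the restrictions require.
        if not self.is_bounded():
            raise ValueError("unbounded time condition is not supported")
        return s_rowtime + self.lower_ms <= o_rowtime <= s_rowtime + self.upper_ms


# The example query: o.rowtime BETWEEN s.rowtime AND s.rowtime + 1 hour
query_bound = TimeBound(lower_ms=0, upper_ms=HOUR_MS)
print(query_bound.matches(o_rowtime=1000 + HOUR_MS, s_rowtime=1000))

# Roughly o.rowtime < s.rowtime: no lower bound, so it is not accepted.
print(TimeBound(lower_ms=None, upper_ms=0).is_bounded())
```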



