[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16093662#comment-16093662
 ] 

Fabian Hueske commented on FLINK-6233:
--------------------------------------

I fully agree! The timestamp / watermark issue definitely requires more 
thought. 

Right now, a Table (or intermediate result) can only have a single time 
indicator attribute. For processing time, this attribute is purely virtual and 
its value is injected at runtime based on the wallclock time of the operator. 
For event-time the timestamp is stored in the timestamp field of the 
{{StreamRecord}}. The DataStream API accesses this timestamp for its time-based 
operators (time windows, ProcessFunction, etc.). In the DataStream API, the 
timestamp is hidden from the user and can only be set with a 
TimestampExtractor. However, in SQL all fields must be explicitly declared and 
accessible by queries because queries are not fully specified otherwise. The fact 
that the DataStream API treats the timestamp as a special field requires a few 
workarounds in the Table API / SQL.

Today I discussed with [~twalthr] the following idea:

- We allow multiple rowtime indicator attributes and no longer store them in 
the timestamp field of a {{StreamRecord}}. Instead we store all timestamps as 
regular fields in the Row.
- When a DataStream operator requires a timestamp in the {{StreamRecord}} (we 
use the group-window operators of the DataStream API), we can copy the 
requested timestamp from the Row into the {{StreamRecord}} with a custom 
operator. 
- The join holds the watermark back based on all time attributes in the join 
result, i.e., no record is considered late based on any of its timestamps. This 
might be optimized by holding back the watermark only for those timestamps that 
are used later (i.e., not projected out).
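
To make the second and third points more concrete, here is a minimal plain-Python 
sketch. It does not use any Flink API, and it is only one possible reading of the 
idea, not a worked-out design. All names ({{JoinBounds}}, {{copy_timestamp}}, 
{{held_back_watermark}}, {{is_late}}) are hypothetical. It assumes a time 
condition of the form {{o.rowtime BETWEEN s.rowtime - lower AND s.rowtime + 
upper}}, rows modeled as dicts with timestamps in milliseconds, and a watermark 
W meaning "no more records with timestamp <= W".

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class JoinBounds:
    """Assumed condition: o.rowtime BETWEEN s.rowtime - lower AND s.rowtime + upper."""
    lower: int  # milliseconds
    upper: int  # milliseconds


def copy_timestamp(row: dict, field: str) -> tuple:
    """Stand-in for the custom operator: pick one timestamp field of the row
    and attach it as the record timestamp, returned as (timestamp, row)."""
    return (row[field], row)


def held_back_watermark(left_wm: int, right_wm: int, bounds: JoinBounds) -> int:
    """Hold the output watermark back so that no future join result is late
    with respect to any of its timestamp fields (assumption of this sketch).

    A future input record has a timestamp above min(left_wm, right_wm), and
    its join partner can be at most max(lower, upper) older than that.
    """
    input_wm = min(left_wm, right_wm)
    return input_wm - max(bounds.lower, bounds.upper)


def is_late(row: dict, time_fields: list, watermark: int) -> bool:
    """A row counts as late if any of its timestamp fields is <= the watermark."""
    return any(row[f] <= watermark for f in time_fields)


# Example: a one-hour window as in the query of this issue.
bounds = JoinBounds(lower=0, upper=60 * 60 * 1000)
output_wm = held_back_watermark(10_000_000, 9_000_000, bounds)
```

Under these assumptions, a join result produced from inputs that arrive after 
the input watermarks above is never classified as late by {{is_late}}, no matter 
which of its timestamps a downstream operator picks via {{copy_timestamp}}.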

At the moment this is just a rough idea and we need to think about this a bit 
more, but right now it looks quite promising IMO.

Best, Fabian



> Support rowtime inner equi-join between two streams in the SQL API
> ------------------------------------------------------------------
>
>                 Key: FLINK-6233
>                 URL: https://issues.apache.org/jira/browse/FLINK-6233
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: hongyuhong
>            Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on row time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * Only inner joins are supported.
> * The ON clause must include an equi-join condition.
> * The time condition, e.g. {{o.rowtime BETWEEN s.rowtime AND s.rowtime + 
> INTERVAL '1' HOUR}}, may only use a rowtime that is a system attribute. It 
> must specify a bounded time range such as {{o.rowtime BETWEEN s.rowtime 
> - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}; unbounded 
> conditions such as {{o.rowtime < s.rowtime}} are not supported. It must also 
> reference the rowtime attributes of both streams, so {{o.rowtime BETWEEN 
> rowtime() AND rowtime() + 1}} is not supported either.
> A row-time stream join will not be able to handle late data, because 
> inserting a row into a sorted order would shift all other computations. 
> This would be too expensive to maintain. Therefore, we will throw an error if 
> a user tries to use a row-time stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
