[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16126895#comment-16126895
 ] 

Xingcan Cui edited comment on FLINK-6233 at 8/15/17 10:48 AM:
--------------------------------------------------------------

Hi [~fhueske], thanks for the response.

# I'll do the abstraction.
# Did you mean that even for the rowtime join, the clean up timers should also 
use the {{ctx.registerProcessingTimeTimer()}} instead of 
{{ctx.registerEventTimeTimer()}}? I noticed that there's another issue 
(FLINK-7388) about the {{onTimer()}} method, but not sure if it's relative.
# I'm glad to hear that the keys for RocksDBMapState are ordered. That'll make 
things easier. :D
# I'll prepare for the allowed latency parameter.

About the timer services separation, I think you are right. Only when the event 
times for the two streams are out-of-sync and the join conditions just meet 
this "async", can we benefit from it. In proctime join, the states are cleaned 
up in two method: (1) the {{process()}}, and (2) the {{onTimer()}}. That's fine 
because the two stream are strictly synchronized and there is no late data. 
However, for rowtime join, I think the states can only be cleaned by the 
{{onTimer()}}, which is triggered by the watermarks. Then, if the watermarks of 
the faster stream are "swallowed" by the operators, the states *_may_* not be 
cleaned up in time.

We can also consider this problem from the view of window representation. In 
current proctime join implementation, the window sizes for the two streams are 
calculated by the following codes.
{code:java}
// compute window sizes, i.e., how long to keep rows in state.
// window size of -1 means rows do not need to be put into state.
private val leftStreamWinSize: Long = if (leftLowerBound <= 0) -leftLowerBound 
else -1
private val rightStreamWinSize: Long = if (leftUpperBound >= 0) leftUpperBound 
else -1
{code}
They may only work for proctime _*with no late data*_. Otherwise, the window 
size should never be {{-1}}.  Actually, I think the two windows can be 
transformed to be symmetric with the same size and an offset between them (the 
last figure in [Inner Join in Flink|https://goo.gl/4AdR7h] gives an 
illustration). Watermark separation can make the {{offset}} noticed by the 
function, especially when the value is large.


was (Author: xccui):
Hi [~fhueske], thanks for the response.

# I'll do the abstraction.
# Did you mean that even for the rowtime join, the clean up timers should also 
use the {{ctx.registerProcessingTimeTimer()}} instead of 
{{ctx.registerEventTimeTimer()}}? I noticed that there's another issue 
(FLINK-7388) about the {{onTimer()}} method, but not sure if it's relative.
# I'm glad to hear that the keys for RocksDBMapState are ordered. That'll make 
things easier. :D
# I'll prepare for the allowed latency parameter.

About the timer services separation, I think you are right. Only when the event 
times for the two streams are out-of-sync and the join conditions just meet 
this "async", can we benefit from it. In proctime join, the states are cleaned 
up in two method: (1) the {{process()}}, and (2) the {{onTimer()}}. That's fine 
because the two stream are strictly synchronized and there is no late data. 
However, for rowtime join, I think the states can only be cleaned by the 
{{onTimer()}}, which is triggered by the watermarks. Then, if the watermarks of 
the faster stream are "swallowed" by the operators, the states *_may_* not be 
cleaned up in time.

We can also consider this problem from the view of window representation. In 
current proctime join implementation, the window sizes for the two streams are 
calculated by the following codes.
{code:java}
// compute window sizes, i.e., how long to keep rows in state.
// window size of -1 means rows do not need to be put into state.
private val leftStreamWinSize: Long = if (leftLowerBound <= 0) -leftLowerBound 
else -1
private val rightStreamWinSize: Long = if (leftUpperBound >= 0) leftUpperBound 
else -1
{code}
They may only work for proctime _*with no late data*_. Otherwise, the window 
size should never be {{-1}}.  Actually, I think the two windows can be 
transformed to be symmetric with the same size and an offset between them (the 
last figure in [Inner Join in Flink|https://goo.gl/4AdR7h] gives an 
illustration). Watermark separation can make the {{offset}} noticed by the 
function, especially when the value is large.

There's another question. I run a test with the following SQL:
{code:java}
ELECT * FROM OrderA, OrderB
WHERE OrderA.productA = OrderB.productB 
AND OrderA.rtA BETWEEN OrderB.rtB + INTERVAL '5' SECOND AND OrderB.rtB - 
INTERVAL '6'  + SECOND
{code}
The function shows  {{leftLowerBound = 5000, leftUpperBound = -6000}} and 
{{leftStreamWinSize = -1, rightStreamWinSize = -1}}. Is that normal? Maybe we 
just assume the conditions are in the form of "BETWEEN lower bound AND upper 
bound"?

> Support rowtime inner equi-join between two streams in the SQL API
> ------------------------------------------------------------------
>
>                 Key: FLINK-6233
>                 URL: https://issues.apache.org/jira/browse/FLINK-6233
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: hongyuhong
>            Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only support inner join
> * The ON clause should include equi-join condition
> * The time-condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL 
> '1' HOUR}} only can use rowtime that is a system attribute, the time 
> condition only support bounded time range like {{o.rowtime BETWEEN s.rowtime 
> - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not support 
> unbounded like {{o.rowtime &lt; s.rowtime}} ,  and  should include both two 
> stream's rowtime attribute, {{o.rowtime between rowtime () and rowtime () + 
> 1}} should also not be supported.
> An row-time streams join will not be able to handle late data, because this 
> would mean in insert a row into a sorted order shift all other computations. 
> This would be too expensive to maintain. Therefore, we will throw an error if 
> a user tries to use an row-time stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to