[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16126895#comment-16126895 ]
Xingcan Cui commented on FLINK-6233:
------------------------------------

Hi [~fhueske], thanks for the response.

# I'll do the abstraction.
# Did you mean that even for the rowtime join, the clean-up timers should use {{ctx.registerProcessingTimeTimer()}} instead of {{ctx.registerEventTimeTimer()}}? I noticed that there's another issue (FLINK-7388) about the {{onTimer()}} method, but I'm not sure whether it's related.
# I'm glad to hear that the keys for RocksDBMapState are ordered. That'll make things easier. :D
# I'll prepare for the allowed latency parameter.

About the timer services separation, I think you are right. We can only benefit from it when the event times of the two streams are out of sync and the join conditions happen to match this "async".

In the proctime join, the states are cleaned up in two methods: (1) {{process()}} and (2) {{onTimer()}}. That's fine because the two streams are strictly synchronized and there is no late data. However, for the rowtime join, I think the states can only be cleaned up by {{onTimer()}}, which is triggered by the watermarks. Then, if the watermarks of the faster stream are "swallowed" by the operators, the states *_may_* not be cleaned up in time.

We can also consider this problem from the view of window representation. In the current proctime join implementation, the window sizes for the two streams are calculated by the following code.
{code:java}
// compute window sizes, i.e., how long to keep rows in state.
// window size of -1 means rows do not need to be put into state.
private val leftStreamWinSize: Long = if (leftLowerBound <= 0) -leftLowerBound else -1
private val rightStreamWinSize: Long = if (leftUpperBound >= 0) leftUpperBound else -1
{code}
These may only work for proctime _*with no late data*_. Otherwise, the window size should never be {{-1}}.
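
To make the {{-1}} case concrete, here is a minimal sketch that restates the two expressions above in plain Java. The class name {{JoinWindowSizes}} and the sample bounds are made up for illustration and are not part of the Flink code base; only the two conditional expressions come from the snippet above.

```java
// Hypothetical, standalone restatement of the window size computation
// quoted above. Bounds are in milliseconds, relative to the right stream's time.
class JoinWindowSizes {

    // How long left-stream rows are kept in state; -1 means they are not stored.
    static long leftStreamWinSize(long leftLowerBound) {
        return leftLowerBound <= 0 ? -leftLowerBound : -1;
    }

    // How long right-stream rows are kept in state; -1 means they are not stored.
    static long rightStreamWinSize(long leftUpperBound) {
        return leftUpperBound >= 0 ? leftUpperBound : -1;
    }

    public static void main(String[] args) {
        // Bounds on both sides of zero: both streams keep rows in state.
        System.out.println(leftStreamWinSize(-5000) + " " + rightStreamWinSize(6000));
        // Both bounds positive: the left stream gets -1, so its rows are never stored.
        System.out.println(leftStreamWinSize(1000) + " " + rightStreamWinSize(5000));
    }
}
```

With a positive lower bound the left stream's rows are never put into state, which is only safe if a matching right-stream row can never arrive afterwards. That holds for proctime but not necessarily for rowtime with out-of-sync streams.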

Actually, I think the two windows can be transformed to be symmetric, with the same size and an offset between them (the last figure in [Inner Join in Flink|https://goo.gl/4AdR7h] gives an illustration). Watermark separation can make the {{offset}} visible to the function, especially when the value is large.

There's another question. I ran a test with the following SQL:
{code:sql}
SELECT *
FROM OrderA, OrderB
WHERE OrderA.productA = OrderB.productB
  AND OrderA.rtA BETWEEN OrderB.rtB + INTERVAL '5' SECOND
                     AND OrderB.rtB - INTERVAL '6' SECOND
{code}
The function shows {{leftLowerBound = 5000, leftUpperBound = -6000}} and {{leftStreamWinSize = -1, rightStreamWinSize = -1}}. Is that normal? Maybe we just assume the conditions are in the form of "BETWEEN lower bound AND upper bound"?

> Support rowtime inner equi-join between two streams in the SQL API
> ------------------------------------------------------------------
>
>                 Key: FLINK-6233
>                 URL: https://issues.apache.org/jira/browse/FLINK-6233
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: hongyuhong
>            Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on proc time
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
> FROM Orders AS o
> JOIN Shipments AS s
> ON o.orderId = s.orderId
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner join
> * The ON clause should include an equi-join condition
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL
> '1' HOUR}} can only use rowtime that is a system attribute. The time
> condition only supports a bounded time range like {{o.rowtime BETWEEN s.rowtime
> - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not an
> unbounded one like {{o.rowtime < s.rowtime}}, and should include both
> streams' rowtime attributes. {{o.rowtime between rowtime () and rowtime () +
> 1}} should also not be supported.
> A row-time stream join will not be able to handle late data, because this
> would mean inserting a row into a sorted order and shifting all other computations.
> This would be too expensive to maintain. Therefore, we will throw an error if
> a user tries to use a row-time stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin).

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)