Hi Yan Zhou,

As you may have noticed, the SQL-level stream join is not built on top of an existing join API. It is implemented with the low-level CoProcessFunction (see TimeBoundedStreamInnerJoin.scala <http://flink/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala>). The pipeline is generated in DataStreamWindowJoin.scala <https://github.com/apache/flink/blob/14bc62740e90ecefd34f9202f4a37c883c3122e5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala#L106> .
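The buffering idea behind such a join can be illustrated with a small, Flink-free sketch of the condition from the quoted query (o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime). This is a hypothetical illustration, not the actual TimeBoundedStreamInnerJoin code: the class, record, and method names are made up, plain HashMaps stand in for keyed state, and an explicit onWatermark call stands in for the clean-up step. It assumes event time, inclusive bounds, and no late rows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of a time-bounded inner join on a key:
 * left.ts BETWEEN right.ts - bound AND right.ts.
 * Both sides are buffered per key; each arriving row probes the other
 * side's buffer, and a watermark drops rows that can no longer match.
 */
final class TimeBoundedJoinSketch {

    /** A buffered row: event timestamp plus payload (hypothetical type). */
    record Row(long ts, String payload) {}

    private final long boundMillis;
    // Per-key buffers standing in for keyed state.
    private final Map<String, List<Row>> leftBuffer = new HashMap<>();
    private final Map<String, List<Row>> rightBuffer = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    TimeBoundedJoinSketch(long boundMillis) {
        this.boundMillis = boundMillis;
    }

    /** A left (e.g. Orders) row matches right rows with ts in [lt, lt + bound]. */
    void processLeft(String key, Row left) {
        leftBuffer.computeIfAbsent(key, k -> new ArrayList<>()).add(left);
        for (Row right : rightBuffer.getOrDefault(key, List.of())) {
            if (right.ts() >= left.ts() && right.ts() <= left.ts() + boundMillis) {
                output.add(left.payload() + "|" + right.payload());
            }
        }
    }

    /** A right (e.g. Shipments) row matches left rows with ts in [rt - bound, rt]. */
    void processRight(String key, Row right) {
        rightBuffer.computeIfAbsent(key, k -> new ArrayList<>()).add(right);
        for (Row left : leftBuffer.getOrDefault(key, List.of())) {
            if (left.ts() >= right.ts() - boundMillis && left.ts() <= right.ts()) {
                output.add(left.payload() + "|" + right.payload());
            }
        }
    }

    /**
     * Assuming no row arrives with ts < watermark: left rows older than
     * watermark - bound and right rows older than watermark can never
     * match a future row, so they are dropped from the buffers.
     */
    void onWatermark(long watermark) {
        leftBuffer.values().forEach(rows -> rows.removeIf(r -> r.ts() < watermark - boundMillis));
        rightBuffer.values().forEach(rows -> rows.removeIf(r -> r.ts() < watermark));
    }

    List<String> output() {
        return output;
    }

    /** Total number of rows still buffered on both sides. */
    int bufferedRows() {
        int n = 0;
        for (List<Row> rows : leftBuffer.values()) n += rows.size();
        for (List<Row> rows : rightBuffer.values()) n += rows.size();
        return n;
    }
}
```

For example, with a 4-hour bound, an order at time 0 joins a shipment at 1 hour but not one at 5 hours; a watermark at 5 hours then lets the sketch drop the order and the first shipment from its buffers. Keeping both sides buffered is what allows the two streams to be joined over different time ranges instead of within one shared window, and the watermark-driven clean-up is what keeps that state bounded.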
Regarding the over-window aggregation, most of the implementations can be found in this package <https://github.com/apache/flink/tree/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate>. The pipeline is generated in DataStreamOverAggregate.scala <https://github.com/apache/flink/blob/14bc62740e90ecefd34f9202f4a37c883c3122e5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala#L99> .

In summary, they use the built-in state tools to cache the rows or intermediate results and to clean them up or fire them when necessary.

Hope that helps.

Best,
Xingcan

On Thu, Dec 14, 2017 at 7:09 AM, Yan Zhou [FDS Science] <yz...@coupang.com> wrote:

> Hi,
>
> I am building a data pipeline with a lot of streaming joins and over-window aggregations, and Flink SQL supports these features. However, no similar DataStream APIs are provided (maybe there are and I didn't find them; please point them out if so). I got confused because I assumed that the SQL logical plan would be translated into a graph of operators or transformations.
>
> Could someone explain how these two SQL queries are implemented or translated into low-level code (operators or transformations)? I am asking because I have implemented these features without using SQL and the performance looks good. I would certainly love to migrate to SQL, but I want to understand it well first. Any information, hints, or links are appreciated.
>
> 1. Time-Windowed Join
>
> The DataStream API only provides streaming joins within the same window, but the SQL API (time-windowed join) can join two streams over quite different time ranges. Below is a sample query listed in the official docs; we can see that *Orders* and *Shipments* have a 4-hour difference. Is it implemented by a CoProcessFunction or a TwoInputOperator that buffers the events for a certain period?
>
> SELECT *
> FROM Orders o, Shipments s
> WHERE o.id = s.orderId AND
>       o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
>
> 2. Over-Window Aggregation
>
> There is no similar feature in the DataStream API. How does this get implemented? Does it use keyed state to buffer the previous events and pull the records when needed? How is sorting handled?
>
> Best,
> Yan
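Regarding the over-window question (keyed state and sorting), the "cache the rows and fire them when necessary" approach from the reply can be sketched for a simple event-time RANGE over-window, e.g. SUM(amount) OVER (PARTITION BY key ORDER BY ts RANGE BETWEEN <range> PRECEDING AND CURRENT ROW). This is a hypothetical, Flink-free illustration, not the code in the aggregate package: the class and method names are made up, TreeMaps stand in for keyed state, an explicit onWatermark call stands in for whatever triggers the firing (in a Flink process function this would typically be a timer), and late rows are assumed not to occur. Sorting comes from keying the buffer by timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical sketch of an event-time RANGE over-window SUM per key.
 * Rows are buffered per key in a TreeMap ordered by timestamp, so
 * out-of-order input is emitted in timestamp order once the watermark passes.
 */
final class RangeOverSumSketch {

    private final long rangeMillis;
    // Per key: rows received but not yet emitted, ordered by timestamp.
    private final Map<String, TreeMap<Long, List<Long>>> pending = new HashMap<>();
    // Per key: emitted rows that may still fall inside a future window.
    private final Map<String, TreeMap<Long, List<Long>>> history = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    RangeOverSumSketch(long rangeMillis) {
        this.rangeMillis = rangeMillis;
    }

    /** Buffers a row; assumes ts is greater than the last watermark (no late rows). */
    void processElement(String key, long ts, long amount) {
        pending.computeIfAbsent(key, k -> new TreeMap<>())
               .computeIfAbsent(ts, t -> new ArrayList<>())
               .add(amount);
    }

    /** Emits all pending rows with ts <= watermark in timestamp order, then cleans up. */
    void onWatermark(long watermark) {
        for (Map.Entry<String, TreeMap<Long, List<Long>>> e : pending.entrySet()) {
            String key = e.getKey();
            TreeMap<Long, List<Long>> hist = history.computeIfAbsent(key, k -> new TreeMap<>());
            // View of the pending rows that are now complete; polling removes them.
            NavigableMap<Long, List<Long>> ready = e.getValue().headMap(watermark, true);
            while (!ready.isEmpty()) {
                Map.Entry<Long, List<Long>> first = ready.pollFirstEntry();
                long ts = first.getKey();
                List<Long> rowsAtTs = first.getValue();
                hist.computeIfAbsent(ts, t -> new ArrayList<>()).addAll(rowsAtTs);
                // RANGE semantics: sum every row in [ts - range, ts], including same-ts peers.
                long sum = 0;
                for (List<Long> amounts : hist.subMap(ts - rangeMillis, true, ts, true).values()) {
                    for (long a : amounts) sum += a;
                }
                // One output per input row; peers sharing a timestamp get the same sum.
                for (int i = 0; i < rowsAtTs.size(); i++) {
                    output.add(key + "@" + ts + "=" + sum);
                }
            }
            // Future rows have ts > watermark, so their windows start after watermark - range.
            hist.headMap(watermark - rangeMillis, true).clear();
        }
    }

    List<String> output() {
        return output;
    }
}
```

For example, with a 1-minute range and rows arriving in the order (t=120 s, amount 5), (t=0 s, amount 1), (t=30 s, amount 2), a watermark at 60 s emits the t=0 s row with sum 1 and the t=30 s row with sum 3; the t=120 s row is emitted only after a later watermark, with sum 5, because the t=30 s row is outside its window. Holding rows until the watermark passes is what turns out-of-order input into sorted output, at the cost of added latency.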