Thanks for the information.

Best
Yan

From: Xingcan Cui <xingc...@gmail.com>
Date: Wednesday, December 13, 2017 at 6:02 PM
To: "Yan Zhou [FDS Science]" <yz...@coupang.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: how does time-windowed join and Over-Window Aggregation 
implemented in flink SQL?

Hi Yan Zhou,

As you may have noticed, the SQL-level stream join is not built on top of the 
DataStream join APIs; it is implemented with the low-level CoProcessFunction (see 
TimeBoundedStreamInnerJoin.scala<http://flink/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala>).
 The pipeline is generated in 
DataStreamWindowJoin.scala<https://github.com/apache/flink/blob/14bc62740e90ecefd34f9202f4a37c883c3122e5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala#L106>.
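
To make the buffering idea concrete, here is a minimal sketch in plain Python. It is not the code in TimeBoundedStreamInnerJoin.scala, only an illustration of the general technique: buffer both inputs per key, probe the opposite buffer when a row arrives, and evict rows once a watermark shows they can no longer match. The class and method names (TimeBoundedJoinSketch, process_left, process_right, on_watermark) are hypothetical, in-memory dicts stand in for keyed state, and the sketch assumes that every row arriving after a watermark wm has a timestamp greater than wm.

```python
from collections import defaultdict

HOUR_MS = 60 * 60 * 1000


class TimeBoundedJoinSketch:
    """Toy time-windowed inner join (hypothetical names, not Flink code)."""

    def __init__(self, lower_ms, upper_ms):
        # Join condition: right_ts - lower_ms <= left_ts <= right_ts + upper_ms
        self.lower_ms = lower_ms
        self.upper_ms = upper_ms
        # Plain dicts stand in for keyed state: key -> [(timestamp, row)]
        self.left = defaultdict(list)
        self.right = defaultdict(list)

    def _matches(self, left_ts, right_ts):
        return right_ts - self.lower_ms <= left_ts <= right_ts + self.upper_ms

    def process_left(self, key, ts, row):
        """Probe the buffered right rows, then buffer the new left row."""
        out = [(row, r) for r_ts, r in self.right.get(key, [])
               if self._matches(ts, r_ts)]
        self.left[key].append((ts, row))
        return out

    def process_right(self, key, ts, row):
        """Probe the buffered left rows, then buffer the new right row."""
        out = [(l, row) for l_ts, l in self.left.get(key, [])
               if self._matches(l_ts, ts)]
        self.right[key].append((ts, row))
        return out

    def on_watermark(self, wm):
        """Evict rows that cannot match any row with timestamp > wm."""
        self._evict(self.left, self.lower_ms, wm)
        self._evict(self.right, self.upper_ms, wm)

    @staticmethod
    def _evict(buffer, slack_ms, wm):
        for key in list(buffer):
            kept = [(ts, row) for ts, row in buffer[key] if ts + slack_ms > wm]
            if kept:
                buffer[key] = kept
            else:
                del buffer[key]

    def buffered(self):
        """Return (number of buffered left rows, number of buffered right rows)."""
        return (sum(len(v) for v in self.left.values()),
                sum(len(v) for v in self.right.values()))


# Orders on the left, Shipments on the right:
# o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
join = TimeBoundedJoinSketch(lower_ms=4 * HOUR_MS, upper_ms=0)
join.process_left(1, 0, "order-1")
print(join.process_right(1, 3 * HOUR_MS, "ship-1"))  # [('order-1', 'ship-1')]
```

The two bounds decide how long each side has to stay buffered: with a 4-hour lower bound and a zero upper bound, an order is kept for 4 hours of event time, while a shipment can be dropped as soon as the watermark passes its own timestamp.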

Regarding the over-window aggregation, most of the implementations can be found 
in this 
package<https://github.com/apache/flink/tree/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate>.
 The pipeline is generated in 
DataStreamOverAggregate.scala<https://github.com/apache/flink/blob/14bc62740e90ecefd34f9202f4a37c883c3122e5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala#L99>.
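
One common way to handle out-of-order input for an event-time over window is to buffer rows by timestamp and emit results only once a watermark has passed them, so that rows can be processed in timestamp order. The plain-Python sketch below illustrates that idea for a bounded RANGE window with a SUM aggregate. It is not the code from the package above; the names (RangeOverSumSketch, process, on_watermark) are hypothetical, dicts stand in for keyed state, and it assumes that no row with a timestamp at or below a watermark arrives after that watermark.

```python
from collections import defaultdict


class RangeOverSumSketch:
    """Toy SUM(value) OVER (PARTITION BY key ORDER BY ts RANGE range_ms PRECEDING)."""

    def __init__(self, range_ms):
        self.range_ms = range_ms
        # key -> timestamp -> [values]; stands in for keyed map state
        self.rows = defaultdict(lambda: defaultdict(list))
        # key -> timestamps buffered but not yet emitted
        self.pending = defaultdict(set)

    def process(self, key, ts, value):
        """Buffer the row only; nothing is emitted until a watermark arrives."""
        self.rows[key][ts].append(value)
        self.pending[key].add(ts)

    def on_watermark(self, wm):
        """Emit (key, ts, value, window_sum) for rows with ts <= wm, in ts order."""
        out = []
        for key in sorted(self.pending):
            ready = sorted(t for t in self.pending[key] if t <= wm)
            for t in ready:
                # RANGE semantics: all rows with ts in [t - range_ms, t]
                window_sum = sum(
                    v
                    for ts, vals in self.rows[key].items()
                    if t - self.range_ms <= ts <= t
                    for v in vals
                )
                out.extend((key, t, v, window_sum) for v in self.rows[key][t])
            self.pending[key] -= set(ready)
            # Rows at or below wm - range_ms cannot fall into any later window.
            for ts in [ts for ts in self.rows[key] if ts <= wm - self.range_ms]:
                del self.rows[key][ts]
        return out


agg = RangeOverSumSketch(range_ms=10)
agg.process("a", 15, 2)
agg.process("a", 5, 1)   # arrives out of order
agg.process("a", 30, 4)
print(agg.on_watermark(20))  # [('a', 5, 1, 1), ('a', 15, 2, 3)]
```

Sorting is deferred to the watermark callback, so arrival order does not matter as long as rows are not late, and the eviction step keeps the buffer bounded by the window range.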

In summary, they use built-in state tools to cache the rows/intermediate 
results and clean/fire them when necessary.

Hope that helps.

Best,
Xingcan

On Thu, Dec 14, 2017 at 7:09 AM, Yan Zhou [FDS Science] 
<yz...@coupang.com> wrote:

Hi,



I am building a data pipeline with a lot of streaming joins and over-window 
aggregations, and Flink SQL supports both features. However, no similar 
DataStream APIs seem to be provided (maybe there are and I didn't find them; 
please point them out if so). I am confused because I assume that the SQL 
logical plan is translated into a graph of operators or transformations.



Could someone explain how these two SQL queries are implemented or translated 
into low-level code (operators or transformations)? I am asking because I 
have implemented these features without using SQL and the performance looks 
good. I would certainly love to migrate to SQL, but I want to understand them 
well first. Any information, hints, or links are appreciated.



1. Time-Windowed Join

The DataStream API only provides streaming joins within the same window, but 
the SQL API (time-windowed join) can join two streams over quite different time 
ranges. Below is a sample query listed in the official docs; we can see that 
Orders and Shipments may differ by up to 4 hours. Is it implemented with a 
CoProcessFunction or a TwoInputOperator that buffers the events for a certain 
period?



SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

2. Over-Window Aggregation
There is no similar feature in the DataStream API. How is this implemented? 
Does it use keyed state to buffer the previous events and pull the records 
when they are needed? How is sorting handled?


Best
Yan






