Hi Yan Zhou,

As you may have noticed, the SQL-level stream join is not built on top of
a DataStream join API; it is implemented with the low-level CoProcessFunction
(see TimeBoundedStreamInnerJoin.scala
<http://flink/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala>).
The pipeline is generated in DataStreamWindowJoin.scala
<https://github.com/apache/flink/blob/14bc62740e90ecefd34f9202f4a37c883c3122e5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala#L106>
.
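
To make the buffering idea concrete, here is a rough, framework-free sketch in
Python. It is not the code from TimeBoundedStreamInnerJoin.scala; the class and
method names (TimeBoundedJoinSketch, on_left, on_right, on_watermark) are made
up for illustration, and plain dicts stand in for keyed state. It assumes the
join condition left.ts BETWEEN right.ts - lower AND right.ts + upper, and that
a watermark wm means no later row has a timestamp <= wm.

```python
from collections import defaultdict

HOUR_MS = 60 * 60 * 1000


class TimeBoundedJoinSketch:
    """Toy model: left.ts must lie in [right.ts - lower_ms, right.ts + upper_ms]."""

    def __init__(self, lower_ms, upper_ms):
        self.lower_ms = lower_ms
        self.upper_ms = upper_ms
        # Stand-ins for keyed state: join key -> list of (timestamp, row).
        self.left_buf = defaultdict(list)
        self.right_buf = defaultdict(list)

    def _matches(self, left_ts, right_ts):
        return right_ts - self.lower_ms <= left_ts <= right_ts + self.upper_ms

    def on_left(self, key, ts, row):
        # Probe the opposite buffer first, then cache the new row.
        out = [(row, r) for (rts, r) in self.right_buf[key] if self._matches(ts, rts)]
        self.left_buf[key].append((ts, row))
        return out

    def on_right(self, key, ts, row):
        out = [(l, row) for (lts, l) in self.left_buf[key] if self._matches(lts, ts)]
        self.right_buf[key].append((ts, row))
        return out

    def on_watermark(self, wm):
        # A left row can only match right rows with ts <= left.ts + lower_ms,
        # so it is dropped once the watermark reaches that bound (and
        # symmetrically for right rows with upper_ms).
        for key in list(self.left_buf):
            self.left_buf[key] = [
                (ts, r) for (ts, r) in self.left_buf[key] if ts + self.lower_ms > wm
            ]
        for key in list(self.right_buf):
            self.right_buf[key] = [
                (ts, r) for (ts, r) in self.right_buf[key] if ts + self.upper_ms > wm
            ]
```

For the Orders/Shipments query in your mail, Orders would be the left input
with lower_ms = 4 * HOUR_MS and upper_ms = 0, so an order is kept for at most
four hours of event time, while a shipment can be dropped as soon as the
watermark reaches its shiptime.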

Regarding the over-window aggregation, most of the implementations can be
found in this package
<https://github.com/apache/flink/tree/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate>.
The pipeline is generated in DataStreamOverAggregate.scala
<https://github.com/apache/flink/blob/14bc62740e90ecefd34f9202f4a37c883c3122e5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala#L99>
.
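
As a rough illustration of how sorting can be handled, the following Python
sketch buffers rows per key and timestamp and only emits them, in timestamp
order, once a watermark has passed them. It is a simplified model under my own
assumptions, not the actual Flink implementation: OverSumSketch, on_row and
on_watermark are hypothetical names, dicts stand in for keyed state, and only
SUM over an unbounded preceding row-time window is covered.

```python
from collections import defaultdict


class OverSumSketch:
    """Toy model of SUM(value) OVER (PARTITION BY key ORDER BY rowtime
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)."""

    def __init__(self):
        # Stand-in for keyed map state: key -> timestamp -> buffered values.
        self.pending = defaultdict(lambda: defaultdict(list))
        # Stand-in for the per-key accumulator.
        self.running_sum = defaultdict(int)

    def on_row(self, key, ts, value):
        # Rows may arrive out of order, so they are only cached here.
        self.pending[key][ts].append(value)

    def on_watermark(self, wm):
        # Emit all buffered rows with ts <= wm in timestamp order,
        # updating the accumulator row by row.
        out = []
        for key in sorted(self.pending):
            by_ts = self.pending[key]
            for ts in sorted(t for t in by_ts if t <= wm):
                for value in by_ts.pop(ts):
                    self.running_sum[key] += value
                    out.append((key, ts, value, self.running_sum[key]))
        return out
```

Rows that arrive as 30, 10, 20 are emitted as 10, 20 after a watermark of 20,
and the row at 30 stays buffered until a later watermark covers it.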

In summary, both use Flink's built-in state to cache rows or intermediate
results, and clean them up or emit them when necessary.

Hope that helps.

Best,
Xingcan

On Thu, Dec 14, 2017 at 7:09 AM, Yan Zhou [FDS Science] <yz...@coupang.com>
wrote:

> Hi,
>
>
> I am building a data pipeline with a lot of streaming joins and
> over-window aggregations. Flink SQL supports these features. However,
> there are no similar DataStream APIs (maybe there are and I didn't
> find them; please point them out if so). I got confused because I assume
> that the SQL logical plan will be translated into a graph of operators
> or transformations.
>
>
> Could someone explain how these two SQL queries are implemented or
> translated into low-level code (operators or transformations)? I am asking
> this because I have implemented these features without using SQL and the
> performance looks good. I would certainly love to migrate to SQL, but I want
> to understand them well first. Any information, hints, or links are
> appreciated.
>
>
>
>    1. Time-Windowed Join
>
> The DataStream API only provides streaming joins within the same window. But
> the SQL API (time-windowed join) can join two streams across quite
> different time ranges. Below is a sample query listed in the official
> doc, and we can see that *Orders* and *Shipments* can be up to 4 hours
> apart. Is it implemented by a CoProcessFunction or TwoInputOperator
> that buffers events for a certain period?
>
>
> SELECT *
> FROM Orders o, Shipments s
> WHERE o.id = s.orderId AND
>       o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
>
>
>    2. Over-Window Aggregation
>
> There is no similar feature in DataStream API. How does this get
> implemented? Does it use keyed state to buffer the previous events, and
> pull the records when there is a need? How does sorting get handled?
>
>
> Best
> Yan
>
>
>
>
>
