Yes Fabian,
I will flesh out my design in more detail. BTW, I think the incremental
aggregate I proposed (its key point is to eliminate the per-window state)
should work for both processing time and event time. It just does not need
a sorted state in the processing-time scenarios (need to verify).

Regards,
Shaoxuan


On Wed, Jan 25, 2017 at 5:55 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi everybody,
>
> thanks for the great discussions so far. It's awesome to see so much
> interest in this topic!
>
> First, I'd like to comment on the development process for this feature and
> later on the design of the runtime:
>
> Dev Process
> ----
> @Shaoxuan, I completely agree with you. We should first come up with good
> designs for the runtime operators of the different window types. Once we
> have that, we can start implementing the operators and integrate them with
> Calcite's optimization. This will be an intermediate step and as a
> byproduct give us support for SQL OVER windows. Once this is done, we can
> extend the Table API and translate the Table API calls into the same
> RelNodes as Calcite's SQL parser does.
>
> Runtime Design
> ----
> I think it makes sense to distinguish the different types of OVER windows
> because they have different requirements which result in different runtime
> implementations (with different implementation complexity and performance).
> In a previous mail I proposed to split the support for OVER windows into
> the following subtasks:
>
> # bounded PRECEDING
> - OVER ROWS for processing time
>   - does not require sorted state (data always arrives in processing time
> order)
>   - no need to consider retraction (processing time is never late)
>   - defines windows on row count.
>   - A GlobalWindow with evictor + trigger might be the best implementation
> (basically the same as DataStream.countWindow(long, long)). We need to add
> timeouts to clean up state for unused keys though.
>
> - OVER RANGE for processing time
>   - does not require sorted state (data always arrives in processing time
> order)
>   - no need to consider retraction (processing time is never late)
>   - defines windows on a time range
>   - I think this could also be implemented with a GlobalWindow with evictor
> + trigger (need to verify)
>
> - OVER RANGE for event time
>   - need for sorted state (late data possible)
>   - IMO, a ProcessFunction gives us the most flexibility in adding later
> features (retraction, update rate, etc.)
>   - @Shaoxuan, you sketched a good design. Would you like to continue with
> a design proposal?
>
> # UNBOUNDED PRECEDING
> Similar considerations apply for the UNBOUNDED PRECEDING cases of the above
> window types.
>
> If we all agree that the separation into six JIRAs (bounded/unbounded *
> row-pt/range-pt/range-et) makes sense, I would suggest moving the
> discussions about the design of the implementation to the individual JIRAs.
>
> What do you think?
>
> Best, Fabian
>
> 2017-01-25 9:19 GMT+01:00 Shaoxuan Wang <wshaox...@gmail.com>:
>
> > Hi Liuxinchun,
> > I am not sure where you got the impression that anyone has suggested "to
> > process Event time window in Sliding Row Window". If you were referring to
> > my post, there may be some misunderstanding. I think you were asking a
> > question similar to Hongyuhong's. I have just replied to him. Please take a
> > look and let me know if that makes sense to you. "Retraction" is an
> > important building block for computing correct incremental results in
> > streaming. It is another big topic, and we should discuss it in a separate
> > thread.
> >
> > Regards,
> > Shaoxuan
> >
> >
> >
> > On Wed, Jan 25, 2017 at 3:44 PM, liuxinchun <liuxinc...@huawei.com>
> wrote:
> >
> > > I don't think it is a good idea to process event-time windows in a Sliding
> > > Row Window. In a Sliding Time Window, when an element is late, we can
> > > trigger the recalculation of the related windows. Because the sliding
> > > period is coarse-grained, we only need to recalculate size/slide windows.
> > > But in a Sliding Row Window, the calculation is triggered whenever an
> > > element arrives, so the sliding period becomes fine-grained. When an
> > > element is late, a large number of "windows" are affected. Even if we
> > > store all the raw data, the amount of recomputation is very large.
> > >
> > > I wonder whether it is possible to define a standard for the sliding Event
> > > Time Row Window: when certain elements are late, we recalculate only part
> > > of the windows and permit some error. For example, we could recalculate only
> > > the windows that end in the range (lateElement.timestamp - leftDelta,
> > > lateElement.timestamp] and the windows that begin in the range
> > > [lateElement.timestamp, lateElement.timestamp + rightDelta).
> > > ////////////////////////////////////////////////////////////
> > > //////////////////////////
> > >  Hi everyone,
> > > Thanks for this great discussion. I am glad to see more and more people
> > > interested in stream SQL & Table API.
> > >
> > > IMO, the key problems for OVER window design are the SQL semantics and the
> > > runtime design. I totally agree with Fabian that we should skip the design
> > > of TumbleRows and SessionRows windows for now, as they are not well defined
> > > in SQL semantics.
> > >
> > > Runtime design is the most crucial part that we are interested in and have
> > > volunteered to contribute to. We have thousands of machines running Flink
> > > streaming jobs. The costs in terms of CPU, memory, and state are the vital
> > > factors that we have to take into account. We have been working on the
> > > design of the OVER window over the past months and plan to send out a
> > > detailed design doc to DEV quite soon. But since Fabian started a good
> > > discussion on the OVER window, I would like to share our ideas/thoughts
> > > about its runtime design.
> > >
> > >    1. As SunJincheng pointed out earlier, a sliding window does not work
> > >    for unbounded preceding, so we need an alternative approach for the
> > >    unbounded OVER window.
> > >    2. Though a sliding window may work for some cases of bounded windows,
> > >    it is not very efficient and therefore should not be used in production.
> > >    To the best of my understanding, the current runtime implementation of
> > >    the sliding window does not leverage the concept of state panes yet.
> > >    This means that if we use a sliding window for the OVER window, a
> > >    backend state will be created for each group (partition by) and each
> > >    row, and whenever a new record arrives, it will be accumulated into all
> > >    the existing windows that have not been closed. This would cause quite a
> > >    lot of overhead in terms of CPU, memory, and state.
> > >    3. Fabian has mentioned an approach of leveraging a "ProcessFunction"
> > >    and a "sortedState". I like this idea. The design details are not quite
> > >    clear yet, so I would like to add more thoughts on this. Regardless of
> > >    which DataStream API we are going to use (it is very likely that we need
> > >    a new API), we should come up with an optimal approach. The purpose of
> > >    grouping windows and OVER windows is to partition the data so that we
> > >    can generate the aggregate results. So when we talk about the design of
> > >    the OVER window, we have to think about the aggregates. As we proposed
> > >    in our recent UDAGG doc https://goo.gl/6ntclB, the user-defined
> > >    accumulator will be stored in the aggregate state. Besides the
> > >    accumulator, we have also introduced a retract API for UDAGG. With the
> > >    aggregate accumulator and the retract API, I am proposing the following
> > >    runtime approach to implement the OVER window.
> > >    4.
> > >       - We first implement a sorted state interface.
> > >       - For each group, we create just one sorted state. When a new record
> > >       arrives, it is inserted into this sorted state and, at the same time,
> > >       accumulated into the aggregate accumulator.
> > >       - For an OVER window, we keep the aggregate accumulator for the entire
> > >       lifetime of the job. This is different from the grouping-window case,
> > >       where we delete the accumulator of each group/window when the window
> > >       is finished.
> > >       - When an OVER window is due to trigger, we grab the previous
> > >       accumulator from the state, accumulate onto it all the records up to
> > >       the upperBoundary of the current window, and retract all the
> > >       out-of-scope records up to its lowerBoundary. We emit the aggregate
> > >       result and save the accumulator for the next window.
> > >
> > >
> > > Hello Fabian,
> > > I would suggest that we first work on the runtime design of the OVER
> > > window and aggregates. Once we have a good design there, one can easily
> > > add support for SQL as well as the Table API. What do you think?
> > >
> > > Regards,
> > > Shaoxuan
> > >
> > > On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske <fhue...@gmail.com>
> > wrote:
> > >
> > > > Hi Radu,
> > > >
> > > > thanks for your comments!
> > > >
> > > > Yes, my intention is to open new JIRA issues to structure the
> > > > development process. Everybody is very welcome to pick up issues and
> > > > discuss the design proposals.
> > > > At the moment I see the following six issues to start with:
> > > >
> > > > - streaming SQL OVER ROW for processing time
> > > >   - bounded PRECEDING
> > > >   - unbounded PRECEDING
> > > >
> > > > - streaming SQL OVER RANGE for processing time
> > > >   - bounded PRECEDING
> > > >   - unbounded PRECEDING
> > > >
> > > > - streaming SQL OVER RANGE for event time
> > > >   - bounded PRECEDING
> > > >   - unbounded PRECEDING
> > > >
> > > > For each of these windows we need corresponding translation rules and
> > > > execution code.
> > > >
> > > > Subsequent JIRAs would be
> > > > - extending the Table API for supported SQL windows
> > > > - add support for FOLLOWING
> > > > - etc.
> > > >
> > > > Regarding the requirement for a sorted state: I am not sure whether the
> > > > OVER windows should be implemented using Flink's DataStream window
> > > > framework. We need a good design document to figure out what the best
> > > > approach is. A ProcessFunction with a sorted state might be a good
> > > > solution as well.
> > > >
> > > > Best, Fabian
> > > >
> > > >
> > > > 2017-01-24 10:41 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Thanks for starting this discussion - it is very useful.
> > > > > It does indeed make sense to refactor all of this and to coordinate the
> > > > > efforts a bit, so that we do not end up with overlapping implementations
> > > > > and incompatible solutions.
> > > > >
> > > > > If you close the 3 JIRA issues you mentioned - do you plan to redesign
> > > > > them and open new ones? Do you need help from our side? We can also pick
> > > > > up the redesign of some of these new JIRA issues. For example, we already
> > > > > have an implementation for this and we can help with the design.
> > > > > Nevertheless, let's coordinate the effort.
> > > > >
> > > > > Regarding the support for the different types of window - I think the
> > > > > best option is to split the implementation into small units. We can
> > > > > easily do this from the transformation rule class, and with this each
> > > > > particular type of window (session/sliding/sliderows/processing time/...)
> > > > > will have a clear implementation and a corresponding architecture within
> > > > > its JIRA issue. What do you think about such a granularity?
> > > > >
> > > > > Regarding the issue of "Q4: The implementation of SlideRows still needs
> > > > > a custom operator that collects records in a priority queue ordered by
> > > > > the "rowtime", which is similar to the design we discussed in
> > > > > FLINK-4697, right?"
> > > > > Why would you need this operator? The window buffer can act to some
> > > > > extent as a priority queue as long as the trigger and evictor are set
> > > > > to work based on the rowtime - or maybe I am missing something... Can
> > > > > you please clarify this?
> > > > >
> > > > >
> > > > > Dr. Radu Tudoran
> > > > > Senior Research Engineer - Big Data Expert IT R&D Division
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > -----Original Message-----
> > > > > From: Jark Wu [mailto:wuchong...@alibaba-inc.com]
> > > > > Sent: Tuesday, January 24, 2017 6:53 AM
> > > > > To: dev@flink.apache.org
> > > > > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row
> > Windows
> > > > for
> > > > > streaming tables
> > > > >
> > > > > Hi Fabian,
> > > > >
> > > > > Thanks for bringing up this discussion and the nice approach to
> avoid
> > > > > overlapping contributions.
> > > > >
> > > > > All of these make sense to me. But I have some questions.
> > > > >
> > > > > Q1: If I understand correctly, we will not support TumbleRows and
> > > > > SessionRows at the beginning, but we may support them as syntactic
> > > > > sugar (in the Table API) once SlideRows is supported. Right?
> > > > >
> > > > > Q2: How do we support SessionRows based on SlideRows? I don't get how
> > > > > to partition on "gap-separated".
> > > > >
> > > > > Q3: Should we break down the approach into smaller tasks for streaming
> > > > > tables and batch tables?
> > > > >
> > > > > Q4: The implementation of SlideRows still needs a custom operator that
> > > > > collects records in a priority queue ordered by the "rowtime", which is
> > > > > similar to the design we discussed in FLINK-4697, right?
> > > > >
> > > > > +1 to not supporting OVER ROW for event time at this point.
> > > > >
> > > > > Regards, Jark
> > > > >
> > > > >
> > > > > > On Jan 24, 2017, at 10:28 AM, Hongyuhong <hongyuh...@huawei.com> wrote:
> > > > > >
> > > > > > Hi,
> > > > > > We are also interested in streaming SQL and very willing to
> > > > > > participate and contribute.
> > > > > >
> > > > > > Our work is in progress, and we will also contribute to Calcite to push
> > > > > > forward the window and stream-join support.
> > > > > >
> > > > > >
> > > > > >
> > > > > > --------------
> > > > > > From: Fabian Hueske [mailto:fhue...@gmail.com]
> > > > > > Sent: January 24, 2017 5:55
> > > > > > To: dev@flink.apache.org
> > > > > > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows
> > > > > > for streaming tables
> > > > > >
> > > > > > Hi Haohui,
> > > > > >
> > > > > > our plan was in fact to piggy-back on Calcite and use the TUMBLE
> > > > > > function [1] once it is available (CALCITE-1345 [2]).
> > > > > > Unfortunately, this issue does not seem to be very active, so I don't
> > > > > > know what the progress is.
> > > > > >
> > > > > > I would suggest to move the discussion about group windows to a
> > > > separate
> > > > > thread and keep this one focused on the organization of the SQL
> OVER
> > > > > windows.
> > > > > >
> > > > > > Best,
> > > > > > Fabian
> > > > > >
> > > > > > [1] http://calcite.apache.org/docs/stream.html
> > > > > > [2] https://issues.apache.org/jira/browse/CALCITE-1345
> > > > > >
> > > > > > 2017-01-23 22:42 GMT+01:00 Haohui Mai <ricet...@gmail.com>:
> > > > > >
> > > > > >> Hi Fabian,
> > > > > >>
> > > > > >> FLINK-4692 has added the support for tumbling window and we are
> > > > > >> excited to try it out and expose it as a SQL construct.
> > > > > >>
> > > > > >> Just curious -- what's your thought on the SQL syntax on
> tumbling
> > > > > window?
> > > > > >>
> > > > > >> Implementation-wise, it might make sense to think of a tumbling window
> > > > > >> as a special case of the sliding window.
> > > > > >>
> > > > > >> The problem I see is that the OVER construct might be
> insufficient
> > > to
> > > > > >> support all the use cases of tumbling windows. For example, it
> > fails
> > > > > >> to express tumbling windows that have fractional time units (as
> > > > > >> pointed out in http://calcite.apache.org/docs/stream.html).
> > > > > >>
> > > > > >> It looks to me that Calcite and Azure Stream Analytics have introduced
> > > > > >> a new construct (TUMBLE / TUMBLINGWINDOW) to address this issue.
> > > > > >>
> > > > > >> Do you think it is a good idea to follow the same conventions?
> > Your
> > > > > >> ideas are appreciated.
> > > > > >>
> > > > > >> Regards,
> > > > > >> Haohui
> > > > > >>
> > > > > >>
> > > > > >> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai <ricet...@gmail.com>
> > > > wrote:
> > > > > >>
> > > > > >>> +1
> > > > > >>>
> > > > > >>> We are also quite interested in these features and would love
> to
> > > > > >>> participate and contribute.
> > > > > >>>
> > > > > >>> ~Haohui
> > > > > >>>
> > > > > >>> On Mon, Jan 23, 2017 at 7:31 AM Fabian Hueske <
> fhue...@gmail.com
> > >
> > > > > wrote:
> > > > > >>>
> > > > > >>>> Hi everybody,
> > > > > >>>>
> > > > > >>>> it seems that currently several contributors are working on
> new
> > > > > >>>> features for the streaming Table API / SQL around row windows
> > (as
> > > > > >>>> defined in
> > > > > >>>> FLIP-11
> > > > > >>>> [1]) and SQL OVER-style window (FLINK-4678, FLINK-4679,
> > > FLINK-4680,
> > > > > >>>> FLINK-5584).
> > > > > >>>> Since these efforts overlap quite a bit I spent some time
> > thinking
> > > > > >>>> about how we can approach these features and how to avoid
> > > > > >>>> overlapping contributions.
> > > > > >>>>
> > > > > >>>> The challenge here is the following. Some of the Table API row windows
> > > > > >>>> as defined by FLIP-11 [1] are basically SQL OVER windows, while others
> > > > > >>>> cannot be easily expressed as such (TumbleRows for row-count
> > > > > >>>> intervals, SessionRows).
> > > > > >>>> However, since Calcite already supports SQL OVER windows, we
> can
> > > > > >>>> reuse
> > > > > >> the
> > > > > >>>> optimization logic for some of the Table API row windows. I
> also
> > > > > >>>> thought about the semantics of the TumbleRows and SessionRows
> > > > > >>>> windows as defined in
> > > > > >>>> FLIP-11 and came to the conclusion that these are not well
> > defined
> > > > > >>>> in
> > > > > >>>> FLIP-11 and should rather be defined as SlideRows windows
> with a
> > > > > >>>> special PARTITION BY clause.
> > > > > >>>>
> > > > > >>>> I propose to approach SQL OVER windows and Table API row
> windows
> > > as
> > > > > >>>> follows:
> > > > > >>>>
> > > > > >>>> We start with three simple cases for SQL OVER windows (not
> Table
> > > > > >>>> API
> > > > > >> yet):
> > > > > >>>>
> > > > > >>>> * OVER RANGE for event time
> > > > > >>>> * OVER RANGE for processing time
> > > > > >>>> * OVER ROW for processing time
> > > > > >>>>
> > > > > >>>> All cases fulfill the following restrictions:
> > > > > >>>> - All aggregations in SELECT must refer to the same window.
> > > > > >>>> - PARTITION BY may not contain the rowtime attribute.
> > > > > >>>> - ORDER BY must be on rowtime attribute (for event time) or
> on a
> > > > > >>>> marker function that indicates processing time. Additional
> sort
> > > > > >>>> attributes are not supported initially.
> > > > > >>>> - only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and
> > "BETWEEN
> > > x
> > > > > >>>> PRECEDING AND CURRENT ROW" are supported.
> > > > > >>>>
> > > > > >>>> OVER ROW for event time cannot be easily supported. With event time,
> > > > > >>>> we may have late records which need to be injected into the order of
> > > > > >>>> records. When a record is injected into the order at a position where
> > > > > >>>> a row-count window has already been computed, this and all following
> > > > > >>>> windows will change. We could either drop the record or send out many
> > > > > >>>> retraction records. I think it is best not to open this can of worms
> > > > > >>>> at this point.
> > > > > >>>>
> > > > > >>>> The rationale for all of the above restrictions is to have first
> > > > > >>>> versions of OVER windows soon.
> > > > > >>>> Once we have the above cases covered, we can extend the support and
> > > > > >>>> remove limitations as follows:
> > > > > >>>>
> > > > > >>>> - Table API SlideRow windows (with the same restrictions as
> > > above).
> > > > > >>>> This will be mostly API work since the execution part has been
> > > > solved
> > > > > before.
> > > > > >>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
> > > > > >>>> - Add support for different windows in SELECT. All windows
> must
> > be
> > > > > >>>> partitioned and ordered in the same way.
> > > > > >>>> - Add support for additional ORDER BY attributes (besides
> time).
> > > > > >>>>
> > > > > >>>> As I said before, TumbleRows and SessionRows windows as in FLIP-11 are
> > > > > >>>> not well defined, IMO.
> > > > > >>>> They can be expressed as SlideRows windows with special partitioning
> > > > > >>>> (partitioning on fixed, non-overlapping time ranges for TumbleRows,
> > > > > >>>> and gap-separated, non-overlapping time ranges for SessionRows).
> > > > > >>>> I would not start to work on those yet.
> > > > > >>>>
> > > > > >>>> I would like to close all related JIRA issues (FLINK-4678,
> > > > > >>>> FLINK-4679, FLINK-4680, FLINK-5584) and restructure the
> > > development
> > > > > >>>> of these
> > > > > >> features
> > > > > >>>> as outlined above with corresponding JIRA issues.
> > > > > >>>>
> > > > > >>>> What do others think? (I cc'ed the contributors assigned to
> the
> > > > > >>>> above
> > > > > >> JIRA
> > > > > >>>> issues)
> > > > > >>>>
> > > > > >>>> Best, Fabian
> > > > > >>>>
> > > > > >>>> [1]
> > > > > >>>>
> > > > > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
