Hi Jinkui Shi, Yuhong Hong, Sunjincheng,

I'd suggest discussing this on the JIRA issue itself.
I think it would help to describe the design and the status of the
implementation there.

Thanks, Fabian

2017-02-06 3:24 GMT+01:00 shijinkui <shijin...@huawei.com>:

> Hi Fabian, Sunjincheng,
>
> Today is the first workday of 2017 in China. When we came back, I found
> that the SQL issues had been assigned over the New Year holiday.
> Yuhong Hong is interested in FLINK-5657. She has implemented it before.
> Could we reconsider and assign FLINK-5657 to her?
>
> Thanks
> Jinkui Shi
>
> [1] https://issues.apache.org/jira/browse/FLINK-4557
>
> -----Original Message-----
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: January 25, 2017 17:55
> To: dev@flink.apache.org
> Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for
> streaming tables
>
> Hi everybody,
>
> thanks for the great discussions so far. It's awesome to see so much
> interest in this topic!
>
> First, I'd like to comment on the development process for this feature and
> later on the design of the runtime:
>
> Dev Process
> ----
> @Shaoxuan, I completely agree with you. We should first come up with good
> designs for the runtime operators of the different window types. Once we
> have that, we can start implementing the operators and integrate them with
> Calcite's optimization. This will be an intermediate step and as a
> byproduct give us support for SQL OVER windows. Once this is done, we can
> extend the Table API and translate the Table API calls into the same
> RelNodes as Calcite's SQL parser does.
>
> Runtime Design
> ----
> I think it makes sense to distinguish the different types of OVER windows
> because they have different requirements which result in different runtime
> implementations (with different implementation complexity and performance).
> In a previous mail I proposed to split the support for OVER windows into
> the following subtasks:
>
> # bounded PRECEDING
> - OVER ROWS for processing time
>   - does not require sorted state (data always arrives in processing time
> order)
>   - no need to consider retraction (processing time is never late)
>   - defines windows on row count.
>   - A GlobalWindow with evictor + trigger might be the best implementation
> (basically the same as DataStream.countWindow(long, long)). We need to add
> timeouts to clean up state for unused keys, though.
>
> - OVER RANGE for processing time
>   - does not require sorted state (data always arrives in processing time
> order)
>   - no need to consider retraction (processing time is never late)
>   - defines windows on a time range
>   - I think this could also be implemented with a GlobalWindow with evictor
> + trigger (need to verify)
>
> - OVER RANGE for event time
>   - need for sorted state (late data possible)
>   - IMO, a ProcessFunction gives us the most flexibility in adding later
> features (retraction, update rate, etc.)
>   - @Shaoxuan, you sketched a good design. Would you like to continue with
> a design proposal?
>
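A minimal sketch of the bounded OVER ROWS case for processing time, assuming plain Python rather than the Flink API. The names `BoundedRowsOver` and `process` are hypothetical, the aggregate is fixed to SUM, and the state-cleanup timeouts mentioned above are left out. Because rows arrive in processing-time order, an unsorted per-key buffer is enough.

```python
from collections import defaultdict, deque


class BoundedRowsOver:
    """SUM(value) OVER (PARTITION BY key ROWS BETWEEN n PRECEDING AND CURRENT ROW).

    Hypothetical helper for illustration only; not a Flink operator.
    """

    def __init__(self, preceding):
        # The window holds the current row plus `preceding` earlier rows.
        self.size = preceding + 1
        self.buffers = defaultdict(deque)  # per-key rows, in arrival order
        self.sums = defaultdict(int)       # per-key running sum

    def process(self, key, value):
        buf = self.buffers[key]
        buf.append(value)
        self.sums[key] += value
        # Evict the oldest row once the window exceeds its row count.
        if len(buf) > self.size:
            self.sums[key] -= buf.popleft()
        return self.sums[key]


op = BoundedRowsOver(preceding=2)
results = [op.process("a", v) for v in [1, 2, 3, 4]]
```

Each call emits one result per input row, which is the OVER-window behaviour; eviction plays the role of the evictor and the per-row emit plays the role of the trigger.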
> # UNBOUNDED PRECEDING
> Similar considerations apply for the UNBOUNDED PRECEDING cases of the
> above window types.
>
> If we all agree that the separation into six JIRAs (bounded/unbounded *
> row-pt/range-pt/range-et) makes sense, I would suggest moving the
> discussions about the design of the implementation to the individual JIRAs.
>
> What do you think?
>
> Best, Fabian
>
> 2017-01-25 9:19 GMT+01:00 Shaoxuan Wang <wshaox...@gmail.com>:
>
> > Hi Liuxinchun,
> > I am not sure where you got the impression that anyone has suggested
> > "processing event-time windows in a Sliding Row Window". If you were
> > referring to my post, there may be a misunderstanding. I think you
> > were asking a question similar to Hongyuhong's. I have just replied
> > to him. Please take a look and let me know if that makes sense to
> > you. "Retraction" is an important building block for computing correct
> > incremental results in streaming. It is another big topic, and we
> > should discuss it in another thread.
> >
> > Regards,
> > Shaoxuan
> >
> >
> >
> > On Wed, Jan 25, 2017 at 3:44 PM, liuxinchun <liuxinc...@huawei.com> wrote:
> >
> > > I don't think it is a good idea to process event-time windows in a
> > > Sliding Row Window. In a Sliding Time Window, when an element is
> > > late, we can trigger the recalculation of the related windows. The
> > > sliding period is coarse-grained, so we only need to recalculate
> > > size/slide windows (the window size divided by the slide). In a
> > > Sliding Row Window, however, the calculation is triggered whenever
> > > an element arrives, so the sliding period becomes fine-grained. When
> > > an element is late, a great many "windows" are affected. Even if we
> > > store all the raw data, the amount of computation is very large.
> > >
> > > I wonder whether it is possible to define a standard for the sliding
> > > Event Time Row Window: when certain elements are late, we
> > > recalculate only some of the windows and permit some error. For
> > > example, we could recalculate only the windows that end in the range
> > > (lateElement.timestamp - leftDelta, lateElement.timestamp] and the
> > > windows that begin in the range
> > > [lateElement.timestamp, lateElement.timestamp + rightDelta).
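The selection rule in the example above can be sketched as follows. This is only an illustration in plain Python: the function name `windows_to_recalculate` and the representation of a window as a `(start, end)` pair are assumptions, not part of any proposal in this thread.

```python
def windows_to_recalculate(windows, late_ts, left_delta, right_delta):
    """Pick the windows to recompute for a late element.

    `windows` is a list of (start, end) timestamp pairs (hypothetical layout).
    A window is selected if it ends in (late_ts - left_delta, late_ts]
    or begins in [late_ts, late_ts + right_delta).
    """
    selected = []
    for start, end in windows:
        ends_near = late_ts - left_delta < end <= late_ts
        begins_near = late_ts <= start < late_ts + right_delta
        if ends_near or begins_near:
            selected.append((start, end))
    return selected


windows = [(0, 5), (3, 8), (6, 11), (10, 15), (14, 19)]
affected = windows_to_recalculate(windows, late_ts=10, left_delta=3, right_delta=3)
```

Windows outside both ranges are left untouched even if the late element falls inside them, which is where the permitted error comes from.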
> > > ////////////////////////////////////////////////////////////
> > > //////////////////////////
> > > Hi everyone,
> > > Thanks for this great discussion. I am glad to see more and more
> > > people interested in stream SQL and the Table API.
> > >
> > > IMO, the key problems for the OVER window design are the SQL
> > > semantics and the runtime design. I totally agree with Fabian that
> > > we should skip the design of TumbleRows and SessionRows windows for
> > > now, as they are not well defined in SQL semantics.
> > >
> > > Runtime design is the part we are most interested in and have
> > > volunteered to contribute to. We have thousands of machines running
> > > Flink streaming jobs. The costs in terms of CPU, memory, and state
> > > are vital factors that we have to take into account. We have been
> > > working on the design of the OVER window over the past months and
> > > are planning to send a detailed design doc to DEV quite soon. But
> > > since Fabian started a good discussion on the OVER window, I would
> > > like to share our ideas/thoughts about its runtime design.
> > >
> > >    1. As SunJincheng pointed out earlier, a sliding window does not
> > >    work for unbounded preceding, so we need an alternative approach
> > >    for unbounded OVER windows.
> > >    2. Though a sliding window may work for some cases of bounded
> > >    windows, it is not very efficient and therefore should not be used
> > >    in production. To the best of my understanding, the current
> > >    runtime implementation of sliding windows does not leverage the
> > >    concept of state panes yet. This means that if we use a sliding
> > >    window for an OVER window, a backend state will be created per
> > >    group (partition by) and per row, and whenever a new record
> > >    arrives, it will be accumulated into all existing windows that
> > >    have not been closed. This would cause quite a lot of overhead in
> > >    terms of CPU, memory, and state.
> > >    3. Fabian has mentioned an approach that leverages a
> > >    "ProcessFunction" and a "sortedState". I like this idea. The
> > >    design details are not quite clear yet, so I would like to add
> > >    more thoughts on this. Regardless of which DataStream API we are
> > >    going to use (it is very likely that we need a new API), we should
> > >    come up with an optimal approach. The purpose of grouping windows
> > >    and OVER windows is to partition the data such that we can
> > >    generate the aggregate results. So when we talk about the design
> > >    of the OVER window, we have to think about the aggregates. As we
> > >    proposed in our recent UDAGG doc https://goo.gl/6ntclB, the
> > >    user-defined accumulator will be stored in the aggregate state.
> > >    Besides the accumulator, we have also introduced a retract API for
> > >    UDAGG. With the aggregate accumulator and the retract API, I am
> > >    proposing the following runtime approach to implement the OVER
> > >    window.
> > >    4.
> > >       - We first implement a sorted state interface.
> > >       - For each group, we create just one sorted state. When a new
> > >       record arrives, it is inserted into this sorted state and, at
> > >       the same time, accumulated into the aggregate accumulator.
> > >       - For an OVER window, we keep the aggregate accumulator for the
> > >       entire lifetime of the job. This differs from the grouping
> > >       window case, where we delete the accumulator of each
> > >       group/window when the window is finished.
> > >       - When an OVER window is about to trigger, we grab the previous
> > >       accumulator from the state, accumulate onto it all the records
> > >       up to the upperBoundary of the current window, and retract all
> > >       the out-of-scope records up to its lowerBoundary. We emit the
> > >       aggregate result and save the accumulator for the next window.
> > >
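The accumulate/retract approach listed above can be sketched roughly as follows, in plain Python rather than against any Flink interface. `SumAccumulator` and `OverWindowState` are hypothetical names, and the sketch makes two simplifying assumptions that are not in the proposal: rows are accumulated when a window triggers rather than on arrival, and rows that arrive at or before an already triggered upper boundary are rejected instead of handled.

```python
import bisect


class SumAccumulator:
    """Hypothetical accumulator with accumulate and retract."""

    def __init__(self):
        self.total = 0

    def accumulate(self, value):
        self.total += value

    def retract(self, value):
        self.total -= value


class OverWindowState:
    """One sorted state and one long-lived accumulator per group."""

    def __init__(self):
        self.rows = []                  # (timestamp, value), kept sorted
        self.acc = SumAccumulator()     # survives across windows
        self.lower = 0                  # index of first accumulated row
        self.upper = 0                  # index one past last accumulated row
        self.last_upper_ts = float("-inf")

    def insert(self, ts, value):
        # Assumption of this sketch: late rows are not supported.
        if ts <= self.last_upper_ts:
            raise ValueError("late row not handled in this sketch")
        bisect.insort(self.rows, (ts, value))

    def trigger(self, lower_ts, upper_ts):
        # Accumulate all rows up to the upper boundary of the window.
        while self.upper < len(self.rows) and self.rows[self.upper][0] <= upper_ts:
            self.acc.accumulate(self.rows[self.upper][1])
            self.upper += 1
        # Retract rows that fell out of scope below the lower boundary.
        while self.lower < self.upper and self.rows[self.lower][0] < lower_ts:
            self.acc.retract(self.rows[self.lower][1])
            self.lower += 1
        self.last_upper_ts = upper_ts
        return self.acc.total


state = OverWindowState()
for ts, value in [(3, 30), (1, 10), (2, 20)]:
    state.insert(ts, value)
first = state.trigger(lower_ts=1, upper_ts=2)
state.insert(4, 40)
second = state.trigger(lower_ts=2, upper_ts=4)
```

Each trigger touches only the rows entering or leaving the window, so the cost per window is proportional to the change rather than to the window size.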
> > >
> > > Hello Fabian,
> > > I would suggest that we first start working on the runtime design of
> > > the OVER window and the aggregates. Once we have a good design
> > > there, one can easily add support for SQL as well as the Table API.
> > > What do you think?
> > >
> > > Regards,
> > > Shaoxuan
> > >
> > > On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> > >
> > > > Hi Radu,
> > > >
> > > > thanks for your comments!
> > > >
> > > > Yes, my intention is to open new JIRA issues to structure the
> > > > development process. Everybody is very welcome to pick up issues
> > > > and discuss the design proposals.
> > > > At the moment I see the following six issues to start with:
> > > >
> > > > - streaming SQL OVER ROW for processing time
> > > >   - bounded PRECEDING
> > > >   - unbounded PRECEDING
> > > >
> > > > - streaming SQL OVER RANGE for processing time
> > > >   - bounded PRECEDING
> > > >   - unbounded PRECEDING
> > > >
> > > > - streaming SQL OVER RANGE for event time
> > > >   - bounded PRECEDING
> > > >   - unbounded PRECEDING
> > > >
> > > > For each of these windows we need corresponding translation rules
> > > > and execution code.
> > > >
> > > > Subsequent JIRAs would be
> > > > - extending the Table API for supported SQL windows
> > > > - add support for FOLLOWING
> > > > - etc.
> > > >
> > > > Regarding the requirement for a sorted state: I am not sure whether
> > > > the OVER windows should be implemented using Flink's DataStream
> > > > window framework. We need a good design document to figure out the
> > > > best approach. A ProcessFunction with a sorted state might be a
> > > > good solution as well.
> > > >
> > > > Best, Fabian
> > > >
> > > >
> > > > 2017-01-24 10:41 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Thanks for starting this discussion - it is very useful.
> > > > > It does indeed make sense to refactor all of this and to
> > > > > coordinate the efforts a bit, so that we do not end up with
> > > > > overlapping implementations and incompatible solutions.
> > > > >
> > > > > If you close the 3 JIRA issues you mentioned - do you plan to
> > > > > redesign them and open new ones? Do you need help from our side?
> > > > > We can also pick up the redesign of some of these new JIRA
> > > > > issues. For example, we already have an implementation for this
> > > > > and we can help with the design. Nevertheless, let's coordinate
> > > > > the effort.
> > > > >
> > > > > Regarding the support for the different types of windows - I
> > > > > think the best option is to split the implementation into small
> > > > > units. We can easily do this from the transformation rule class,
> > > > > and with this each particular type of window
> > > > > (session/sliding/sliderows/processing time/...) will have a
> > > > > clear implementation and a corresponding architecture within the
> > > > > JIRA issue. What do you think about such a granularity?
> > > > >
> > > > > Regarding the issue of "Q4: The implementation of SlideRows
> > > > > still needs a custom operator that collects records in a
> > > > > priority queue ordered by the "rowtime", which is similar to the
> > > > > design we discussed in FLINK-4697, right?"
> > > > > Why would you need this operator? The window buffer can act to
> > > > > some extent as a priority queue as long as the trigger and
> > > > > evictor are set to work based on the rowtime - or maybe I am
> > > > > missing something... Could you please clarify this?
> > > > >
> > > > >
> > > > > Dr. Radu Tudoran
> > > > > Senior Research Engineer - Big Data Expert IT R&D Division
> > > > >
> > > > >
> > > > > -----Original Message-----
> > > > > From: Jark Wu [mailto:wuchong...@alibaba-inc.com]
> > > > > Sent: Tuesday, January 24, 2017 6:53 AM
> > > > > To: dev@flink.apache.org
> > > > > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row
> > > > > Windows for streaming tables
> > > > >
> > > > > Hi Fabian,
> > > > >
> > > > > Thanks for bringing up this discussion and the nice approach to
> > > > > avoid overlapping contributions.
> > > > >
> > > > > All of these make sense to me. But I have some questions.
> > > > >
> > > > > Q1: If I understand correctly, we will not support TumbleRows
> > > > > and SessionRows at the beginning, but may support them as
> > > > > syntactic sugar (in the Table API) once SlideRows is supported
> > > > > in the future. Right?
> > > > >
> > > > > Q2: How do we support SessionRows based on SlideRows? I don't
> > > > > get how to partition on "gap-separated".
> > > > >
> > > > > Q3: Should we break down the approach into smaller tasks for
> > > > > streaming tables and batch tables?
> > > > >
> > > > > Q4: The implementation of SlideRows still needs a custom
> > > > > operator that collects records in a priority queue ordered by
> > > > > the "rowtime", which is similar to the design we discussed in
> > > > > FLINK-4697, right?
> > > > >
> > > > > +1 for not supporting OVER ROW for event time at this point.
> > > > >
> > > > > Regards, Jark
> > > > >
> > > > >
> > > > > > On January 24, 2017, at 10:28 AM, Hongyuhong <hongyuh...@huawei.com> wrote:
> > > > > >
> > > > > > Hi,
> > > > > > We are also interested in streaming SQL and are very willing
> > > > > > to participate and contribute.
> > > > > >
> > > > > > Our work is in progress, and we will also contribute to
> > > > > > Calcite to push forward the window and stream-join support.
> > > > > >
> > > > > >
> > > > > >
> > > > > > --------------
> > > > > > From: Fabian Hueske [mailto:fhue...@gmail.com]
> > > > > > Sent: January 24, 2017 5:55
> > > > > > To: dev@flink.apache.org
> > > > > > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row
> > > > > > Windows for streaming tables
> > > > > >
> > > > > > Hi Haohui,
> > > > > >
> > > > > > our plan was in fact to piggy-back on Calcite and use the
> > > > > > TUMBLE function [1] once it is available (CALCITE-1345 [2]).
> > > > > > Unfortunately, this issue does not seem to be very active, so
> > > > > > I don't know what the progress is.
> > > > > >
> > > > > > I would suggest moving the discussion about group windows to a
> > > > > > separate thread and keeping this one focused on the
> > > > > > organization of the SQL OVER windows.
> > > > > >
> > > > > > Best,
> > > > > > Fabian
> > > > > >
> > > > > > [1] http://calcite.apache.org/docs/stream.html
> > > > > > [2] https://issues.apache.org/jira/browse/CALCITE-1345
> > > > > >
> > > > > > 2017-01-23 22:42 GMT+01:00 Haohui Mai <ricet...@gmail.com>:
> > > > > >
> > > > > >> Hi Fabian,
> > > > > >>
> > > > > >> FLINK-4692 has added support for tumbling windows, and we
> > > > > >> are excited to try it out and expose it as a SQL construct.
> > > > > >>
> > > > > >> Just curious -- what are your thoughts on the SQL syntax for
> > > > > >> tumbling windows?
> > > > > >>
> > > > > >> Implementation-wise, it might make sense to think of a
> > > > > >> tumbling window as a special case of a sliding window.
> > > > > >>
> > > > > >> The problem I see is that the OVER construct might be
> > > > > >> insufficient to support all the use cases of tumbling
> > > > > >> windows. For example, it fails to express tumbling windows
> > > > > >> that have fractional time units (as pointed out in
> > > > > >> http://calcite.apache.org/docs/stream.html).
> > > > > >>
> > > > > >> It looks to me that Calcite and Azure Stream Analytics have
> > > > > >> introduced a new construct (TUMBLE / TUMBLINGWINDOW) to
> > > > > >> address this issue.
> > > > > >>
> > > > > >> Do you think it is a good idea to follow the same conventions?
> > > > > >> Your ideas are appreciated.
> > > > > >>
> > > > > >> Regards,
> > > > > >> Haohui
> > > > > >>
> > > > > >>
> > > > > >> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai <ricet...@gmail.com> wrote:
> > > > > >>
> > > > > >>> +1
> > > > > >>>
> > > > > >>> We are also quite interested in these features and would
> > > > > >>> love to participate and contribute.
> > > > > >>>
> > > > > >>> ~Haohui
> > > > > >>>
> > > > > >>> On Mon, Jan 23, 2017 at 7:31 AM Fabian Hueske <fhue...@gmail.com> wrote:
> > > > > >>>
> > > > > >>>> Hi everybody,
> > > > > >>>>
> > > > > >>>> it seems that several contributors are currently working on
> > > > > >>>> new features for the streaming Table API / SQL around row
> > > > > >>>> windows (as defined in FLIP-11 [1]) and SQL OVER-style
> > > > > >>>> windows (FLINK-4678, FLINK-4679, FLINK-4680, FLINK-5584).
> > > > > >>>> Since these efforts overlap quite a bit, I spent some time
> > > > > >>>> thinking about how we can approach these features and how to
> > > > > >>>> avoid overlapping contributions.
> > > > > >>>>
> > > > > >>>> The challenge here is the following. Some of the Table API
> > > > > >>>> row windows as defined by FLIP-11 [1] are basically SQL OVER
> > > > > >>>> windows, while others cannot be easily expressed as such
> > > > > >>>> (TumbleRows for row-count intervals, SessionRows).
> > > > > >>>> However, since Calcite already supports SQL OVER windows, we
> > > > > >>>> can reuse the optimization logic for some of the Table API
> > > > > >>>> row windows. I also thought about the semantics of the
> > > > > >>>> TumbleRows and SessionRows windows as defined in FLIP-11 and
> > > > > >>>> came to the conclusion that these are not well defined in
> > > > > >>>> FLIP-11 and should rather be defined as SlideRows windows
> > > > > >>>> with a special PARTITION BY clause.
> > > > > >>>>
> > > > > >>>> I propose to approach SQL OVER windows and Table API row
> > > > > >>>> windows as follows:
> > > > > >>>>
> > > > > >>>> We start with three simple cases for SQL OVER windows (not
> > > > > >>>> Table API yet):
> > > > > >>>>
> > > > > >>>> * OVER RANGE for event time
> > > > > >>>> * OVER RANGE for processing time
> > > > > >>>> * OVER ROW for processing time
> > > > > >>>>
> > > > > >>>> All cases fulfill the following restrictions:
> > > > > >>>> - All aggregations in SELECT must refer to the same window.
> > > > > >>>> - PARTITION BY may not contain the rowtime attribute.
> > > > > >>>> - ORDER BY must be on the rowtime attribute (for event time)
> > > > > >>>> or on a marker function that indicates processing time.
> > > > > >>>> Additional sort attributes are not supported initially.
> > > > > >>>> - Only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and
> > > > > >>>> "BETWEEN x PRECEDING AND CURRENT ROW" are supported.
> > > > > >>>>
> > > > > >>>> OVER ROW for event time cannot be easily supported. With
> > > > > >>>> event time, we may have late records which need to be
> > > > > >>>> injected into the order of records. When a record is
> > > > > >>>> injected into the order where a row-count window has already
> > > > > >>>> been computed, this and all following windows will change.
> > > > > >>>> We could either drop the record or send out many retraction
> > > > > >>>> records. I think it is best not to open this can of worms at
> > > > > >>>> this point.
> > > > > >>>>
> > > > > >>>> The rationale for all of the above restrictions is to have
> > > > > >>>> first versions of OVER windows soon.
> > > > > >>>> Once we have the above cases covered, we can extend them and
> > > > > >>>> remove limitations as follows:
> > > > > >>>>
> > > > > >>>> - Table API SlideRow windows (with the same restrictions as
> > > > > >>>> above). This will be mostly API work since the execution
> > > > > >>>> part has been solved before.
> > > > > >>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
> > > > > >>>> - Add support for different windows in SELECT. All windows
> > > > > >>>> must be partitioned and ordered in the same way.
> > > > > >>>> - Add support for additional ORDER BY attributes (besides
> > > > > >>>> time).
> > > > > >>>>
> > > > > >>>> As I said before, TumbleRows and SessionRows windows as in
> > > > > >>>> FLIP-11 are not well defined, IMO.
> > > > > >>>> They can be expressed as SlideRows windows with special
> > > > > >>>> partitioning (partitioning on fixed, non-overlapping time
> > > > > >>>> ranges for TumbleRows, and gap-separated, non-overlapping
> > > > > >>>> time ranges for SessionRows). I would not start to work on
> > > > > >>>> those yet.
> > > > > >>>>
> > > > > >>>> I would like to close all related JIRA issues (FLINK-4678,
> > > > > >>>> FLINK-4679, FLINK-4680, FLINK-5584) and restructure the
> > > > > >>>> development of these features as outlined above with
> > > > > >>>> corresponding JIRA issues.
> > > > > >>>>
> > > > > >>>> What do others think? (I cc'ed the contributors assigned to
> > > > > >>>> the above JIRA issues)
> > > > > >>>>
> > > > > >>>> Best, Fabian
> > > > > >>>>
> > > > > >>>> [1]
> > > > > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
> > > > > >>>>
