Hi Fabian, Shaoxuan, Yuhong,

- OVER RANGE for processing time
I think your design makes sense. Considering only processing time simplifies
the design and makes it more robust.
As I understand it, the state is kept in a queue, and the given (built-in)
and user-defined functions are applied to each incoming row one by one.
Is my understanding correct?
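
To make the queue idea concrete, here is a minimal sketch in plain Python
rather than Flink code. The class name ProcTimeRangeWindow, the range_ms
parameter, and the running sum are assumptions made purely for illustration;
none of them is an existing Flink API. Because rows arrive in processing-time
order, a FIFO queue is enough: new rows are appended at the tail and expired
rows are dropped from the head.

```python
from collections import deque


class ProcTimeRangeWindow:
    """Hypothetical OVER RANGE window on processing time (illustration only)."""

    def __init__(self, range_ms):
        self.range_ms = range_ms  # window size in milliseconds (assumed unit)
        self.rows = deque()       # (timestamp, value) pairs in arrival order
        self.total = 0            # running sum, standing in for any aggregate

    def add(self, now_ms, value):
        """Add a row stamped with the current processing time; return the sum."""
        self.rows.append((now_ms, value))
        self.total += value
        # Evict rows that fell out of the range [now_ms - range_ms, now_ms].
        while self.rows and self.rows[0][0] < now_ms - self.range_ms:
            _, old = self.rows.popleft()
            self.total -= old
        return self.total


w = ProcTimeRangeWindow(range_ms=1000)
print(w.add(0, 1))     # 1
print(w.add(500, 2))   # 3
print(w.add(1200, 4))  # 6, because the row at t=0 has expired
```

No sorting and no retraction of emitted results is needed in this sketch,
which matches the two properties listed for processing time in the quoted
mail.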

- OVER RANGE for event time
A sorted state is better suited to inserting out-of-order messages. If the
sorted state is implemented as a linked list and grows very large,
re-calculation may be slow because the data is not laid out sequentially in
memory.
@Shaoxuan, is my understanding correct?
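
As a rough illustration of the trade-off, here is a sketch in plain Python
of a sorted state backed by a contiguous array instead of a linked list.
EventTimeSortedState, range_ms, and the sum aggregate are hypothetical names
chosen for this example, not part of Flink or of Shaoxuan's design. Inserting
a late row costs a binary search plus an element shift, but scanning a window
afterwards reads memory sequentially.

```python
import bisect


class EventTimeSortedState:
    """Hypothetical sorted state for an event-time OVER RANGE window."""

    def __init__(self, range_ms):
        self.range_ms = range_ms
        self.rows = []  # (timestamp, value) tuples kept sorted by timestamp

    def insert(self, ts, value):
        # Binary search for the position, then insert into the array.
        bisect.insort(self.rows, (ts, value))

    def window_sum(self, ts):
        """Sum of values with timestamps in [ts - range_ms, ts]."""
        lo = bisect.bisect_left(self.rows, (ts - self.range_ms,))
        hi = bisect.bisect_right(self.rows, (ts, float("inf")))
        return sum(v for _, v in self.rows[lo:hi])

    def affected_by(self, late_ts):
        """Timestamps of rows whose window contains a record at late_ts."""
        lo = bisect.bisect_left(self.rows, (late_ts,))
        hi = bisect.bisect_right(
            self.rows, (late_ts + self.range_ms, float("inf")))
        return [t for t, _ in self.rows[lo:hi]]


state = EventTimeSortedState(range_ms=1000)
state.insert(1000, 1)
state.insert(2000, 2)
print(state.window_sum(2000))   # 3
state.insert(1500, 10)          # late, out-of-order record
print(state.window_sum(2000))   # 13
print(state.affected_by(1500))  # [1500, 2000]
```

Only the rows returned by affected_by need their results re-computed after
the late insert; how those updates would be emitted (retraction, update
rate) is left open here, as in the thread.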

Thanks a lot

Best
Jinkui Shi

> On Jan 25, 2017, at 17:55, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi everybody,
> 
> thanks for the great discussions so far. It's awesome to see so much
> interest in this topic!
> 
> First, I'd like to comment on the development process for this feature and
> later on the design of the runtime:
> 
> Dev Process
> ----
> @Shaoxuan, I completely agree with you. We should first come up with good
> designs for the runtime operators of the different window types. Once we
> have that, we can start implementing the operators and integrate them with
> Calcite's optimization. This will be an intermediate step and as a
> byproduct give us support for SQL OVER windows. Once this is done, we can
> extend the Table API and translate the Table API calls into the same
> RelNodes as Calcite's SQL parser does.
> 
> Runtime Design
> ----
> I think it makes sense to distinguish the different types of OVER windows
> because they have different requirements which result in different runtime
> implementations (with different implementation complexity and performance).
> In a previous mail I proposed to split the support for OVER windows into
> the following subtasks:
> 
> # bounded PRECEDING
> - OVER ROWS for processing time
>  - does not require sorted state (data always arrives in processing time
> order)
>  - no need to consider retraction (processing time is never late)
>  - defines windows on row count.
>  - A GlobalWindow with evictor + trigger might be the best implementation
> (basically the same as DataStream.countWindow(long, long)). We need to add
> timeouts to clean up state for unused keys though.
> 
> - OVER RANGE for processing time
>  - does not require sorted state (data always arrives in processing time
> order)
>  - no need to consider retraction (processing time is never late)
>  - defines windows on a time range
>  - I think this could also be implemented with a GlobalWindow with evictor
> + trigger (need to verify)
> 
> - OVER RANGE for event time
>  - need for sorted state (late data possible)
>  - IMO, a ProcessFunction gives us the most flexibility in adding later
> features (retraction, update rate, etc.)
>  - @Shaoxuan, you sketched a good design. Would you like to continue with
> a design proposal?
> 
> # UNBOUNDED PRECEDING
> Similar considerations apply for the UNBOUNDED PRECEDING cases of the above
> window types.
> 
> If we all agree that the separation into six JIRAs (bounded/unbounded *
> row-pt/range-pt/range-et) makes sense, I would suggest moving the
> discussions about the design of the implementation to the individual JIRAs.
> 
> What do you think?
> 
> Best, Fabian
> 
> 2017-01-25 9:19 GMT+01:00 Shaoxuan Wang <wshaox...@gmail.com>:
> 
>> Hi Liuxinchun,
>> I am not sure where you got the impression that anyone has suggested "to
>> process Event time window in Sliding Row Window". If you were referring to
>> my post, there may be some misunderstanding. I think you were asking a
>> similar question to Hongyuhong's. I have just replied to him. Please take a
>> look and let me know if that makes sense to you. "Retraction" is an
>> important building block for computing correct incremental results in
>> streaming. It is another big topic, and we should discuss it in another
>> thread.
>> 
>> Regards,
>> Shaoxuan
>> 
>> 
>> 
>> On Wed, Jan 25, 2017 at 3:44 PM, liuxinchun <liuxinc...@huawei.com> wrote:
>> 
>>> I don't think it is a good idea to process Event time window in Sliding
>>> Row Window. In a Sliding Time Window, when an element is late, we can
>>> trigger the recalculation of the related windows. Since the sliding
>>> period is coarse-grained, we only need to recalculate (size / slide)
>>> windows. But in a Sliding Row Window, the calculation is triggered
>>> whenever an element arrives, so the sliding period becomes fine-grained.
>>> When an element is late, a great many "windows" are affected. Even if we
>>> store all the raw data, the amount of computation is very large.
>>> 
>>> I wonder whether it is possible to define a standard for the sliding Event
>>> Time Row Window: when certain elements are late, we recalculate only some
>>> of the windows and tolerate some error. For example, we could recalculate
>>> only the windows that end in the range (lateElement.timestamp - leftDelta,
>>> lateElement.timestamp] and the windows that begin in the range
>>> [lateElement.timestamp, lateElement.timestamp + rightDelta).
>>> ////////////////////////////////////////////////////////////
>>> //////////////////////////
>>> Hi everyone,
>>> Thanks for this great discussion, and glad to see more and more people
>>> are interested in stream SQL & tableAPI.
>>> 
>>> IMO, the key problems for OVER window design are the SQL semantics and
>>> the runtime design. I totally agree with Fabian that we should skip the
>>> design of TumbleRows and SessionRows windows for now, as they are not
>>> well defined in SQL semantics.
>>> 
>>> Runtime design is the most crucial part that we are interested in and
>>> have volunteered to contribute to. We have thousands of machines running
>>> Flink streaming jobs. The costs in terms of CPU, memory, and state are
>>> vital factors that we have to take into account. We have been working on
>>> the design of the OVER window over the past months and plan to send out a
>>> detailed design doc to DEV quite soon. But since Fabian started a good
>>> discussion on the OVER window, I would like to share our ideas/thoughts
>>> about its runtime design.
>>> 
>>>   1. As SunJincheng pointed out earlier, a sliding window does not work
>>>   for unbounded preceding; we need an alternative approach for the
>>>   unbounded OVER window.
>>>   2. Though a sliding window may work for some cases of bounded windows,
>>>   it is not very efficient and therefore should not be used in production.
>>>   To the best of my understanding, the current runtime implementation of
>>>   the sliding window does not leverage the concept of state panes yet.
>>>   This means that if we use a sliding window for the OVER window, a
>>>   backend state will be created per group (partition by) and per row, and
>>>   whenever a new record arrives, it will be accumulated into all existing
>>>   windows that have not been closed. This would cause quite a lot of
>>>   overhead in terms of CPU, memory, and state.
>>>   3. Fabian has mentioned an approach that leverages a “ProcessFunction”
>>>   and a “sortedState”. I like this idea. The design details are not quite
>>>   clear yet, so I would like to add more thoughts on this. Regardless of
>>>   which DataStream API we are going to use (it is very likely that we need
>>>   a new API), we should come up with an optimal approach. The purpose of
>>>   the grouping window and the OVER window is to partition the data so that
>>>   we can generate the aggregate results. So when we talk about the design
>>>   of the OVER window, we have to think about the aggregates. As we
>>>   proposed in our recent UDAGG doc https://goo.gl/6ntclB, the user-defined
>>>   accumulator will be stored in the aggregate state. Besides the
>>>   accumulator, we have also introduced a retract API for UDAGG. With the
>>>   aggregate accumulator and the retract API, I propose the following
>>>   runtime approach to implement the OVER window:
>>>      - We first implement a sorted state interface.
>>>      - For each group, we create just one sorted state. When a new record
>>>      arrives, it is inserted into this sorted state and, at the same time,
>>>      accumulated into the aggregate accumulator.
>>>      - For the OVER window, we keep the aggregate accumulator for the
>>>      entire lifetime of the job. This differs from the grouping-window
>>>      case, where we delete the accumulator for each group/window when the
>>>      window is finished.
>>>      - When an OVER window is about to trigger, we grab the previous
>>>      accumulator from the state, accumulate onto it all the records up to
>>>      the upperBoundary of the current window, and retract all the
>>>      out-of-scope records up to its lowerBoundary. We emit the aggregate
>>>      result and save the accumulator for the next window.
>>> 
>>> 
>>> Hello Fabian,
>>> I would suggest that we first start working on the runtime design of the
>>> OVER window and aggregates. Once we have a good design there, one can
>>> easily add support for SQL as well as the Table API. What do you think?
>>> 
>>> Regards,
>>> Shaoxuan
>>> 
>>> On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>> 
>>>> Hi Radu,
>>>> 
>>>> thanks for your comments!
>>>> 
>>>> Yes, my intention is to open new JIRA issues to structure the
>>>> development process. Everybody is very welcome to pick up issues and
>>>> discuss the design proposals.
>>>> At the moment I see the following six issues to start with:
>>>> 
>>>> - streaming SQL OVER ROW for processing time
>>>>  - bounded PRECEDING
>>>>  - unbounded PRECEDING
>>>> 
>>>> - streaming SQL OVER RANGE for processing time
>>>>  - bounded PRECEDING
>>>>  - unbounded PRECEDING
>>>> 
>>>> - streaming SQL OVER RANGE for event time
>>>>  - bounded PRECEDING
>>>>  - unbounded PRECEDING
>>>> 
>>>> For each of these windows we need corresponding translation rules and
>>>> execution code.
>>>> 
>>>> Subsequent JIRAs would be
>>>> - extending the Table API for supported SQL windows
>>>> - add support for FOLLOWING
>>>> - etc.
>>>> 
>>>> Regarding the requirement for a sorted state: I am not sure if the
>>>> OVER windows should be implemented using Flink's DataStream window
>>>> framework. We need a good design document to figure out what the best
>>>> approach is. A ProcessFunction with a sorted state might be a good
>>>> solution as well.
>>>> 
>>>> Best, Fabian
>>>> 
>>>> 
>>>> 2017-01-24 10:41 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
>>>> 
>>>>> Hi all,
>>>>> 
>>>>> Thanks for starting this discussion - it is very useful.
>>>>> It does indeed make sense to refactor all of this and coordinate the
>>>>> efforts a bit so that we do not end up with overlapping implementations
>>>>> and incompatible solutions.
>>>>> 
>>>>> If you close the 3 JIRA issues you mentioned - do you plan to
>>>>> redesign them and open new ones? Do you need help from our side? We
>>>>> can also pick up the redesign of some of these new JIRA issues. For
>>>>> example, we already have an implementation for this and we can help
>>>>> with the design. Nevertheless, let's coordinate the effort.
>>>>> 
>>>>> Regarding the support for the different types of windows - I think the
>>>>> best option is to split the implementation into small units. We can
>>>>> easily do this from the transformation rule class, and with this each
>>>>> particular type of window (session/sliding/sliderows/processing time/...)
>>>>> will have a clear implementation and a corresponding architecture within
>>>>> the JIRA issue. What do you think about such a granularity?
>>>>> 
>>>>> Regarding the issue of "Q4: The implementation of SlideRows still
>>>>> needs a custom operator that collects records in a priority queue
>>>>> ordered by the "rowtime", which is similar to the design we
>>>>> discussed in FLINK-4697, right?"
>>>>> Why would you need this operator? The window buffer can act to some
>>>>> extent as a priority queue as long as the trigger and evictor are set to
>>>>> work based on the rowtime - or maybe I am missing something... Can you
>>>>> please clarify this?
>>>>> 
>>>>> 
>>>>> Dr. Radu Tudoran
>>>>> Senior Research Engineer - Big Data Expert IT R&D Division
>>>>> 
>>>>> 
>>>>> 
>>>>> -----Original Message-----
>>>>> From: Jark Wu [mailto:wuchong...@alibaba-inc.com]
>>>>> Sent: Tuesday, January 24, 2017 6:53 AM
>>>>> To: dev@flink.apache.org
>>>>> Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows
>>>>> for streaming tables
>>>>> 
>>>>> Hi Fabian,
>>>>> 
>>>>> Thanks for bringing up this discussion and the nice approach to avoid
>>>>> overlapping contributions.
>>>>> 
>>>>> All of these make sense to me. But I have some questions.
>>>>> 
>>>>> Q1: If I understand correctly, we will not support TumbleRows and
>>>>> SessionRows at the beginning, but may support them as syntactic sugar
>>>>> (in the Table API) once SlideRows is supported in the future. Right?
>>>>> 
>>>>> Q2: How can SessionRows be supported based on SlideRows? I don't see
>>>>> how to partition on "gap-separated" ranges.
>>>>> 
>>>>> Q3: Should we break the approach down into smaller tasks for streaming
>>>>> tables and batch tables?
>>>>> 
>>>>> Q4: The implementation of SlideRows still needs a custom operator that
>>>>> collects records in a priority queue ordered by the "rowtime", which is
>>>>> similar to the design we discussed in FLINK-4697, right?
>>>>> 
>>>>> +1 for not supporting OVER ROW for event time at this point.
>>>>> 
>>>>> Regards, Jark
>>>>> 
>>>>> 
>>>>>> On Jan 24, 2017, at 10:28 AM, Hongyuhong <hongyuh...@huawei.com> wrote:
>>>>>> 
>>>>>> Hi,
>>>>>> We are also interested in streaming SQL and are very willing to
>>>>>> participate and contribute.
>>>>>> 
>>>>>> Our work is in progress, and we will also contribute to Calcite to
>>>>>> push forward the window and stream-join support.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --------------
>>>>>> From: Fabian Hueske [mailto:fhue...@gmail.com]
>>>>>> Sent: January 24, 2017 5:55
>>>>>> To: dev@flink.apache.org
>>>>>> Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows
>>>>>> for streaming tables
>>>>>> 
>>>>>> Hi Haohui,
>>>>>> 
>>>>>> our plan was in fact to piggy-back on Calcite and use the TUMBLE
>>>>>> function [1] once it is available (CALCITE-1345 [2]).
>>>>>> Unfortunately, this issue does not seem to be very active, so I
>>>>>> don't know what the progress is.
>>>>>> 
>>>>>> I would suggest moving the discussion about group windows to a
>>>>>> separate thread and keeping this one focused on the organization of
>>>>>> the SQL OVER windows.
>>>>>> 
>>>>>> Best,
>>>>>> Fabian
>>>>>> 
>>>>>> [1] http://calcite.apache.org/docs/stream.html
>>>>>> [2] https://issues.apache.org/jira/browse/CALCITE-1345
>>>>>> 
>>>>>> 2017-01-23 22:42 GMT+01:00 Haohui Mai <ricet...@gmail.com>:
>>>>>> 
>>>>>>> Hi Fabian,
>>>>>>> 
>>>>>>> FLINK-4692 has added the support for tumbling window and we are
>>>>>>> excited to try it out and expose it as a SQL construct.
>>>>>>> 
>>>>>>> Just curious -- what are your thoughts on the SQL syntax for tumbling
>>>>>>> windows?
>>>>>>> 
>>>>>>> Implementation-wise, it might make sense to think of the tumbling
>>>>>>> window as a special case of the sliding window.
>>>>>>> 
>>>>>>> The problem I see is that the OVER construct might be insufficient to
>>>>>>> support all the use cases of tumbling windows. For example, it fails
>>>>>>> to express tumbling windows that have fractional time units (as
>>>>>>> pointed out in http://calcite.apache.org/docs/stream.html).
>>>>>>> 
>>>>>>> It looks to me that Calcite / Azure Stream Analytics have
>>>>>>> introduced a new construct (TUMBLE / TUMBLINGWINDOW) to address this
>>>>>>> issue.
>>>>>>> 
>>>>>>> Do you think it is a good idea to follow the same conventions? Your
>>>>>>> ideas are appreciated.
>>>>>>> 
>>>>>>> Regards,
>>>>>>> Haohui
>>>>>>> 
>>>>>>> 
>>>>>>> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai <ricet...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> +1
>>>>>>>> 
>>>>>>>> We are also quite interested in these features and would love to
>>>>>>>> participate and contribute.
>>>>>>>> 
>>>>>>>> ~Haohui
>>>>>>>> 
>>>>>>>> On Mon, Jan 23, 2017 at 7:31 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> Hi everybody,
>>>>>>>>> 
>>>>>>>>> it seems that several contributors are currently working on new
>>>>>>>>> features for the streaming Table API / SQL around row windows (as
>>>>>>>>> defined in FLIP-11 [1]) and SQL OVER-style windows (FLINK-4678,
>>>>>>>>> FLINK-4679, FLINK-4680, FLINK-5584).
>>>>>>>>> Since these efforts overlap quite a bit, I spent some time thinking
>>>>>>>>> about how we can approach these features and how to avoid
>>>>>>>>> overlapping contributions.
>>>>>>>>> 
>>>>>>>>> The challenge here is the following. Some of the Table API row
>>>>>>>>> windows as defined by FLIP-11 [1] are basically SQL OVER windows,
>>>>>>>>> while others cannot be easily expressed as such (TumbleRows for
>>>>>>>>> row-count intervals, SessionRows).
>>>>>>>>> However, since Calcite already supports SQL OVER windows, we can
>>>>>>>>> reuse the optimization logic for some of the Table API row windows.
>>>>>>>>> I also thought about the semantics of the TumbleRows and SessionRows
>>>>>>>>> windows as defined in FLIP-11 and came to the conclusion that these
>>>>>>>>> are not well defined in FLIP-11 and should rather be defined as
>>>>>>>>> SlideRows windows with a special PARTITION BY clause.
>>>>>>>>> 
>>>>>>>>> I propose to approach SQL OVER windows and Table API row windows
>>>>>>>>> as follows:
>>>>>>>>> 
>>>>>>>>> We start with three simple cases for SQL OVER windows (not Table
>>>>>>>>> API yet):
>>>>>>>>> 
>>>>>>>>> * OVER RANGE for event time
>>>>>>>>> * OVER RANGE for processing time
>>>>>>>>> * OVER ROW for processing time
>>>>>>>>> 
>>>>>>>>> All cases fulfill the following restrictions:
>>>>>>>>> - All aggregations in SELECT must refer to the same window.
>>>>>>>>> - PARTITION BY may not contain the rowtime attribute.
>>>>>>>>> - ORDER BY must be on rowtime attribute (for event time) or on a
>>>>>>>>> marker function that indicates processing time. Additional sort
>>>>>>>>> attributes are not supported initially.
>>>>>>>>> - only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and "BETWEEN
>>>>>>>>> x PRECEDING AND CURRENT ROW" are supported.
>>>>>>>>> 
>>>>>>>>> OVER ROW for event time cannot be easily supported. With event
>>>>>>>>> time, we may have late records which need to be injected into the
>>>>>>>>> order of records. When a record is injected into the order where a
>>>>>>>>> row-count window has already been computed, this and all following
>>>>>>>>> windows will change. We could either drop the record or send out
>>>>>>>>> many retraction records. I think it is best not to open this can of
>>>>>>>>> worms at this point.
>>>>>>>>> 
>>>>>>>>> The rationale for all of the above restrictions is to have first
>>>>>>>>> versions of OVER windows soon.
>>>>>>>>> Once we have the above cases covered, we can extend them and remove
>>>>>>>>> limitations as follows:
>>>>>>>>> 
>>>>>>>>> - Table API SlideRow windows (with the same restrictions as above).
>>>>>>>>> This will be mostly API work since the execution part has been
>>>>>>>>> solved before.
>>>>>>>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
>>>>>>>>> - Add support for different windows in SELECT. All windows must be
>>>>>>>>> partitioned and ordered in the same way.
>>>>>>>>> - Add support for additional ORDER BY attributes (besides time).
>>>>>>>>> 
>>>>>>>>> As I said before, TumbleRows and SessionRows windows as in FLIP-11
>>>>>>>>> are not well defined, IMO.
>>>>>>>>> They can be expressed as SlideRows windows with special
>>>>>>>>> partitioning (partitioning on fixed, non-overlapping time ranges
>>>>>>>>> for TumbleRows, and gap-separated, non-overlapping time ranges for
>>>>>>>>> SessionRows). I would not start to work on those yet.
>>>>>>>>> 
>>>>>>>>> I would like to close all related JIRA issues (FLINK-4678,
>>>>>>>>> FLINK-4679, FLINK-4680, FLINK-5584) and restructure the development
>>>>>>>>> of these features as outlined above with corresponding JIRA issues.
>>>>>>>>> 
>>>>>>>>> What do others think? (I cc'ed the contributors assigned to the
>>>>>>>>> above JIRA issues)
>>>>>>>>> 
>>>>>>>>> Best, Fabian
>>>>>>>>> 
>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>> 

