Hi Fabian, Shaoxuan, Yuhong,

- OVER RANGE for processing time

I think your design makes sense. Considering only processing time simplifies the design and makes it robust. The state will be saved in a queue, and each incoming row will be fed to the given user-defined functions one by one. Do I understand this correctly?
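To check the understanding above, here is a minimal single-threaded sketch in plain Java. It is not Flink API: it assumes a SUM aggregate, passes processing time in explicitly instead of reading a clock, and uses a HashMap in place of keyed state. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch (plain Java, not Flink API) of
 *   SUM(value) OVER (PARTITION BY key ORDER BY proctime
 *                    RANGE BETWEEN rangeMillis PRECEDING AND CURRENT ROW)
 * A HashMap stands in for keyed state; processing time is passed in explicitly.
 */
final class ProcTimeRangeOverSum {
    /** One buffered row: its arrival (processing) time and the value being summed. */
    private static final class Row {
        final long procTime;
        final long value;

        Row(long procTime, long value) {
            this.procTime = procTime;
            this.value = value;
        }
    }

    /** Per-key state: a FIFO queue of the rows inside the range and their running sum. */
    private static final class KeyState {
        final Deque<Row> rows = new ArrayDeque<>();
        long sum;
    }

    private final long rangeMillis;
    private final Map<String, KeyState> states = new HashMap<>();

    ProcTimeRangeOverSum(long rangeMillis) {
        this.rangeMillis = rangeMillis;
    }

    /** Processes one row as it arrives and returns the aggregate emitted for it. */
    long onElement(String key, long value, long procTime) {
        KeyState st = states.computeIfAbsent(key, k -> new KeyState());
        // Processing time never goes backwards, so appending keeps the queue ordered.
        st.rows.addLast(new Row(procTime, value));
        st.sum += value;
        // Evict (and subtract) rows that fell out of [procTime - rangeMillis, procTime].
        long lowerBound = procTime - rangeMillis;
        while (!st.rows.isEmpty() && st.rows.peekFirst().procTime < lowerBound) {
            st.sum -= st.rows.pollFirst().value;
        }
        return st.sum;
    }
}
```

Because rows arrive in processing-time order, a plain FIFO queue is enough: old rows leave from the head, and no sorted state or retraction messages are needed. Rows arriving in the same millisecond after the current row are not included in its result; whether that matches the intended RANGE semantics for peers is left open here.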

- OVER RANGE for event time

A sorted state is better suited for inserting out-of-order messages. The sorted state might use a linked list; if the state is large enough, re-calculation may be slow because the data is not stored in sequential memory. @Shaoxuan, do I understand this correctly?

Thanks a lot.

Best,
Jinkui Shi

> On Jan 25, 2017, at 17:55, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi everybody,
>
> thanks for the great discussions so far. It's awesome to see so much interest in this topic!
>
> First, I'd like to comment on the development process for this feature and later on the design of the runtime:
>
> Dev Process
> ----
> @Shaoxuan, I completely agree with you. We should first come up with good designs for the runtime operators of the different window types. Once we have that, we can start implementing the operators and integrate them with Calcite's optimization. This will be an intermediate step and, as a byproduct, give us support for SQL OVER windows. Once this is done, we can extend the Table API and translate the Table API calls into the same RelNodes as Calcite's SQL parser does.
>
> Runtime Design
> ----
> I think it makes sense to distinguish the different types of OVER windows because they have different requirements, which result in different runtime implementations (with different implementation complexity and performance). In a previous mail I proposed to split the support for OVER windows into the following subtasks:
>
> # bounded PRECEDING
> - OVER ROWS for processing time
>   - does not require sorted state (data always arrives in processing time order)
>   - no need to consider retraction (processing time is never late)
>   - defines windows on row count
>   - A GlobalWindow with evictor + trigger might be the best implementation (basically the same as DataStream.countWindow(long, long)). We need to add timeouts to clean up state for non-used keys though.
>
> - OVER RANGE for processing time
>   - does not require sorted state (data always arrives in processing time order)
>   - no need to consider retraction (processing time is never late)
>   - defines windows on a time range
>   - I think this could also be implemented with a GlobalWindow with evictor + trigger (need to verify)
>
> - OVER RANGE for event time
>   - need for sorted state (late data possible)
>   - IMO, a ProcessFunction gives us the most flexibility in adding later features (retraction, update rate, etc.)
>   - @Shaoxuan, you sketched a good design. Would you like to continue with a design proposal?
>
> # UNBOUNDED PRECEDING
> Similar considerations apply to the UNBOUNDED PRECEDING cases of the above window types.
>
> If we all agree that the separation into six JIRAs (bounded/unbounded * row-pt/range-pt/range-et) makes sense, I would suggest moving the discussions about the design of the implementation to the individual JIRAs.
>
> What do you think?
>
> Best, Fabian
>
> 2017-01-25 9:19 GMT+01:00 Shaoxuan Wang <wshaox...@gmail.com>:
>
>> Hi Liuxinchun,
>> I am not sure where you got the impression that anyone has suggested "to process Event time window in Sliding Row Window". If you were referring to my post, there may be some misunderstanding. I think you were asking a similar question to Hongyuhong's. I have just replied to him. Please take a look and let me know if that makes sense to you. "Retraction" is an important building block to compute correct incremental results in streaming. It is another big topic; we should discuss it in another thread.
>>
>> Regards,
>> Shaoxuan
>>
>>
>>
>> On Wed, Jan 25, 2017 at 3:44 PM, liuxinchun <liuxinc...@huawei.com> wrote:
>>
>>> I don't think it is a good idea to process Event time window in Sliding Row Window. In a sliding time window, when an element is late, we can trigger the recalculation of the related windows.
>>> And the sliding period is coarse-grained, so we only need to recalculate (size / slide) windows. But in a sliding row window, the calculation is triggered whenever an element arrives, so the sliding period becomes fine-grained. When an element is late, a great many "windows" are affected. Even if we store all the raw data, the computation is very expensive.
>>>
>>> I think it might be possible to define a standard for sliding event-time row windows: when certain elements are late, we recalculate only some of the windows and tolerate some error. For example, we could recalculate only the windows that end in the range (lateElement.timestamp - leftDelta, lateElement.timestamp] and the windows that begin in the range [lateElement.timestamp, lateElement.timestamp + rightDelta).
>>> ////////////////////////////////////////////////////////////////////////////////////
>>> Hi everyone,
>>> Thanks for this great discussion, and I'm glad to see more and more people interested in stream SQL & Table API.
>>>
>>> IMO, the key problems for the OVER window design are the SQL semantics and the runtime design. I totally agree with Fabian that we should skip the design of TumbleRows and SessionRows windows for now, as they are not well defined in SQL semantics.
>>>
>>> Runtime design is the most crucial part, which we are interested in and have volunteered to contribute to. We have thousands of machines running Flink streaming jobs. The costs in terms of CPU, memory, and state are vital factors that we have to take into account. We have been working on the design of the OVER window over the past months and are planning to send out a detailed design doc to DEV quite soon. But since Fabian started a good discussion on the OVER window, I would like to share our ideas/thoughts about the runtime design for the OVER window.
>>>
>>> 1. As SunJincheng pointed out earlier, sliding windows do not work for UNBOUNDED PRECEDING, so we need an alternative approach for unbounded OVER windows.
>>> 2. Although sliding windows may work for some cases of bounded windows, they are not very efficient and therefore should not be used in production. To the best of my understanding, the current runtime implementation of sliding windows does not leverage the concept of state panes yet. This means that if we use sliding windows for the OVER window, a backend state will be created for each group (PARTITION BY) and each row, and whenever a new record arrives, it will be accumulated into all existing windows that have not been closed. This would cause quite a lot of overhead in terms of both CPU and memory & state.
>>> 3. Fabian has mentioned an approach leveraging a “ProcessFunction” and a “sortedState”. I like this idea. The design details are not quite clear yet, so I would like to add more thoughts on this. Regardless of which DataStream API we are going to use (it is very likely that we need a new API), we should come up with an optimal approach. The purpose of grouping windows and OVER windows is to partition the data such that we can generate the aggregate results. So when we talk about the design of the OVER window, we have to think about the aggregates. As we proposed in our recent UDAGG doc https://goo.gl/6ntclB, the user-defined accumulator will be stored in the aggregate state. Besides the accumulator, we have also introduced a retract API for UDAGG. With the aggregate accumulator and retract API, I am proposing a runtime approach to implement the OVER window as follows:
>>>    - We first implement a sorted state interface.
>>>    - For each group, we create just one sorted state.
>>>      When a new record arrives, it is inserted into this sorted state and, at the same time, accumulated into the aggregate accumulator.
>>>    - For OVER windows, we keep the aggregate accumulator for the entire lifetime of the job. This is different from group windows, where we delete the accumulator for each group/window once the window is finished.
>>>    - When an OVER window is about to fire, we fetch the previous accumulator from the state, accumulate onto it all the records up to the upperBoundary of the current window, and retract all the out-of-scope records up to its lowerBoundary. We then emit the aggregate result and save the accumulator for the next window.
>>>
>>>
>>> Hello Fabian,
>>> I would suggest that we first start working on the runtime design of the OVER window and aggregates. Once we have a good design there, one can easily add support for SQL as well as the Table API. What do you think?
>>>
>>> Regards,
>>> Shaoxuan
>>>
>>> On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Radu,
>>>>
>>>> thanks for your comments!
>>>>
>>>> Yes, my intention is to open new JIRA issues to structure the development process. Everybody is very welcome to pick up issues and discuss the design proposals.
>>>> At the moment I see the following six issues to start with:
>>>>
>>>> - streaming SQL OVER ROW for processing time
>>>>   - bounded PRECEDING
>>>>   - unbounded PRECEDING
>>>>
>>>> - streaming SQL OVER RANGE for processing time
>>>>   - bounded PRECEDING
>>>>   - unbounded PRECEDING
>>>>
>>>> - streaming SQL OVER RANGE for event time
>>>>   - bounded PRECEDING
>>>>   - unbounded PRECEDING
>>>>
>>>> For each of these windows we need corresponding translation rules and execution code.
>>>>
>>>> Subsequent JIRAs would be
>>>> - extending the Table API for supported SQL windows
>>>> - add support for FOLLOWING
>>>> - etc.
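Shaoxuan's proposal above (one sorted state per group, a long-lived accumulator, accumulate up to the upperBoundary, retract below the lowerBoundary) can be sketched for the event-time RANGE case as follows. This is a hypothetical single-group illustration in plain Java: a TreeMap stands in for the proposed sorted state interface, windows fire when a watermark passes them, rows behind the watermark are simply dropped, and the SumAccumulator class is an illustrative stand-in, not the UDAGG API from the linked doc.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative accumulator with a retract method (hypothetical, not the UDAGG API in the doc). */
final class SumAccumulator {
    private long sum;

    void accumulate(long v) { sum += v; }

    void retract(long v) { sum -= v; }

    long getValue() { return sum; }
}

/**
 * Hypothetical sketch, for ONE group, of
 *   SUM(v) OVER (ORDER BY rowtime RANGE BETWEEN rangeMillis PRECEDING AND CURRENT ROW).
 * A TreeMap stands in for the sorted state; the accumulator lives as long as the group and
 * is moved forward with accumulate/retract instead of being recomputed per window.
 */
final class EventTimeRangeOverSum {
    private final long rangeMillis;
    /** Sorted state: rowtime -> values of all rows with that rowtime. */
    private final TreeMap<Long, List<Long>> sortedState = new TreeMap<>();
    private final SumAccumulator acc = new SumAccumulator();
    private long accUpper = Long.MIN_VALUE;      // rows with rowtime <= accUpper are accumulated
    private long accLower = Long.MIN_VALUE;      // rows with rowtime < accLower are retracted
    private long lastWatermark = Long.MIN_VALUE;

    EventTimeRangeOverSum(long rangeMillis) {
        this.rangeMillis = rangeMillis;
    }

    /** Inserts a possibly out-of-order row; rows at or behind the watermark are dropped here. */
    void onElement(long rowtime, long value) {
        if (rowtime <= lastWatermark) {
            return; // a real operator might emit retractions or route late data elsewhere
        }
        sortedState.computeIfAbsent(rowtime, t -> new ArrayList<>()).add(value);
    }

    /** Fires the window of every row with rowtime in (lastWatermark, watermark], in order. */
    List<Long> onWatermark(long watermark) {
        List<Long> results = new ArrayList<>();
        if (watermark <= lastWatermark) {
            return results;
        }
        for (Map.Entry<Long, List<Long>> e
                : sortedState.subMap(lastWatermark, false, watermark, true).entrySet()) {
            long t = e.getKey();
            // Accumulate all rows up to the window's upper boundary t.
            for (List<Long> vs : sortedState.subMap(accUpper, false, t, true).values()) {
                for (long v : vs) acc.accumulate(v);
            }
            accUpper = t;
            // Retract all rows that are now below the lower boundary t - rangeMillis.
            long lower = t - rangeMillis;
            for (List<Long> vs : sortedState.subMap(accLower, true, lower, false).values()) {
                for (long v : vs) acc.retract(v);
            }
            accLower = lower;
            // RANGE semantics: all rows with the same rowtime (peers) get the same result.
            for (int i = 0; i < e.getValue().size(); i++) results.add(acc.getValue());
        }
        // Rows below the lower boundary are never needed again.
        sortedState.headMap(accLower, false).clear();
        lastWatermark = watermark;
        return results;
    }
}
```

Each row is accumulated once and retracted once, so the per-row cost does not grow with the number of overlapping windows, which is the overhead point 2 above attributes to sliding windows. Handling rows that arrive behind the watermark (retraction, update rate) is deliberately left out.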
>>>>
>>>> Regarding the requirement for a sorted state: I am not sure whether the OVER windows should be implemented using Flink's DataStream window framework. We need a good design document to figure out the best approach. A ProcessFunction with a sorted state might be a good solution as well.
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>> 2017-01-24 10:41 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
>>>>
>>>>> Hi all,
>>>>>
>>>>> Thanks for starting this discussion - it is very useful.
>>>>> It does make sense indeed to refactor all these and coordinate the efforts a bit, so that we don't end up with overlapping implementations and incompatible solutions.
>>>>>
>>>>> If you close the 3 JIRA issues you mentioned, do you plan to redesign them and open new ones? Do you need help from our side? We can also pick up the redesign of some of these new JIRA issues. For example, we already have an implementation for this and we can help with the design. Nevertheless, let's coordinate the effort.
>>>>>
>>>>> Regarding the support for the different types of windows, I think the best option is to split the implementation into small units. We can easily do this from the transformation rule class, and with this each particular type of window (session/sliding/sliderows/processing time/...) will have a clear implementation and a corresponding architecture within its JIRA issue. What do you think about such granularity?
>>>>>
>>>>> Regarding the issue of "Q4: The implementation of SlideRows still needs a custom operator that collects records in a priority queue ordered by the "rowtime", which is similar to the design we discussed in FLINK-4697, right?"
>>>>> Why would you need this operator?
>>>>> The window buffer can act to some extent as a priority queue as long as the trigger and evictor are set to work based on the rowtime, or maybe I am missing something... Can you please clarify this?
>>>>>
>>>>>
>>>>> Dr. Radu Tudoran
>>>>> Senior Research Engineer - Big Data Expert
>>>>> IT R&D Division
>>>>>
>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>>>> European Research Center
>>>>>
>>>>>
>>>>> -----Original Message-----
>>>>> From: Jark Wu [mailto:wuchong...@alibaba-inc.com]
>>>>> Sent: Tuesday, January 24, 2017 6:53 AM
>>>>> To: dev@flink.apache.org
>>>>> Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables
>>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> Thanks for bringing up this discussion and for the nice approach to avoid overlapping contributions.
>>>>>
>>>>> All of these make sense to me. But I have some questions.
>>>>>
>>>>> Q1: If I understand correctly, we will not support TumbleRows and SessionRows at the beginning, but may support them as syntactic sugar (in the Table API) once SlideRows is supported. Right?
>>>>>
>>>>> Q2: How can SessionRows be supported based on SlideRows? I don't see how to partition on "gap-separated" time ranges.
>>>>>
>>>>> Q3: Should we break down the approach into smaller tasks for streaming tables and batch tables?
>>>>>
>>>>> Q4: The implementation of SlideRows still needs a custom operator that collects records in a priority queue ordered by the "rowtime", which is similar to the design we discussed in FLINK-4697, right?
>>>>>
>>>>> +1 for not supporting OVER ROW for event time at this point.
>>>>>
>>>>> Regards, Jark
>>>>>
>>>>>
>>>>>> On Jan 24, 2017, at 10:28 AM, Hongyuhong <hongyuh...@huawei.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>> We are also interested in streaming SQL and very willing to participate and contribute.
>>>>>>
>>>>>> Our work is in progress, and we will also contribute to Calcite to push forward window and stream-join support.
>>>>>>
>>>>>>
>>>>>>
>>>>>> --------------
>>>>>> From: Fabian Hueske [mailto:fhue...@gmail.com]
>>>>>> Sent: January 24, 2017 5:55
>>>>>> To: dev@flink.apache.org
>>>>>> Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables
>>>>>>
>>>>>> Hi Haohui,
>>>>>>
>>>>>> our plan was in fact to piggy-back on Calcite and use the TUMBLE function [1] once it is available (CALCITE-1345 [2]).
>>>>>> Unfortunately, this issue does not seem to be very active, so I don't know what the progress is.
>>>>>>
>>>>>> I would suggest moving the discussion about group windows to a separate thread and keeping this one focused on the organization of the SQL OVER windows.
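Jark's Q4 above refers to an operator that collects records in a priority queue ordered by "rowtime", and Radu asks whether the window buffer could play that role. To make the data structure concrete, here is a hypothetical plain-Java sketch, assuming buffered records are released in rowtime order once the watermark has passed them; it is not the FLINK-4697 design or any Flink class.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical sketch of an operator buffer that collects records in a priority queue
 * ordered by rowtime and releases them in rowtime order once the watermark passes them.
 * Ties are broken by arrival order so that the output is deterministic.
 */
final class RowtimeSortBuffer<T> {
    private static final class Entry<R> {
        final long rowtime;
        final long seq;
        final R record;

        Entry(long rowtime, long seq, R record) {
            this.rowtime = rowtime;
            this.seq = seq;
            this.record = record;
        }
    }

    private final PriorityQueue<Entry<T>> queue = new PriorityQueue<>(
            Comparator.<Entry<T>>comparingLong(e -> e.rowtime).thenComparingLong(e -> e.seq));
    private long nextSeq;

    /** Buffers a record; out-of-order arrival is fine. */
    void add(long rowtime, T record) {
        queue.add(new Entry<>(rowtime, nextSeq++, record));
    }

    /** Emits, in rowtime order, every buffered record with rowtime <= watermark. */
    List<T> advanceWatermark(long watermark) {
        List<T> out = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().rowtime <= watermark) {
            out.add(queue.poll().record);
        }
        return out;
    }
}
```

Downstream of such a buffer, records arrive sorted, so an OVER operator could treat event time much like processing time; records with a rowtime at or below an already-emitted watermark would still need separate handling.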
>>>>>>
>>>>>> Best,
>>>>>> Fabian
>>>>>>
>>>>>> [1] http://calcite.apache.org/docs/stream.html
>>>>>> [2] https://issues.apache.org/jira/browse/CALCITE-1345
>>>>>>
>>>>>> 2017-01-23 22:42 GMT+01:00 Haohui Mai <ricet...@gmail.com>:
>>>>>>
>>>>>>> Hi Fabian,
>>>>>>>
>>>>>>> FLINK-4692 has added support for tumbling windows, and we are excited to try it out and expose it as a SQL construct.
>>>>>>>
>>>>>>> Just curious -- what are your thoughts on the SQL syntax for tumbling windows?
>>>>>>>
>>>>>>> Implementation-wise, it might make sense to think of the tumbling window as a special case of the sliding window.
>>>>>>>
>>>>>>> The problem I see is that the OVER construct might be insufficient to support all the use cases of tumbling windows. For example, it fails to express tumbling windows that have fractional time units (as pointed out in http://calcite.apache.org/docs/stream.html).
>>>>>>>
>>>>>>> It looks to me like Calcite and Azure Stream Analytics have introduced a new construct (TUMBLE / TUMBLINGWINDOW) to address this issue.
>>>>>>>
>>>>>>> Do you think it is a good idea to follow the same conventions? Your ideas are appreciated.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Haohui
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai <ricet...@gmail.com> wrote:
>>>>>>>
>>>>>>>> +1
>>>>>>>>
>>>>>>>> We are also quite interested in these features and would love to participate and contribute.
>>>>>>>>
>>>>>>>> ~Haohui
>>>>>>>>
>>>>>>>> On Mon, Jan 23, 2017 at 7:31 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi everybody,
>>>>>>>>>
>>>>>>>>> it seems that currently several contributors are working on new features for the streaming Table API / SQL around row windows (as defined in FLIP-11 [1]) and SQL OVER-style windows (FLINK-4678, FLINK-4679, FLINK-4680, FLINK-5584).
>>>>>>>>> Since these efforts overlap quite a bit, I spent some time thinking about how we can approach these features and how to avoid overlapping contributions.
>>>>>>>>>
>>>>>>>>> The challenge here is the following. Some of the Table API row windows as defined by FLIP-11 [1] are basically SQL OVER windows, while others cannot be easily expressed as such (TumbleRows for row-count intervals, SessionRows).
>>>>>>>>> However, since Calcite already supports SQL OVER windows, we can reuse the optimization logic for some of the Table API row windows. I also thought about the semantics of the TumbleRows and SessionRows windows as defined in FLIP-11 and came to the conclusion that these are not well defined in FLIP-11 and should rather be defined as SlideRows windows with a special PARTITION BY clause.
>>>>>>>>>
>>>>>>>>> I propose to approach SQL OVER windows and Table API row windows as follows:
>>>>>>>>>
>>>>>>>>> We start with three simple cases for SQL OVER windows (not Table API yet):
>>>>>>>>>
>>>>>>>>> * OVER RANGE for event time
>>>>>>>>> * OVER RANGE for processing time
>>>>>>>>> * OVER ROW for processing time
>>>>>>>>>
>>>>>>>>> All cases fulfill the following restrictions:
>>>>>>>>> - All aggregations in SELECT must refer to the same window.
>>>>>>>>> - PARTITION BY may not contain the rowtime attribute.
>>>>>>>>> - ORDER BY must be on the rowtime attribute (for event time) or on a marker function that indicates processing time. Additional sort attributes are not supported initially.
>>>>>>>>> - Only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and "BETWEEN x PRECEDING AND CURRENT ROW" are supported.
>>>>>>>>>
>>>>>>>>> OVER ROW for event time cannot be easily supported.
>>>>>>>>> With event time, we may have late records which need to be injected into the order of records. When a record is injected into the order where a row-count window has already been computed, this window and all following windows will change. We could either drop the record or send out many retraction records. I think it is best not to open this can of worms at this point.
>>>>>>>>>
>>>>>>>>> The rationale for all of the above restrictions is to have first versions of OVER windows soon.
>>>>>>>>> Once we have the above cases covered, we can extend and remove limitations as follows:
>>>>>>>>>
>>>>>>>>> - Table API SlideRow windows (with the same restrictions as above). This will be mostly API work since the execution part has been solved before.
>>>>>>>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING).
>>>>>>>>> - Add support for different windows in SELECT. All windows must be partitioned and ordered in the same way.
>>>>>>>>> - Add support for additional ORDER BY attributes (besides time).
>>>>>>>>>
>>>>>>>>> As I said before, TumbleRows and SessionRows windows as in FLIP-11 are not well defined, IMO.
>>>>>>>>> They can be expressed as SlideRows windows with special partitioning (partitioning on fixed, non-overlapping time ranges for TumbleRows, and gap-separated, non-overlapping time ranges for SessionRows). I would not start to work on those yet.
>>>>>>>>>
>>>>>>>>> I would like to close all related JIRA issues (FLINK-4678, FLINK-4679, FLINK-4680, FLINK-5584) and restructure the development of these features as outlined above with corresponding JIRA issues.
>>>>>>>>>
>>>>>>>>> What do others think?
>>>>>>>>> (I cc'ed the contributors assigned to the above JIRA issues.)
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
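Fabian's point about OVER ROW for event time can be made concrete with a small brute-force sketch. It is illustrative only: plain Java, a SUM aggregate, hypothetical class and method names, and `preceding = Integer.MAX_VALUE` standing in for UNBOUNDED PRECEDING. It recomputes every ROWS window with and without one late row and counts how many already-emitted results differ, i.e. how many retractions would be needed.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/**
 * Illustrative sketch (not Flink code): computes
 *   SUM(v) OVER (ORDER BY rowtime ROWS BETWEEN preceding PRECEDING AND CURRENT ROW)
 * by brute force and counts how many already-emitted results one late row invalidates.
 * preceding = Integer.MAX_VALUE models UNBOUNDED PRECEDING.
 */
final class EventTimeRowsOverDemo {
    static final class Row {
        final long rowtime;
        final long value;

        Row(long rowtime, long value) {
            this.rowtime = rowtime;
            this.value = value;
        }
    }

    /** Result for every row in rowtime order (List.sort is stable, so ties keep input order). */
    static List<Long> rowsOverSums(List<Row> rows, int preceding) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparingLong(r -> r.rowtime));
        List<Long> sums = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            long sum = 0;
            for (int j = Math.max(0, i - preceding); j <= i; j++) {
                sum += sorted.get(j).value;
            }
            sums.add(sum);
        }
        return sums;
    }

    /** How many results emitted for the on-time rows change once the late row is inserted. */
    static int changedResults(List<Row> onTime, Row late, int preceding) {
        List<Long> before = rowsOverSums(onTime, preceding);
        List<Row> withLate = new ArrayList<>(onTime);
        withLate.add(late);
        List<Long> after = rowsOverSums(withLate, preceding);
        // Position of the late row in rowtime order (it sorts after on-time rows with equal rowtime).
        int p = 0;
        for (Row r : onTime) {
            if (r.rowtime <= late.rowtime) p++;
        }
        int changed = 0;
        for (int i = 0; i < before.size(); i++) {
            long updated = after.get(i < p ? i : i + 1); // rows after p moved one position
            if (updated != before.get(i)) changed++;
        }
        return changed;
    }
}
```

With on-time rows at rowtimes 10, 20, ..., 60 (values 1 to 6) and a late row at rowtime 25, all four later results change for UNBOUNDED PRECEDING, while for `1 PRECEDING` only one changes and for `2 PRECEDING` two change: the late row shifts the windows of the n rows that follow it. Every changed result would need a retraction, unless the late row is dropped.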