Hi Jinkui Shi, Yuhong Hong, Sunjincheng,

I'd suggest discussing this on the actual JIRA issue. I think it would help to describe the design and the status of the implementation there.
Thanks,
Fabian

2017-02-06 3:24 GMT+01:00 shijinkui <shijin...@huawei.com>:

> Hi Fabian, sunjincheng,
>
> Today is the first workday of 2017 in China. When we came back, I found that the SQL issues had been assigned over the New Year holiday...
> Yuhong Hong is interested in FLINK-5657. She had already implemented it. Could we reconsider assigning FLINK-5657 to her?
>
> Thanks
> Jinkui Shi
>
> [1] https://issues.apache.org/jira/browse/FLINK-4557
>
> -----Original Message-----
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: January 25, 2017 17:55
> To: dev@flink.apache.org
> Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables
>
> Hi everybody,
>
> thanks for the great discussions so far. It's awesome to see so much interest in this topic!
>
> First, I'd like to comment on the development process for this feature and later on the design of the runtime:
>
> Dev Process
> ----
> @Shaoxuan, I completely agree with you. We should first come up with good designs for the runtime operators of the different window types. Once we have that, we can start implementing the operators and integrate them with Calcite's optimization. This will be an intermediate step and, as a byproduct, give us support for SQL OVER windows. Once this is done, we can extend the Table API and translate the Table API calls into the same RelNodes as Calcite's SQL parser does.
>
> Runtime Design
> ----
> I think it makes sense to distinguish the different types of OVER windows because they have different requirements, which result in different runtime implementations (with different implementation complexity and performance). In a previous mail I proposed to split the support for OVER windows into the following subtasks:
>
> # bounded PRECEDING
> - OVER ROWS for processing time
>   - does not require sorted state (data always arrives in processing time order)
>   - no need to consider retraction (processing time is never late)
>   - defines windows on row count.
>   - A GlobalWindow with evictor + trigger might be the best implementation (basically the same as DataStream.countWindow(long, long)). We need to add timeouts to clean up state for unused keys, though.
>
> - OVER RANGE for processing time
>   - does not require sorted state (data always arrives in processing time order)
>   - no need to consider retraction (processing time is never late)
>   - defines windows on row count
>   - I think this could also be implemented with a GlobalWindow with evictor + trigger (need to verify)
>
> - OVER RANGE for event time
>   - needs sorted state (late data possible)
>   - IMO, a ProcessFunction gives us the most flexibility in adding later features (retraction, update rate, etc.)
>   - @Shaoxuan, you sketched a good design. Would you like to continue with a design proposal?
>
> # UNBOUNDED PRECEDING
> Similar considerations apply to the UNBOUNDED PRECEDING cases of the above window types.
>
> If we all agree that the separation into six JIRAs (bounded/unbounded * row-pt/range-pt/range-et) makes sense, I would suggest moving the discussions about the design of the implementation to the individual JIRAs.
>
> What do you think?
>
> Best, Fabian
>
> 2017-01-25 9:19 GMT+01:00 Shaoxuan Wang <wshaox...@gmail.com>:
>
> > Hi Liuxinchun,
> > I am not sure where you got the impression that anyone has suggested "to process Event time window in Sliding Row Window". If you were referring to my post, there may be some misunderstanding. I think you were asking a question similar to Hongyuhong's, and I have just replied to him. Please take a look and let me know if that makes sense to you. "Retraction" is an important building block for computing correct incremental results in streaming. It is another big topic, and we should discuss it in another thread.
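Fabian's first case above (bounded OVER ROWS on processing time) needs neither sorted state nor retraction, because rows always arrive in processing-time order. The following is a minimal, framework-free Python sketch of that idea, not Flink code: the class `ProcTimeRowsOver`, the `idle_timeout` parameter, and the `clean_up` method are hypothetical names that model `SUM(v) OVER (PARTITION BY k ORDER BY proctime ROWS BETWEEN n PRECEDING AND CURRENT ROW)` and the per-key state timeout he mentions.

```python
from collections import deque


class ProcTimeRowsOver:
    """Hypothetical model of SUM(v) OVER (PARTITION BY k ORDER BY proctime
    ROWS BETWEEN n PRECEDING AND CURRENT ROW).

    Rows arrive in processing-time order, so a bounded FIFO per key is
    enough: no sorting and no retraction of emitted results are needed.
    """

    def __init__(self, preceding, idle_timeout):
        self.size = preceding + 1          # n preceding rows + the current row
        self.idle_timeout = idle_timeout   # assumed cleanup knob (hypothetical)
        self.buffers = {}                  # key -> deque of the most recent values
        self.last_seen = {}                # key -> last processing time a row arrived

    def process(self, key, value, now):
        buf = self.buffers.setdefault(key, deque(maxlen=self.size))
        buf.append(value)                  # a full deque drops its oldest row ("evictor")
        self.last_seen[key] = now
        return sum(buf)                    # every arriving row fires one result ("trigger")

    def clean_up(self, now):
        # Drop all state of keys that have not received a row for idle_timeout.
        expired = [k for k, t in self.last_seen.items()
                   if now - t >= self.idle_timeout]
        for k in expired:
            del self.buffers[k]
            del self.last_seen[k]
        return expired


if __name__ == "__main__":
    op = ProcTimeRowsOver(preceding=2, idle_timeout=100)
    print([op.process("a", v, t) for t, v in enumerate([1, 2, 3, 4])])  # [1, 3, 6, 9]
```

Recomputing `sum(buf)` keeps the sketch obviously correct; an incremental variant would keep a running total and subtract the evicted value instead.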
> >
> > Regards,
> > Shaoxuan
> >
> > On Wed, Jan 25, 2017 at 3:44 PM, liuxinchun <liuxinc...@huawei.com> wrote:
> >
> > > I don't think it is a good idea to process event-time windows as Sliding Row Windows. In a Sliding Time Window, when an element is late, we can trigger the recalculation of the related windows. The sliding period is coarse-grained, so we only need to recalculate size/slide windows. But in a Sliding Row Window, a calculation is triggered by every arriving element, so the sliding period becomes fine-grained. When an element is late, a great many "windows" are affected. Even if we store all the raw data, the computation is very expensive.
> > >
> > > I wonder whether it is possible to define a relaxed standard for sliding event-time Row Windows: when certain elements are late, we only recalculate some of the windows and tolerate some error. For example, we could recalculate only the windows that end in the range (lateElement.timestamp - leftDelta, lateElement.timestamp] and the windows that begin in the range [lateElement.timestamp, lateElement.timestamp + rightDelta).
> > > ////////////////////////////////////////////////////////////////////////////////////
> > > Hi everyone,
> > > Thanks for this great discussion. I am glad to see more and more people interested in stream SQL & Table API.
> > >
> > > IMO, the key problems for the OVER window design are the SQL semantics and the runtime design. I totally agree with Fabian that we should skip the design of TumbleRows and SessionRows windows for now, as they are not well defined in SQL semantics.
> > >
> > > The runtime design is the most crucial part, and it is what we are interested in and have volunteered to contribute to. We have thousands of machines running Flink streaming jobs.
> > > The costs in terms of CPU, memory, and state are vital factors that we have to take into account. We have been working on the design of the OVER window for the past months and are planning to send a detailed design doc to the dev list quite soon. But since Fabian started a good discussion on the OVER window, I would like to share our ideas/thoughts about its runtime design.
> > >
> > > 1. As SunJincheng pointed out earlier, a sliding window does not work for UNBOUNDED PRECEDING; we need an alternative approach for unbounded OVER windows.
> > > 2. Though a sliding window may work for some cases of bounded windows, it is not very efficient and therefore should not be used in production. To the best of my understanding, the current runtime implementation of the sliding window does not leverage the concept of state panes yet. This means that if we use a sliding window for the OVER window, a backend state will be created per group (PARTITION BY key) and per row, and whenever a new record arrives, it will be accumulated into all existing windows that have not been closed. This would cause quite a lot of overhead in terms of CPU, memory, and state.
> > > 3. Fabian has mentioned an approach leveraging a "ProcessFunction" and a "sortedState". I like this idea. The design details are not quite clear yet, so I would like to add more thoughts on this. Regardless of which DataStream API we are going to use (it is very likely that we need a new API), we should come up with an optimal approach. The purpose of grouping windows and OVER windows is to partition the data such that we can generate the aggregate results. So when we talk about the design of the OVER window, we have to think about the aggregates.
> > >    As we proposed in our recent UDAGG doc https://goo.gl/6ntclB, the user-defined accumulator will be stored in the aggregate state. Besides the accumulator, we have also introduced a retract API for UDAGG. With the aggregate accumulator and the retract API, I am proposing the following runtime approach to implement the OVER window.
> > > 4.
> > >   - We first implement a sorted state interface.
> > >   - For each group, we create just one sorted state. When a new record arrives, it is inserted into this sorted state, and at the same time it is accumulated into the aggregate accumulator.
> > >   - For the OVER window, we keep the aggregate accumulator for the entire lifetime of the job. This is different from grouping windows, where we delete the accumulator for each group/window when the window is finished.
> > >   - When an OVER window is about to fire, we take the previous accumulator from the state, accumulate onto it all the records up to the upper boundary of the current window, and retract all the out-of-scope records up to its lower boundary. We emit the aggregate result and save the accumulator for the next window.
> > >
> > > Hello Fabian,
> > > I would suggest that we first start working on the runtime design of the OVER window and aggregates. Once we have a good design there, one can easily add support for SQL as well as the Table API. What do you think?
> > >
> > > Regards,
> > > Shaoxuan
> > >
> > > On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> > >
> > > > Hi Radu,
> > > >
> > > > thanks for your comments!
> > > >
> > > > Yes, my intention is to open new JIRA issues to structure the development process. Everybody is very welcome to pick up issues and discuss the design proposals.
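Shaoxuan's step 4 above (one sorted state per group, a long-lived accumulator, accumulate up to the upper boundary and retract below the lower boundary) can be modelled in a few lines. This is a hedged, single-group Python sketch under explicit assumptions, not the proposed Flink operator: `SumAccumulator` stands in for a UDAGG accumulator with a retract method, a sorted Python list stands in for the sorted state interface, firing is driven by watermarks, and late records (rowtime at or below the last watermark) are simply dropped. All class and method names are hypothetical.

```python
import bisect
import itertools


class SumAccumulator:
    """Stand-in for a UDAGG-style accumulator with accumulate/retract."""

    def __init__(self):
        self.total = 0

    def accumulate(self, value):
        self.total += value

    def retract(self, value):
        self.total -= value


class EventTimeRangeOver:
    """Hypothetical single-group model of
    SUM(v) OVER (ORDER BY rowtime RANGE BETWEEN p PRECEDING AND CURRENT ROW)."""

    def __init__(self, preceding):
        self.preceding = preceding
        self.rows = []                 # "sorted state": (rowtime, seq, value)
        self.seq = itertools.count()   # tie-breaker keeps arrival order, never compares values
        self.acc = SumAccumulator()    # kept across firings, never reset
        self.hi = 0                    # rows[:hi] have been accumulated
        self.lo = 0                    # rows[:lo] have been retracted
        self.watermark = float("-inf")

    def insert(self, rowtime, value):
        if rowtime <= self.watermark:
            return False               # late record: dropped in this sketch
        bisect.insort(self.rows, (rowtime, next(self.seq), value))
        return True

    def on_watermark(self, watermark):
        """Fire all rows with rowtime <= watermark; return (rowtime, value, agg)."""
        out = []
        while self.hi < len(self.rows) and self.rows[self.hi][0] <= watermark:
            t = self.rows[self.hi][0]
            start = self.hi
            # Accumulate every row up to the upper boundary (all rows with rowtime t).
            while self.hi < len(self.rows) and self.rows[self.hi][0] == t:
                self.acc.accumulate(self.rows[self.hi][2])
                self.hi += 1
            # Retract rows below the lower boundary t - preceding (stops before hi).
            while self.rows[self.lo][0] < t - self.preceding:
                self.acc.retract(self.rows[self.lo][2])
                self.lo += 1
            out.extend((t, v, self.acc.total) for _, _, v in self.rows[start:self.hi])
        # Retracted rows can never re-enter a later frame, so drop them from state.
        del self.rows[:self.lo]
        self.hi -= self.lo
        self.lo = 0
        self.watermark = max(self.watermark, watermark)
        return out
```

For example, with `preceding=5`, rows at rowtimes 1, 4, 3 (values 10, 1, 2) fire at watermark 4 as running sums 10, 12, 13 in rowtime order. Trimming retracted rows after each firing keeps only the rows of the current frame plus the rows that have not fired yet.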
> > > >
> > > > At the moment I see the following six issues to start with:
> > > >
> > > > - streaming SQL OVER ROW for processing time
> > > >   - bounded PRECEDING
> > > >   - unbounded PRECEDING
> > > >
> > > > - streaming SQL OVER RANGE for processing time
> > > >   - bounded PRECEDING
> > > >   - unbounded PRECEDING
> > > >
> > > > - streaming SQL OVER RANGE for event time
> > > >   - bounded PRECEDING
> > > >   - unbounded PRECEDING
> > > >
> > > > For each of these windows we need corresponding translation rules and execution code.
> > > >
> > > > Subsequent JIRAs would be:
> > > > - extending the Table API for supported SQL windows
> > > > - adding support for FOLLOWING
> > > > - etc.
> > > >
> > > > Regarding the requirement for a sorted state: I am not sure if the OVER windows should be implemented using Flink's DataStream window framework. We need a good design document to figure out the best approach. A ProcessFunction with a sorted state might be a good solution as well.
> > > >
> > > > Best, Fabian
> > > >
> > > >
> > > > 2017-01-24 10:41 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Thanks for starting this discussion - it is very useful.
> > > > > It does indeed make sense to refactor all of this and to coordinate the efforts a bit, so that we do not end up with overlapping implementations and incompatible solutions.
> > > > >
> > > > > If you close the 3 JIRA issues you mentioned, do you plan to redesign them and open new ones? Do you need help from our side? We can also pick up the redesign of some of these new JIRA issues. For example, we already have an implementation for this and we can help with the design. Nevertheless, let's coordinate the effort.
> > > > >
> > > > > Regarding the support for the different types of window: I think the best option is to split the implementation into small units. We can easily do this from the transformation rule class, and with this each particular type of window (session/sliding/sliderows/processing time/...) will have a clear implementation and a corresponding architecture within the JIRA issue. What do you think about such a granularity?
> > > > >
> > > > > Regarding the issue of "Q4: The implementation of SlideRows still needs a custom operator that collects records in a priority queue ordered by the "rowtime", which is similar to the design we discussed in FLINK-4697, right?":
> > > > > Why would you need this operator? The window buffer can act to some extent as a priority queue as long as the trigger and evictor are set to work based on the rowtime - or maybe I am missing something... Can you please clarify this?
> > > > >
> > > > >
> > > > > Dr. Radu Tudoran
> > > > > Senior Research Engineer - Big Data Expert
> > > > > IT R&D Division
> > > > >
> > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > > European Research Center
> > > > > Riesstrasse 25, 80992 München
> > > > >
> > > > >
> > > > > -----Original Message-----
> > > > > From: Jark Wu [mailto:wuchong...@alibaba-inc.com]
> > > > > Sent: Tuesday, January 24, 2017 6:53 AM
> > > > > To: dev@flink.apache.org
> > > > > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables
> > > > >
> > > > > Hi Fabian,
> > > > >
> > > > > Thanks for bringing up this discussion and for the nice approach to avoiding overlapping contributions.
> > > > >
> > > > > All of this makes sense to me, but I have some questions.
> > > > >
> > > > > Q1: If I understand correctly, we will not support TumbleRows and SessionRows at the beginning, but may support them as syntactic sugar (in the Table API) once SlideRows is supported. Right?
> > > > >
> > > > > Q2: How can SessionRows be supported based on SlideRows? I don't see how to partition on "gap-separated" ranges.
> > > > >
> > > > > Q3: Should we break down the approach into smaller tasks for streaming tables and batch tables?
> > > > >
> > > > > Q4: The implementation of SlideRows still needs a custom operator that collects records in a priority queue ordered by the "rowtime", which is similar to the design we discussed in FLINK-4697, right?
> > > > >
> > > > > +1 for not supporting OVER ROW for event time at this point.
> > > > >
> > > > > Regards, Jark
> > > > >
> > > > > > On January 24, 2017, at 10:28 AM, Hongyuhong <hongyuh...@huawei.com> wrote:
> > > > > >
> > > > > > Hi,
> > > > > > We are also interested in streaming SQL and very willing to participate and contribute.
> > > > > >
> > > > > > Our work is in progress, and we will also contribute to Calcite to push forward window and stream-join support.
> > > > > >
> > > > > >
> > > > > > --------------
> > > > > > From: Fabian Hueske [mailto:fhue...@gmail.com]
> > > > > > Sent: January 24, 2017 5:55
> > > > > > To: dev@flink.apache.org
> > > > > > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables
> > > > > >
> > > > > > Hi Haohui,
> > > > > >
> > > > > > our plan was in fact to piggy-back on Calcite and use the TUMBLE function [1] once it is available (CALCITE-1345 [2]).
> > > > > > Unfortunately, this issue does not seem to be very active, so I don't know what the progress is.
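Jark's Q4 and Radu's reply both revolve around an operator that holds records in a priority queue ordered by rowtime. As a minimal sketch of that buffering step only (the FLINK-4697 design itself is not reproduced here), a min-heap can hold out-of-order records and release them in rowtime order once the watermark guarantees that no earlier record can still arrive. `RowtimeSorter` and its methods are hypothetical names.

```python
import heapq
import itertools


class RowtimeSorter:
    """Hypothetical buffer that re-orders out-of-order records by rowtime.

    Records wait in a min-heap keyed on rowtime and are released in
    ascending rowtime order once the watermark has passed them.
    """

    def __init__(self):
        self.heap = []
        self.seq = itertools.count()   # tie-breaker: equal rowtimes keep arrival order

    def add(self, rowtime, record):
        heapq.heappush(self.heap, (rowtime, next(self.seq), record))

    def advance_watermark(self, watermark):
        released = []
        while self.heap and self.heap[0][0] <= watermark:
            rowtime, _, record = heapq.heappop(self.heap)
            released.append((rowtime, record))
        return released
```

Because ties are broken by the arrival counter, the heap never has to compare the records themselves, so any record type can be buffered.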
> > > > > >
> > > > > > I would suggest moving the discussion about group windows to a separate thread and keeping this one focused on the organization of the SQL OVER windows.
> > > > > >
> > > > > > Best,
> > > > > > Fabian
> > > > > >
> > > > > > [1] http://calcite.apache.org/docs/stream.html
> > > > > > [2] https://issues.apache.org/jira/browse/CALCITE-1345
> > > > > >
> > > > > > 2017-01-23 22:42 GMT+01:00 Haohui Mai <ricet...@gmail.com>:
> > > > > >
> > > > > >> Hi Fabian,
> > > > > >>
> > > > > >> FLINK-4692 has added support for tumbling windows, and we are excited to try it out and expose it as a SQL construct.
> > > > > >>
> > > > > >> Just curious -- what are your thoughts on the SQL syntax for tumbling windows?
> > > > > >>
> > > > > >> Implementation-wise, it might make sense to think of the tumbling window as a special case of the sliding window.
> > > > > >>
> > > > > >> The problem I see is that the OVER construct might be insufficient to support all the use cases of tumbling windows. For example, it fails to express tumbling windows that have fractional time units (as pointed out in http://calcite.apache.org/docs/stream.html).
> > > > > >>
> > > > > >> It looks to me like Calcite and Azure Stream Analytics have introduced a new construct (TUMBLE / TUMBLINGWINDOW) to address this issue.
> > > > > >>
> > > > > >> Do you think it is a good idea to follow the same conventions? Your ideas are appreciated.
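Haohui's remark that a tumbling window can be treated as a special case of the sliding window can be made concrete with window assignment. The sketch below assumes integer timestamps (for example milliseconds), windows aligned to multiples of the slide starting at 0, and half-open `[start, end)` windows. The function names are hypothetical, and this is not Flink's window assigner code.

```python
def assign_sliding_windows(ts, size, slide):
    """Return every [start, end) sliding window that contains timestamp ts.

    Hypothetical helper: windows start at multiples of `slide` (epoch 0);
    ts, size, and slide are integers.
    """
    start = ts - (ts % slide)          # latest window start that is <= ts
    windows = []
    while start > ts - size:           # [start, start + size) still contains ts
        windows.append((start, start + size))
        start -= slide
    return windows


def assign_tumbling_window(ts, size):
    # A tumbling window is a sliding window whose slide equals its size,
    # so exactly one window contains each timestamp.
    (window,) = assign_sliding_windows(ts, size, size)
    return window


if __name__ == "__main__":
    print(assign_sliding_windows(7, 10, 5))   # [(5, 15), (0, 10)]
    print(assign_tumbling_window(7, 10))      # (0, 10)
```

With `slide == size` the loop produces exactly one window, which is why the tumbling helper can unpack a single result.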
> > > > > >>
> > > > > >> Regards,
> > > > > >> Haohui
> > > > > >>
> > > > > >>
> > > > > >> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai <ricet...@gmail.com> wrote:
> > > > > >>
> > > > > >>> +1
> > > > > >>>
> > > > > >>> We are also quite interested in these features and would love to participate and contribute.
> > > > > >>>
> > > > > >>> ~Haohui
> > > > > >>>
> > > > > >>> On Mon, Jan 23, 2017 at 7:31 AM Fabian Hueske <fhue...@gmail.com> wrote:
> > > > > >>>
> > > > > >>>> Hi everybody,
> > > > > >>>>
> > > > > >>>> it seems that currently several contributors are working on new features for the streaming Table API / SQL around row windows (as defined in FLIP-11 [1]) and SQL OVER-style windows (FLINK-4678, FLINK-4679, FLINK-4680, FLINK-5584).
> > > > > >>>> Since these efforts overlap quite a bit, I spent some time thinking about how we can approach these features and how to avoid overlapping contributions.
> > > > > >>>>
> > > > > >>>> The challenge here is the following. Some of the Table API row windows as defined by FLIP-11 [1] are basically SQL OVER windows, while others cannot be easily expressed as such (TumbleRows for row-count intervals, SessionRows).
> > > > > >>>> However, since Calcite already supports SQL OVER windows, we can reuse the optimization logic for some of the Table API row windows. I also thought about the semantics of the TumbleRows and SessionRows windows as defined in FLIP-11 and came to the conclusion that these are not well defined in FLIP-11 and should rather be defined as SlideRows windows with a special PARTITION BY clause.
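One possible reading of the "special PARTITION BY clause" idea, sketched under explicit assumptions: rows are already sorted by rowtime (no late data), the aggregate is a running `SUM` with `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, and the extra partition key is derived either from a fixed time bucket (TumbleRows) or from a gap-based session id (SessionRows). All function names are hypothetical; this illustrates the idea and is not the FLIP-11 definition.

```python
def tumble_partition(rowtime, size):
    # Fixed, non-overlapping time ranges: [k * size, (k + 1) * size).
    return rowtime // size


def session_partitions(rowtimes, gap):
    """Assign a session id to each rowtime of an already time-sorted list.

    A new session starts whenever the distance to the previous row
    exceeds `gap` (gap-separated, non-overlapping ranges).
    """
    ids, current = [], 0
    for i, t in enumerate(rowtimes):
        if i > 0 and t - rowtimes[i - 1] > gap:
            current += 1
        ids.append(current)
    return ids


def running_sum_per_partition(values, partitions):
    """SUM(v) OVER (PARTITION BY p ORDER BY rowtime
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), rows already sorted."""
    totals, out = {}, []
    for v, p in zip(values, partitions):
        totals[p] = totals.get(p, 0) + v
        out.append(totals[p])
    return out
```

With out-of-order data, a late row could bridge two sessions and merge them, which is one reason the gap-separated case is harder than the fixed-bucket case.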
> > > > > >>>>
> > > > > >>>> I propose to approach SQL OVER windows and Table API row windows as follows:
> > > > > >>>>
> > > > > >>>> We start with three simple cases for SQL OVER windows (not Table API yet):
> > > > > >>>>
> > > > > >>>> * OVER RANGE for event time
> > > > > >>>> * OVER RANGE for processing time
> > > > > >>>> * OVER ROW for processing time
> > > > > >>>>
> > > > > >>>> All cases fulfill the following restrictions:
> > > > > >>>> - All aggregations in SELECT must refer to the same window.
> > > > > >>>> - PARTITION BY may not contain the rowtime attribute.
> > > > > >>>> - ORDER BY must be on the rowtime attribute (for event time) or on a marker function that indicates processing time. Additional sort attributes are not supported initially.
> > > > > >>>> - Only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and "BETWEEN x PRECEDING AND CURRENT ROW" are supported.
> > > > > >>>>
> > > > > >>>> OVER ROW for event time cannot be easily supported. With event time, we may have late records which need to be injected into the order of records. When a record is injected into the order where a row-count window has already been computed, this and all following windows will change. We could either drop the record or send out many retraction records. I think it is best not to open this can of worms at this point.
> > > > > >>>>
> > > > > >>>> The rationale for all of the above restrictions is to have first versions of OVER windows soon.
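To make the retraction cost of OVER ROW for event time concrete, the following sketch injects one late record into an already-emitted, rowtime-ordered result and counts how many earlier results would have to be retracted. It is a brute-force Python model with hypothetical helper names, assuming distinct rows sorted by rowtime and `SUM` as the aggregate; `preceding=None` stands for UNBOUNDED PRECEDING.

```python
def rows_over_sums(values, preceding=None):
    """SUM(v) OVER (ORDER BY rowtime ROWS BETWEEN ... PRECEDING AND CURRENT ROW).

    `values` must already be sorted by rowtime; preceding=None models
    UNBOUNDED PRECEDING.
    """
    out = []
    for i in range(len(values)):
        start = 0 if preceding is None else max(0, i - preceding)
        out.append(sum(values[start:i + 1]))
    return out


def results_to_retract(rows, late, preceding=None):
    """Count already-emitted results that become wrong when the late
    (rowtime, value) pair is injected into the rowtime order."""
    before = rows_over_sums([v for _, v in rows], preceding)
    merged = sorted(rows + [late], key=lambda r: r[0])
    pos = merged.index(late)
    after = rows_over_sums([v for _, v in merged], preceding)
    del after[pos]   # the late row's own result is new output, not a retraction
    return sum(1 for old, new in zip(before, after) if old != new)


if __name__ == "__main__":
    rows = [(10, 1), (20, 2), (30, 3), (40, 4), (50, 5), (60, 6)]
    print(results_to_retract(rows, (15, 100)))               # 5
    print(results_to_retract(rows, (15, 100), preceding=2))  # 2
```

In this model, with UNBOUNDED PRECEDING every result after the late record changes, while with `n PRECEDING` the late record invalidates up to n of the already-emitted results that follow it, in addition to producing a new result of its own.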
> > > > > >>>> Once we have the above cases covered, we can extend them and remove limitations as follows:
> > > > > >>>>
> > > > > >>>> - Table API SlideRow windows (with the same restrictions as above). This will be mostly API work since the execution part has been solved before.
> > > > > >>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING).
> > > > > >>>> - Add support for different windows in SELECT. All windows must be partitioned and ordered in the same way.
> > > > > >>>> - Add support for additional ORDER BY attributes (besides time).
> > > > > >>>>
> > > > > >>>> As I said before, TumbleRows and SessionRows windows as in FLIP-11 are not well defined, IMO.
> > > > > >>>> They can be expressed as SlideRows windows with special partitioning (partitioning on fixed, non-overlapping time ranges for TumbleRows, and on gap-separated, non-overlapping time ranges for SessionRows). I would not start to work on those yet.
> > > > > >>>>
> > > > > >>>> I would like to close all related JIRA issues (FLINK-4678, FLINK-4679, FLINK-4680, FLINK-5584) and restructure the development of these features as outlined above with corresponding JIRA issues.
> > > > > >>>>
> > > > > >>>> What do others think? (I cc'ed the contributors assigned to the above JIRA issues.)
> > > > > >>>>
> > > > > >>>> Best, Fabian
> > > > > >>>>
> > > > > >>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations