Hi Fabian,

Thank you for your reply. Regarding your point about the need “to
improve the efficiency of the implementation”, I agree with you. I did not
explain my solution clearly in my last email. What I meant is that the
existing window mechanism can be used for BOUNDED PRECEDING, but it will not
work for UNBOUNDED PRECEDING (for which we could use a ProcessFunction
instead). My thoughts on the possible solutions for OVER windows are
detailed below:

- [The first solution]:

    BOUNDED PRECEDING uses the existing window mechanism, while UNBOUNDED
PRECEDING uses a ProcessFunction. However, ProcessFunction currently only
supports KeyedStream, so it only works with PARTITION BY; we would need to
extend it to the DataStream API. The pros and cons of this solution are:

    Pros: it does not need sorted state, so it can be implemented quickly.

    Cons: performance is a concern, because the same record has to be
copied into many windows (the same issue as with sliding windows).
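The following minimal plain-Java sketch illustrates the cost trade-off of the first solution for a single partition and a SUM aggregate. The window-based path models a sliding window that advances by one row, so every result row materializes its own window and each record is copied into up to (preceding + 1) windows; the unbounded path keeps a single running accumulator, which is what per-key ProcessFunction state would hold. The class and method names are hypothetical and are not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Flink API) contrasting the two paths of the first
 * solution for SUM(v) over one partition, ordered by arrival.
 */
public class FirstSolutionSketch {

    /** One sum per result row plus the number of record copies that were made. */
    public static final class Result {
        public final List<Long> sums = new ArrayList<>();
        public long copies = 0;
    }

    /**
     * ROWS BETWEEN preceding PRECEDING AND CURRENT ROW, evaluated window-style:
     * each result row materializes its own window, so every record is copied
     * into up to (preceding + 1) windows.
     */
    public static Result boundedByWindows(long[] values, int preceding) {
        Result r = new Result();
        for (int i = 0; i < values.length; i++) {
            List<Long> window = new ArrayList<>();
            for (int j = Math.max(0, i - preceding); j <= i; j++) {
                window.add(values[j]); // copy the record into this row's window
                r.copies++;
            }
            long sum = 0;
            for (long v : window) {
                sum += v;
            }
            r.sums.add(sum);
        }
        return r;
    }

    /**
     * ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: there is no window size,
     * so one running accumulator (per key, in a real job) replaces the windows.
     */
    public static List<Long> unboundedRunningSum(long[] values) {
        List<Long> sums = new ArrayList<>();
        long acc = 0;
        for (long v : values) {
            acc += v;
            sums.add(acc);
        }
        return sums;
    }

    public static void main(String[] args) {
        long[] values = {1, 2, 3, 4, 5};
        Result r = boundedByWindows(values, 2);
        System.out.println(r.sums + " copies=" + r.copies); // [1, 3, 6, 9, 12] copies=12
        System.out.println(unboundedRunningSum(values));     // [1, 3, 6, 10, 15]
    }
}
```

With five rows and 2 PRECEDING, the window-based path already makes 12 copies for 5 results, and the copies per record grow linearly with the frame size, whereas the running accumulator does constant work per record.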

- [The second solution]:

   For both BOUNDED and UNBOUNDED PRECEDING, we use a ProcessFunction with
sorted state. The pros and cons of this solution are:

   Pros: good performance, because each record is stored only once.

   Cons: we need sorted state support, plus additional effort to manage
the state, split the logical windows, and trigger the computation. We also
cannot leverage the existing optimizations of the window mechanism.
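To make the second solution more concrete, here is a minimal single-partition sketch in plain Java, assuming a SUM aggregate over an event-time OVER RANGE window with x PRECEDING. Because sorted state does not exist yet, a java.util.TreeMap stands in for it; the class and method names are hypothetical and only loosely mirror a ProcessFunction (a watermark callback instead of timers). Each record is stored once, and the aggregate is updated incrementally by adding new rows and subtracting evicted ones.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical single-partition sketch of
 *   SUM(v) OVER (ORDER BY rowtime RANGE BETWEEN x PRECEDING AND CURRENT ROW).
 * A TreeMap stands in for a (not yet existing) sorted state.
 */
public class SortedStateOverRangeSketch {
    private final long preceding;                                   // x, in event-time units
    private final TreeMap<Long, List<Long>> rows = new TreeMap<>(); // sorted state: rowtime -> values
    private long currentWatermark = Long.MIN_VALUE;
    private long windowSum = 0;                                     // SUM over rows currently in the frame

    public SortedStateOverRangeSketch(long preceding) {
        this.preceding = preceding;
    }

    /** Stores each record once; records at or behind the watermark are late and dropped here. */
    public void processElement(long rowtime, long value) {
        if (rowtime <= currentWatermark) {
            return;
        }
        rows.computeIfAbsent(rowtime, k -> new ArrayList<>()).add(value);
    }

    /** Emits {rowtime, value, sum} for every buffered row with rowtime <= watermark, in rowtime order. */
    public List<long[]> onWatermark(long watermark) {
        List<long[]> out = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return out;
        }
        // Rowtimes in (currentWatermark, watermark] are complete and can be emitted now.
        List<Long> ready = new ArrayList<>(rows.subMap(currentWatermark, false, watermark, true).keySet());
        for (long ts : ready) {
            // Evict rows older than the frame [ts - preceding, ts]; each is subtracted exactly once.
            Map<Long, List<Long>> expired = rows.headMap(ts - preceding, false);
            for (List<Long> values : expired.values()) {
                for (long v : values) {
                    windowSum -= v;
                }
            }
            expired.clear();
            // RANGE semantics: all peer rows with the same rowtime share one result.
            List<Long> peers = rows.get(ts);
            for (long v : peers) {
                windowSum += v;
            }
            for (long v : peers) {
                out.add(new long[] {ts, v, windowSum});
            }
        }
        currentWatermark = watermark;
        return out;
    }
}
```

For example, with x = 2, the records (1, 10), (3, 20), (2, 5), (3, 1) and a watermark of 3 yield the sums 10, 15, 36 and 36; both rows at rowtime 3 get the same result because they are peers. Note that subtracting evicted rows only works for invertible aggregates such as SUM or COUNT; MIN or MAX would have to be recomputed from the sorted state.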

    I noticed that Stephan recently added some special optimizations to the
window state and the WindowedStream API to better support aggregations,
such as FLINK-5590 (Create a proper internal state hierarchy) and
FLINK-5582 (Add a general distributive aggregate function). Similarly, to
achieve the best performance, could we add some special APIs to
DataStream / KeyedStream that are dedicated to OVER windows? Just my 2 cents.

Best,

SunJincheng

2017-01-24 22:25 GMT+08:00 Fabian Hueske <fhue...@gmail.com>:

> Hi SunJincheng,
>
> thanks a lot for your comments!
>
> regarding the suitability of DataStream sliding windows: You are right
> that UNBOUNDED PRECEDING windows cannot be implemented as DataStream
> sliding windows. The same is true for OVER RANGE windows.
> I think the only OVER windows that could be done with DataStream sliding
> windows are bounded OVER ROW windows (processing time). For the other
> window types, I was thinking about implementing them using a
> ProcessFunction. We might need additional support for sorted state to
> improve the efficiency of the implementation.
>
> The motivation for not supporting FOLLOWING in the first version was to
> keep the implementation simple. I am not convinced that the best
> solution to implement OVER windows is to use the DataStream window
> framework (window assigner, trigger, evictor). Using a more flexible
> framework (at the cost of additional implementation overhead) might pay off
> when we want to add more features.
>
> You are right, we probably need better support for sorted state. I think
> we will need this as well, when implementing the OVER RANGE windows which
> cannot be easily implemented in the DataStream window framework.
> A thorough design document is required here.
>
> Best,
> Fabian
>
>
> 2017-01-24 7:51 GMT+01:00 jincheng sun <sunjincheng...@gmail.com>:
>
>> Hello Fabian,
>> Your plan looks good; I totally agree with your points.
>> While working on FLINK-4680, I had similar concerns about the
>> semantics of TumbleRows and SessionRows. It is much clearer if we define
>> these windows as SlideRows with a PARTITION BY clause.
>> Regarding the implementation plan for Table API row windows, I would
>> also like to share some thoughts on OVER windows that I gathered while
>> developing FLINK-4680:
>>
>> - Table API SlideRow windows (with the same restrictions as above). This
>> will be mostly API work since the execution part has been solved before.
>> Although sliding windows work for bounded preceding, they are not
>> sufficient to support unbounded preceding. For instance, we could use
>> SlidingProcessingTimeWindows and ProcessingTimeTrigger to implement
>> “OVER RANGE for processing time”, but we would still need to provide a
>> fixed window size, which does not fit unbounded preceding. The same
>> problems exist for “OVER RANGE for event time” and “OVER ROW for
>> processing time”. Therefore, we need a new window assigner and trigger for
>> unbounded preceding, say SlideRowGlobalWindows and
>> SlideRowGlobalWindowXXXTrigger. What do you think?
>>
>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
>> If I understand you correctly, you want to implement the SlideRow windows
>> first without support for FOLLOWING (I guess you want to leverage the
>> existing SlidingProcessing(Event)TimeWindows and
>> Processing(Event)TimeTrigger?). IMO, when we implement SlideRow windows,
>> we could just provide a new WindowAssigner and Trigger that support
>> both bounded preceding and following semantics (CURRENT ROW is just a
>> special case of FOLLOWING with an offset of 0). What do you think?
>>
>> - Add support for additional ORDER BY attributes (besides time).
>> This is an important and necessary part of OVER. But to achieve this,
>> we probably need a sorted state backend, maybe a sortedMapstate? Is it
>> also included in your plan?
>>
>> Best,
>> SunJincheng
>>
>> 2017-01-23 23:30 GMT+08:00 Fabian Hueske <fhue...@gmail.com>:
>>
>>> Hi everybody,
>>>
>>> it seems that currently several contributors are working on new features
>>> for the streaming Table API / SQL around row windows (as defined in FLIP-11
>>> [1]) and SQL OVER-style window (FLINK-4678, FLINK-4679, FLINK-4680,
>>> FLINK-5584).
>>> Since these efforts overlap quite a bit I spent some time thinking about
>>> how we can approach these features and how to avoid overlapping
>>> contributions.
>>>
>>> The challenge here is the following. Some of the Table API row windows
>>> as defined by FLIP-11 [1] are basically SQL OVER windows, while others cannot
>>> be easily expressed as such (TumbleRows for row-count intervals,
>>> SessionRows).
>>> However, since Calcite already supports SQL OVER windows, we can reuse
>>> the optimization logic for some of the Table API row windows. I also
>>> thought about the semantics of the TumbleRows and SessionRows windows as
>>> defined in FLIP-11 and came to the conclusion that these are not well
>>> defined in FLIP-11 and should rather be defined as SlideRows windows with a
>>> special PARTITION BY clause.
>>>
>>> I propose to approach SQL OVER windows and Table API row windows as
>>> follows:
>>>
>>> We start with three simple cases for SQL OVER windows (not Table API
>>> yet):
>>>
>>> * OVER RANGE for event time
>>> * OVER RANGE for processing time
>>> * OVER ROW for processing time
>>>
>>> All cases fulfill the following restrictions:
>>> - All aggregations in SELECT must refer to the same window.
>>> - PARTITION BY may not contain the rowtime attribute.
>>> - ORDER BY must be on the rowtime attribute (for event time) or on a marker
>>> function that indicates processing time. Additional sort attributes are not
>>> supported initially.
>>> - Only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and "BETWEEN x
>>> PRECEDING AND CURRENT ROW" are supported.
>>>
>>> OVER ROW for event time cannot be easily supported. With event time, we
>>> may have late records which need to be injected into the order of records.
>>> When a record is injected into the order at a point where a row-count
>>> window has already been computed, this and all following windows will
>>> change. We could either drop the record or send out many retraction
>>> records. I think it is
>>> best to not open this can of worms at this point.
>>>
>>> The rationale for all of the above restrictions is to have first versions
>>> of OVER windows soon.
>>> Once we have the above cases covered we can extend and remove
>>> limitations as follows:
>>>
>>> - Table API SlideRow windows (with the same restrictions as above). This
>>> will be mostly API work since the execution part has been solved before.
>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
>>> - Add support for different windows in SELECT. All windows must be
>>> partitioned and ordered in the same way.
>>> - Add support for additional ORDER BY attributes (besides time).
>>>
>>> As I said before, TumbleRows and SessionRows windows as in FLIP-11 are
>>> not well defined, IMO.
>>> They can be expressed as SlideRows windows with special partitioning
>>> (partitioning on fixed, non-overlapping time ranges for TumbleRows, and
>>> gap-separated, non-overlapping time ranges for SessionRows).
>>> I would not start to work on those yet.
>>>
>>> I would like to close all related JIRA issues (FLINK-4678, FLINK-4679,
>>> FLINK-4680, FLINK-5584) and restructure the development of these features
>>> as outlined above with corresponding JIRA issues.
>>>
>>> What do others think? (I cc'ed the contributors assigned to the above
>>> JIRA issues)
>>>
>>> Best, Fabian
>>>
>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
>>>
>>>
>>>
>>
>
