Hi all,

I also think it's a good idea to agree on the API first.

I am sorry that we did not include usage examples of the API in the FLIP
documentation earlier. This may have caused some misunderstandings in the
discussion on this mail thread.

So, now I have added some usage examples in the "Public Interfaces" section
of the FLIP-44 documentation.

Let us first get to know the API through its usage examples.
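For readers of the archive, here is the shape of the kind of example now in the
"Public Interfaces" section. This is an illustrative sketch of the *proposed*
API only (localKeyBy does not exist in current Flink), and localAgg/finalAgg
are placeholder aggregate functions:

```java
// Sketch of the proposed localKeyBy API (FLIP-44); not runnable against
// current Flink. localAgg/finalAgg are placeholder AggregateFunctions.
DataStream<Tuple2<String, Long>> input = ...;

input
    .localKeyBy(0)                  // no shuffle: pre-aggregation stays local
    .timeWindow(Time.seconds(60))   // bounds how long partials are buffered
    .aggregate(localAgg)            // emits per-key partial results
    .keyBy(0)                       // shuffles only the (smaller) partials
    .timeWindow(Time.seconds(60))
    .aggregate(finalAgg);           // merges partials into the final result
```

The point of the shape is that only pre-aggregated partials cross the network,
which is what mitigates the data skew discussed below.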

Please let me know if you have any feedback or questions.

Best,
Vino

On Thu, Jun 27, 2019 at 12:51 PM vino yang <yanghua1...@gmail.com> wrote:

> Hi Jark,
>
> `DataStream.localKeyBy().process()` has some key differences from
> `DataStream.process()`. The former receives a `KeyedProcessFunction`
> (sorry, my previous reply may have been misleading), while the latter
> receives a `ProcessFunction`. If you read the Javadoc of ProcessFunction,
> you will find a "*Note*" statement:
>
> Access to keyed state and timers (which are also scoped to a key) is only
>> available if the ProcessFunction is applied on a KeyedStream.
>
>
> In addition, you can compare their two implementations
> (`ProcessOperator` and `KeyedProcessOperator`) to see the difference.
>
> IMO, that "Note" matters a lot for many use cases. For example, if we
> cannot access keyed state, we can only buffer data in heap memory, which
> does not guarantee correctness on failure. Timers are also very important
> in some scenarios.
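To make the contrast concrete, here is a non-runnable sketch of what a
KeyedProcessFunction can do after the proposed localKeyBy() that a plain
ProcessFunction on a non-keyed stream cannot. The 60-second flush interval
and field layout are invented for illustration:

```java
// Sketch only: relies on the proposed localKeyBy() returning a KeyedStream.
// `buffer` would be initialized in open() via
// getRuntimeContext().getState(new ValueStateDescriptor<>(...)).
input
    .localKeyBy(0)
    .process(new KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>>() {
        private transient ValueState<Long> buffer;  // keyed state: fault tolerant

        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx,
                                   Collector<Tuple2<String, Long>> out) throws Exception {
            Long current = buffer.value();
            buffer.update(current == null ? value.f1 : current + value.f1);
            // per-key timer: flush the local partial result later
            ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 60_000L);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx,
                            Collector<Tuple2<String, Long>> out) throws Exception {
            out.collect(Tuple2.of(ctx.getCurrentKey(), buffer.value()));
            buffer.clear();
        }
    });
// A plain ProcessFunction on a non-keyed stream has neither the keyed state
// nor the per-key timers used above.
```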
>
> That's why we say our API is flexible: it gets most of the benefits (even
> potential future benefits) of KeyedStream.
>
> I have added some instructions on the use of localKeyBy in the FLIP-44
> documentation.
>
> Best,
> Vino
>
>
> On Thu, Jun 27, 2019 at 10:44 AM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Piotr,
>>
>> I think the state migration issue you raised is a good point. Having
>> "stream.enableLocalAggregation(Trigger)" might add implicit operators for
>> which users can't set uids, causing state compatibility/evolution
>> problems.
>> So let's put this in rejected alternatives.
>>
>> Hi Vino,
>>
>> You mentioned several times that "DataStream.localKeyBy().process()" can
>> solve the data skew problem of "DataStream.keyBy().process()".
>> I'm curious about what's the differences between "DataStream.process()"
>> and "DataStream.localKeyBy().process()"?
>> Can't "DataStream.process()" solve the data skew problem?
>>
>> Best,
>> Jark
>>
>>
>> On Wed, 26 Jun 2019 at 18:20, Piotr Nowojski <pi...@ververica.com> wrote:
>>
>>> Hi Jark and Vino,
>>>
>>> I agree fully with Jark, that in order to have the discussion focused
>>> and to limit the number of parallel topics, we should first focus on one
>>> topic. We can first decide on the API and later we can discuss the runtime
>>> details. At least as long as we keep the potential requirements of the
>>> runtime part in mind while designing the API.
>>>
>>> Regarding the automatic optimisation proposed by Jark:
>>>
>>> "stream.enableLocalAggregation(Trigger)”
>>>
>>> I would be against that in the DataStream API for the reasons that Vino
>>> presented. There was a discussion thread about future directions of Table
>>> API vs DataStream API and the consensus was that the automatic
>>> optimisations are one of the dividing lines between those two, for at least
>>> a couple of reasons. Flexibility and full control over the program was one
>>> of them. Another is state migration. Having
>>> "stream.enableLocalAggregation(Trigger)” that might add some implicit
>>> operators in the job graph can cause problems with savepoint/checkpoint
>>> compatibility.
>>>
>>> However, I haven't thought about or looked into the details of Vino's
>>> API proposal, so I cannot fully judge it.
>>>
>>> Piotrek
>>>
>>> > On 26 Jun 2019, at 09:17, vino yang <yanghua1...@gmail.com> wrote:
>>> >
>>> > Hi Jark,
>>> >
>>> > Similar questions and responses have been repeated many times.
>>> >
>>> > Why didn't we spend more sections discussing the API?
>>> >
>>> > Because we try to reuse the abilities of KeyedStream. The localKeyBy API
>>> simply returns a KeyedStream; that is our design, so we get all the
>>> benefits of KeyedStream and further benefits from WindowedStream. The APIs
>>> of KeyedStream and WindowedStream are long-tested and flexible. Yes, we
>>> spend much space discussing local keyed state, but that is not the goal or
>>> motivation; it is the means of implementing local aggregation. It is much
>>> more complicated than the API we introduced, which is why it takes more
>>> sections. Of course, that is the operator implementation level. We also
>>> agreed to support a buffer+flush implementation and added related notes to
>>> the documentation. If the community agrees with this, we will provide more
>>> details. What's more, I have indicated before that we welcome
>>> state-related reviewers to join the discussion, but modifying the FLIP
>>> title is not wise.
>>> >
>>> > About the API of local aggregation:
>>> >
>>> > I don't deny that ease of use is very important. But IMHO flexibility
>>> is the most important thing at the DataStream API level. Otherwise, what
>>> does DataStream mean? The significance of the DataStream API is that it is
>>> more flexible than Table/SQL; if it cannot provide that, everyone would
>>> just use Table/SQL.
>>> >
>>> > The DataStream API should focus more on flexibility than on automatic
>>> optimization, which allows users to have more possibilities to implement
>>> complex programs and meet specific scenarios. There are a lot of programs
>>> written using the DataStream API that are far more complex than we think.
>>> It is very difficult to optimize at the API level and the benefit is very
>>> low.
>>> >
>>> > I want to say that we support a more generalized local aggregation. I
>>> mentioned in the previous reply that not only the UDF that implements
>>> AggregateFunction is called aggregation. In some complex scenarios, we have
>>> to support local aggregation through ProcessFunction and
>>> ProcessWindowFunction to solve the data skew problem. How do you support
>>> them in the API implementation and optimization you mentioned?
>>> >
>>> > That flexible APIs can be combined arbitrarily into erroneous
>>> semantics does not prove that flexibility is meaningless, because the user
>>> is the decision maker. As I have pointed out many times, many existing
>>> DataStream APIs, if combined arbitrarily, do not have much practical
>>> significance either. Users of flexible APIs need to understand what they
>>> are doing and what the right choice is.
>>> >
>>> > I think that if we keep debating this point, we will not reach a result.
>>> >
>>> > @Stephan Ewen <mailto:se...@apache.org> , @Aljoscha Krettek <mailto:
>>> aljos...@apache.org> and @Piotr Nowojski <mailto:pi...@ververica.com>
>>> Do you have further comments?
>>> >
>>> >
>>> > On Wed, Jun 26, 2019 at 11:46 AM Jark Wu <imj...@gmail.com> wrote:
>>> > Thanks for the long discussion Vino, Kurt, Hequn, Piotr and others,
>>> >
>>> > It seems that we still have some different ideas about the API
>>> > (localKeyBy()?) and implementation details (reuse window operator?
>>> local
>>> > keyed state?).
>>> > And the discussion is stalled and mixed with motivation and API and
>>> > implementation discussion.
>>> >
>>> > In order to make some progress in this topic, I want to summarize the
>>> > points (pls correct me if I'm wrong or missing sth) and would suggest
>>> to
>>> > split
>>> >  the topic into following aspects and discuss them one by one.
>>> >
>>> > 1) What's the main purpose of this FLIP?
>>> >  - From the title of this FLIP, it is to support local aggregate.
>>> However
>>> > from the content of the FLIP, 80% are introducing a new state called
>>> local
>>> > keyed state.
>>> >  - If we mainly want to introduce local keyed state, then we should
>>> > re-title the FLIP and involve more people who work on state.
>>> >  - If we mainly want to support local aggregate, then we can jump to
>>> step 2
>>> > to discuss the API design.
>>> >
>>> > 2) What does the API look like?
>>> >  - Vino proposed to use "localKeyBy()" to do local process, the output
>>> of
>>> > local process is the result type of aggregate function.
>>> >   a) For non-windowed aggregate:
>>> > input.localKeyBy(..).aggregate(agg1).keyBy(..).aggregate(agg2)    **NOT
>>> > SUPPORT**
>>> >   b) For windowed aggregate:
>>> >
>>> input.localKeyBy(..).window(w1).aggregate(agg1).keyBy(..).window(w2).aggregate(agg2)
>>> >
>>> > 3) What's the implementation detail?
>>> >  - may reuse window operator or not.
>>> >  - may introduce a new state concepts or not.
>>> >  - may not have state in local operator by flushing buffers in
>>> > prepareSnapshotPreBarrier
>>> >  - and so on...
>>> >  - we can discuss these later when we reach a consensus on API
>>> >
>>> > --------------------
>>> >
>>> > Here are my thoughts:
>>> >
>>> > 1) Purpose of this FLIP
>>> >  - From the motivation section in the FLIP, I think the purpose is to
>>> > support local aggregation to solve the data skew issue.
>>> >    Then I think we should focus on how to provide an easy-to-use and
>>> clear
>>> > API to support **local aggregation**.
>>> >  - Vino's point is centered around the local keyed state API (or
>>> > localKeyBy()), and how to leverage the local keyed state API to support
>>> > local aggregation.
>>> >    But I'm afraid it's not a good way to design API for local
>>> aggregation.
>>> >
>>> > 2) local aggregation API
>>> >  - IMO, the method call chain
>>> >
>>> "input.localKeyBy(..).window(w1).aggregate(agg1).keyBy(..).window(w2).aggregate(agg2)"
>>> > is not so easy to use.
>>> >    Because we have to provide two implementations for an aggregation
>>> (one
>>> > for the partial agg, another for the final agg). And we have to take
>>> >    care of the first window call; an inappropriate window call will
>>> > break the semantics.
>>> >  - From my point of view, local aggregation is a mature concept which
>>> > should output the intermediate accumulator (ACC) in the past period of
>>> time
>>> > (a trigger).
>>> >    And the downstream final aggregation will merge ACCs received from
>>> local
>>> > side, and output the current final result.
>>> >  - The current "AggregateFunction" API in DataStream already has the
>>> > accumulator type and "merge" method. So the only thing users need to do
>>> is
>>> > enable the
>>> >    local aggregation optimization and set a trigger.
>>> >  - One idea comes to my head is that, assume we have a windowed
>>> aggregation
>>> > stream: "val stream = input.keyBy().window(w).aggregate(agg)". We can
>>> > provide an API on the stream.
>>> >    For example, "stream.enableLocalAggregation(Trigger)", the trigger
>>> can
>>> > be "ContinuousEventTimeTrigger.of(Time.minutes(1))". Then it
>>> will
>>> > be optimized into
>>> >    local operator + final operator, and local operator will combine
>>> records
>>> > every minute on event time.
>>> >  - In this way, there is only one line added, and the output is the
>>> same
>>> > as before, because it is just an optimization.
>>> >
>>> >
>>> > Regards,
>>> > Jark
>>> >
>>> >
>>> >
>>> > On Tue, 25 Jun 2019 at 14:34, vino yang <yanghua1...@gmail.com
>>> <mailto:yanghua1...@gmail.com>> wrote:
>>> >
>>> > > Hi Kurt,
>>> > >
>>> > > Answer your questions:
>>> > >
>>> > > a) Sorry, I just updated the Google doc and have not had time to
>>> update the
>>> > > FLIP; I will update the FLIP as soon as possible.
>>> > > About your description of this point, I have a question: what does
>>> it mean,
>>> > > "how do we combine with
>>> > > `AggregateFunction`"?
>>> > >
>>> > > I have shown you the examples which Flink has supported:
>>> > >
>>> > >    - input.localKeyBy(0).aggregate()
>>> > >    - input.localKeyBy(0).window().aggregate()
>>> > >
>>> > > Could you show me an example of how we combine with
>>> `AggregateFunction`
>>> > > through your localAggregate API?
>>> > >
>>> > > About the example of how to do local aggregation for AVG, consider
>>> this
>>> > > code:
>>> > >
>>> > > DataStream<Tuple2<String, Long>> source = null;
>>> > > source
>>> > >     .localKeyBy(0)
>>> > >     .timeWindow(Time.seconds(60))
>>> > >     .aggregate(agg1, new WindowFunction<Tuple2<Long, Long>,
>>> > >         Tuple3<String, Long, Long>, String, TimeWindow>() {})
>>> > >     .keyBy(0)
>>> > >     .timeWindow(Time.seconds(60))
>>> > >     .aggregate(agg2, new WindowFunction<Tuple2<Long, Long>,
>>> > >         Tuple2<String, Long>, String, TimeWindow>());
>>> > >
>>> > > agg1:
>>> > > signature: new AggregateFunction<Tuple2<String, Long>, Tuple2<Long,
>>> > > Long>, Tuple2<Long, Long>>() {}
>>> > > input param type: Tuple2<String, Long> (f0: key, f1: value)
>>> > > intermediate result type: Tuple2<Long, Long> (f0: local aggregated
>>> > > sum, f1: local aggregated count)
>>> > > output param type: Tuple2<Long, Long> (f0: local aggregated sum,
>>> > > f1: local aggregated count)
>>> > >
>>> > > agg2:
>>> > > signature: new AggregateFunction<Tuple3<String, Long, Long>, Long,
>>> > > Tuple2<String, Long>>() {}
>>> > > input param type: Tuple3<String, Long, Long> (f0: key, f1: local
>>> > > aggregated sum, f2: local aggregated count)
>>> > > intermediate result type: Long (avg result)
>>> > > output param type: Tuple2<String, Long> (f0: key, f1: avg result)
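To sanity-check the agg1/agg2 flow described above outside of Flink, here is a
self-contained plain-Java model. The helper names are hypothetical; each list
models the records one parallel instance sees within a single window pane, and
the (sum, count) partials match the tuple layout described above:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the agg1/agg2 two-phase AVG pipeline (no Flink).
public class TwoPhaseAvg {

    // agg1 (local phase): per-key (sum, count) partials on one instance.
    public static Map<String, long[]> localAggregate(List<Map.Entry<String, Long>> records) {
        Map<String, long[]> partials = new HashMap<>();
        for (Map.Entry<String, Long> r : records) {
            long[] sc = partials.computeIfAbsent(r.getKey(), k -> new long[2]);
            sc[0] += r.getValue();  // local aggregated sum
            sc[1] += 1;             // local aggregated count
        }
        return partials;
    }

    // agg2 (final phase): merge the partials of all instances, then divide.
    public static Map<String, Double> finalAggregate(List<Map<String, long[]>> allPartials) {
        Map<String, long[]> merged = new HashMap<>();
        for (Map<String, long[]> p : allPartials) {
            for (Map.Entry<String, long[]> e : p.entrySet()) {
                long[] sc = merged.computeIfAbsent(e.getKey(), k -> new long[2]);
                sc[0] += e.getValue()[0];
                sc[1] += e.getValue()[1];
            }
        }
        Map<String, Double> avg = new HashMap<>();
        merged.forEach((k, sc) -> avg.put(k, (double) sc[0] / sc[1]));
        return avg;
    }

    public static void main(String[] args) {
        // a hot key "a" split across two parallel instances
        List<Map.Entry<String, Long>> instance1 =
            List.of(Map.entry("a", 1L), Map.entry("a", 3L), Map.entry("b", 10L));
        List<Map.Entry<String, Long>> instance2 =
            List.of(Map.entry("a", 5L), Map.entry("b", 20L));
        Map<String, Double> avg =
            finalAggregate(List.of(localAggregate(instance1), localAggregate(instance2)));
        System.out.println(avg.get("a") + " " + avg.get("b")); // prints 3.0 15.0
    }
}
```

The final result equals a single-pass average, which is the correctness
property the two-phase design relies on.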
>>> > >
>>> > > For a sliding window, users just need to change the window type if
>>> they want
>>> > > to.
>>> > > Again, we try to give the design and implementation at the DataStream
>>> > > level. So I believe we can match all the requirements (though the
>>> > > implementation may be different) coming from the SQL level.
>>> > >
>>> > > b) Yes, theoretically you are right. But in practice, it would not
>>> bring
>>> > > many benefits.
>>> > > If we want the benefits of the window API, why would we not
>>> reuse
>>> > > the window operator, and instead copy so much duplicated code into
>>> another
>>> > > operator?
>>> > >
>>> > > c) OK, I agree to let the state backend committers join this
>>> discussion.
>>> > >
>>> > > Best,
>>> > > Vino
>>> > >
>>> > >
>>> > > On Mon, Jun 24, 2019 at 6:53 PM Kurt Young <ykt...@gmail.com> wrote:
>>> > >
>>> > > > Hi vino,
>>> > > >
>>> > > > One thing to add: for a), I think one or two examples, like
>>> how to do
>>> > > > local aggregation on a sliding window
>>> > > > and how to do local aggregation on an unbounded aggregate, would
>>> help a
>>> > > lot.
>>> > > >
>>> > > > Best,
>>> > > > Kurt
>>> > > >
>>> > > >
>>> > > > On Mon, Jun 24, 2019 at 6:06 PM Kurt Young <ykt...@gmail.com
>>> <mailto:ykt...@gmail.com>> wrote:
>>> > > >
>>> > > > > Hi vino,
>>> > > > >
>>> > > > > I think there are several things still need discussion.
>>> > > > >
>>> > > > > a) We all agree that we should first go with a unified
>>> abstraction, but
>>> > > > > the abstraction is not reflected by the FLIP.
>>> > > > > If your answer is the "localKeyBy" API, then I would ask how we
>>> combine it
>>> > > > > with `AggregateFunction`, and how
>>> > > > > we do proper local aggregation for those with a different
>>> intermediate
>>> > > > > result type, like AVG. Could you add these
>>> > > > > to the document?
>>> > > > >
>>> > > > > b) From the implementation side, reusing the window operator is
>>> one of the
>>> > > > > possible solutions, but that does not mean both
>>> > > > > implementations must be based on the window operator. My
>>> understanding
>>> > > is,
>>> > > > > one of the possible implementations should
>>> > > > > not touch the window operator.
>>> > > > >
>>> > > > > c) 80% of your FLIP content is actually describing how we
>>> support
>>> > > > local
>>> > > > > keyed state. I don't know if this is necessary
>>> > > > > to introduce in the first step, and we should also involve
>>> committers who
>>> > > work
>>> > > > > on state backends to share their thoughts.
>>> > > > >
>>> > > > > Best,
>>> > > > > Kurt
>>> > > > >
>>> > > > >
>>> > > > > On Mon, Jun 24, 2019 at 5:17 PM vino yang <yanghua1...@gmail.com
>>> <mailto:yanghua1...@gmail.com>>
>>> > > wrote:
>>> > > > >
>>> > > > >> Hi Kurt,
>>> > > > >>
>>> > > > >> You did not give any further differing opinions, so I thought
>>> you
>>> > > had
>>> > > > >> agreed with the design after we promised to support two kinds of
>>> > > > >> implementation.
>>> > > > >>
>>> > > > >> At the API level, we have answered your question about passing an
>>> > > > >> AggregateFunction to do the aggregation. Whether we introduce the
>>> > > localKeyBy
>>> > > > >> API
>>> > > > >> or not, we can support AggregateFunction.
>>> > > > >>
>>> > > > >> So what's your different opinion now? Can you share it with us?
>>> > > > >>
>>> > > > >> Best,
>>> > > > >> Vino
>>> > > > >>
>>> > > > >> On Mon, Jun 24, 2019 at 4:24 PM Kurt Young <ykt...@gmail.com> wrote:
>>> > > > >>
>>> > > > >> > Hi vino,
>>> > > > >> >
>>> > > > >> > Sorry, I don't see the consensus about reusing the window
>>> operator and
>>> > > keeping
>>> > > > >> the
>>> > > > >> > API design of localKeyBy. But I think we should definitely give
>>> more
>>> > > > thought
>>> > > > >> > to this topic.
>>> > > > >> >
>>> > > > >> > I also try to loop in Stephan for this discussion.
>>> > > > >> >
>>> > > > >> > Best,
>>> > > > >> > Kurt
>>> > > > >> >
>>> > > > >> >
>>> > > > >> > On Mon, Jun 24, 2019 at 3:26 PM vino yang <
>>> yanghua1...@gmail.com <mailto:yanghua1...@gmail.com>>
>>> > > > >> wrote:
>>> > > > >> >
>>> > > > >> > > Hi all,
>>> > > > >> > >
>>> > > > >> > > I am happy we have a wonderful discussion and received many
>>> > > valuable
>>> > > > >> > > opinions in the last few days.
>>> > > > >> > >
>>> > > > >> > > Now, let me try to summarize what we have reached consensus
>>> about
>>> > > > the
>>> > > > >> > > changes in the design.
>>> > > > >> > >
>>> > > > >> > >    - provide a unified abstraction to support two kinds of
>>> > > > >> > implementation;
>>> > > > >> > >    - reuse WindowOperator and try to enhance it so that the
>>> > > > >> > >    intermediate result of the local aggregation can be
>>> > > > >> > >    buffered and flushed, to support the two kinds of
>>> > > > >> > >    implementation;
>>> > > > >> > >    - keep the API design of localKeyBy, but mark as disabled
>>> > > > >> > >    the APIs we cannot support currently, and provide a
>>> > > > >> > >    configurable API for users to choose how to handle the
>>> > > > >> > >    intermediate result;
>>> > > > >> > >
>>> > > > >> > > The above three points have been updated in the design doc.
>>> Any
>>> > > > >> > > questions, please let me know.
>>> > > > >> > >
>>> > > > >> > > @Aljoscha Krettek <aljos...@apache.org <mailto:
>>> aljos...@apache.org>> What do you think? Any
>>> > > > >> further
>>> > > > >> > > comments?
>>> > > > >> > >
>>> > > > >> > > Best,
>>> > > > >> > > Vino
>>> > > > >> > >
>>> > > > >> > > On Thu, Jun 20, 2019 at 2:02 PM vino yang <yanghua1...@gmail.com> wrote:
>>> > > > >> > >
>>> > > > >> > > > Hi Kurt,
>>> > > > >> > > >
>>> > > > >> > > > Thanks for your comments.
>>> > > > >> > > >
>>> > > > >> > > > It seems we have come to a consensus that we should
>>> alleviate the
>>> > > > >> > performance
>>> > > > >> > > > degradation caused by data skew with local aggregation. In
>>> this FLIP, our
>>> > > > key
>>> > > > >> > > > solution is to introduce local keyed partitioning to achieve
>>> this
>>> > > > goal.
>>> > > > >> > > >
>>> > > > >> > > > I also agree that we can benefit a lot from the usage of
>>> > > > >> > > > AggregateFunction. In combination with localKeyBy, we can
>>> easily
>>> > > > >> use it
>>> > > > >> > > to
>>> > > > >> > > > achieve local aggregation:
>>> > > > >> > > >
>>> > > > >> > > >    - input.localKeyBy(0).aggregate()
>>> > > > >> > > >    - input.localKeyBy(0).window().aggregate()
>>> > > > >> > > >
>>> > > > >> > > >
>>> > > > >> > > > I think the only problem here is the choices between
>>> > > > >> > > >
>>> > > > >> > > >    - (1) Introducing a new primitive called localKeyBy and
>>> > > > implement
>>> > > > >> > > >    local aggregation with existing operators, or
>>> > > > >> > > >    - (2) Introducing an operator called localAggregation
>>> which
>>> > > is
>>> > > > >> > > >    composed of a key selector, a window-like operator,
>>> and an
>>> > > > >> aggregate
>>> > > > >> > > >    function.
>>> > > > >> > > >
>>> > > > >> > > >
>>> > > > >> > > > There may exist some optimization opportunities by
>>> providing a
>>> > > > >> > composite
>>> > > > >> > > > interface for local aggregation. But at the same time, in
>>> my
>>> > > > >> opinion,
>>> > > > >> > we
>>> > > > >> > > > lose flexibility (Or we need certain efforts to achieve
>>> the same
>>> > > > >> > > > flexibility).
>>> > > > >> > > >
>>> > > > >> > > > As said in the previous mails, we have many use cases
>>> where the
>>> > > > >> > > > aggregation is very complicated and cannot be performed
>>> with
>>> > > > >> > > > AggregateFunction. For example, users may perform windowed
>>> > > > >> aggregations
>>> > > > >> > > > according to time, data values, or even external storage.
>>> > > > Typically,
>>> > > > >> > they
>>> > > > >> > > > now use KeyedProcessFunction or customized triggers to
>>> implement
>>> > > > >> these
>>> > > > >> > > > aggregations. It's not easy to address data skew in such
>>> cases
>>> > > > with
>>> > > > >> a
>>> > > > >> > > > composite interface for local aggregation.
>>> > > > >> > > >
>>> > > > >> > > > Given that Data Stream API is exactly targeted at these
>>> cases
>>> > > > where
>>> > > > >> the
>>> > > > >> > > > application logic is very complicated and optimization
>>> does not
>>> > > > >> > matter, I
>>> > > > >> > > > think it's a better choice to provide a relatively
>>> low-level and
>>> > > > >> > > canonical
>>> > > > >> > > > interface.
>>> > > > >> > > >
>>> > > > >> > > > The composite interface, on the other hand, may be a good
>>> > > choice
>>> > > > in
>>> > > > >> > > > declarative interfaces, including SQL and Table API, as it
>>> > > allows
>>> > > > >> more
>>> > > > >> > > > optimization opportunities.
>>> > > > >> > > >
>>> > > > >> > > > Best,
>>> > > > >> > > > Vino
>>> > > > >> > > >
>>> > > > >> > > >
>>> > > > >> > > > On Thu, Jun 20, 2019 at 10:15 AM Kurt Young <ykt...@gmail.com> wrote:
>>> > > > >> > > >
>>> > > > >> > > >> Hi all,
>>> > > > >> > > >>
>>> > > > >> > > >> As vino said in previous emails, I think we should first
>>> > > discuss
>>> > > > >> and
>>> > > > >> > > >> decide
>>> > > > >> > > >> what kind of use cases this FLIP wants to
>>> > > > >> > > >> resolve, and what the API should look like. From my
>>> side, I
>>> > > think
>>> > > > >> this
>>> > > > >> > > is
>>> > > > >> > > >> probably the root cause of current divergence.
>>> > > > >> > > >>
>>> > > > >> > > >> My understanding is (from the FLIP title and motivation
>>> section of
>>> > > > the
>>> > > > >> > > >> document), we want to have a proper support of
>>> > > > >> > > >> local aggregation, or pre aggregation. This is not a
>>> very new
>>> > > > idea,
>>> > > > >> > most
>>> > > > >> > > >> SQL engine already did this improvement. And
>>> > > > >> > > >> the core concept about this is, there should be an
>>> > > > >> AggregateFunction,
>>> > > > >> > no
>>> > > > >> > > >> matter it's a Flink runtime's AggregateFunction or
>>> > > > >> > > >> SQL's UserDefinedAggregateFunction. Both aggregation have
>>> > > concept
>>> > > > >> of
>>> > > > >> > > >> intermediate data type, sometimes we call it ACC.
>>> > > > >> > > >> I quickly went through the POC piotr did before [1], it
>>> also
>>> > > > >> directly
>>> > > > >> > > uses
>>> > > > >> > > >> AggregateFunction.
>>> > > > >> > > >>
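The ACC concept described above can be modelled without Flink. The nested
interface below mirrors the shape of Flink's AggregateFunction (the method
names match, but it is a local re-declaration, not the real class), so the
snippet runs standalone:

```java
// Self-contained model of the AggregateFunction contract (no Flink).
public class AvgAgg {

    // Mirrors the shape of Flink's AggregateFunction, re-declared locally.
    public interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC acc);
        OUT getResult(ACC acc);
        ACC merge(ACC a, ACC b);  // the piece that makes two-phase (local + final) possible
    }

    // AVG with an intermediate type different from input and output:
    // ACC = long[]{sum, count}.
    public static final Agg<Long, long[], Double> AVG = new Agg<>() {
        public long[] createAccumulator() { return new long[]{0L, 0L}; }
        public long[] add(Long v, long[] acc) { acc[0] += v; acc[1]++; return acc; }
        public Double getResult(long[] acc) { return (double) acc[0] / acc[1]; }
        public long[] merge(long[] a, long[] b) { return new long[]{a[0] + b[0], a[1] + b[1]}; }
    };

    public static void main(String[] args) {
        // local side: two parallel instances each build their own ACC
        long[] acc1 = AVG.add(3L, AVG.add(1L, AVG.createAccumulator()));
        long[] acc2 = AVG.add(5L, AVG.createAccumulator());
        // final side: merge the ACCs, then emit the result
        System.out.println(AVG.getResult(AVG.merge(acc1, acc2))); // prints 3.0
    }
}
```

Without the explicit ACC type and merge(), the local side could only emit
final-typed values, which cannot be re-aggregated correctly for AVG.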
>>> > > > >> > > >> But the thing is, after reading the design of this FLIP,
>>> I
>>> > > can't
>>> > > > >> help
>>> > > > >> > > >> feeling that this FLIP is not targeting proper
>>> > > > >> > > >> local aggregation support. It actually wants to introduce
>>> > > another
>>> > > > >> > > concept:
>>> > > > >> > > >> LocalKeyBy, plus how to split and merge local key groups
>>> > > > >> > > >> and how to properly support state on local keys. Local
>>> > > aggregation
>>> > > > >> just
>>> > > > >> > > >> happens to be one possible use case of LocalKeyBy.
>>> > > > >> > > >> But it lacks support for the essential concept of local
>>> > > > aggregation,
>>> > > > >> > > which
>>> > > > >> > > >> is the intermediate data type. Without this, I really don't
>>> think
>>> > > > >> > > >> it is a good fit for local aggregation.
>>> > > > >> > > >>
>>> > > > >> > > >> Here I want to make sure of the scope and goal of
>>> this
>>> > > > FLIP,
>>> > > > >> do
>>> > > > >> > we
>>> > > > >> > > >> want to have a proper local aggregation engine, or do we
>>> > > > >> > > >> just want to introduce a new concept called LocalKeyBy?
>>> > > > >> > > >>
>>> > > > >> > > >> [1]: https://github.com/apache/flink/pull/4626 <
>>> https://github.com/apache/flink/pull/4626>
>>> > > > >> > > >>
>>> > > > >> > > >> Best,
>>> > > > >> > > >> Kurt
>>> > > > >> > > >>
>>> > > > >> > > >>
>>> > > > >> > > >> On Wed, Jun 19, 2019 at 5:13 PM vino yang <
>>> > > yanghua1...@gmail.com <mailto:yanghua1...@gmail.com>
>>> > > > >
>>> > > > >> > > wrote:
>>> > > > >> > > >>
>>> > > > >> > > >> > Hi Hequn,
>>> > > > >> > > >> >
>>> > > > >> > > >> > Thanks for your comments!
>>> > > > >> > > >> >
>>> > > > >> > > >> > I agree that letting local aggregation reuse the window
>>> API
>>> > > and
>>> > > > >> > > refining
>>> > > > >> > > >> > the window operator to match both requirements
>>> (ours and
>>> > > > >> > > >> Kurt's)
>>> > > > >> > > >> > is a good decision!
>>> > > > >> > > >> >
>>> > > > >> > > >> > Concerning your questions:
>>> > > > >> > > >> >
>>> > > > >> > > >> > 1. The result of
>>> input.localKeyBy(0).sum(1).keyBy(0).sum(1)
>>> > > may
>>> > > > >> be
>>> > > > >> > > >> > meaningless.
>>> > > > >> > > >> >
>>> > > > >> > > >> > Yes, it does not make sense in most cases. However, I
>>> also
>>> > > want
>>> > > > >> to
>>> > > > >> > > note
>>> > > > >> > > >> > users should know the right semantics of localKeyBy
>>> and use
>>> > > it
>>> > > > >> > > >> correctly.
>>> > > > >> > > >> > Because this issue also exists for the global keyBy,
>>> consider
>>> > > > >> this
>>> > > > >> > > >> example:
>>> > > > >> > > >> > input.keyBy(0).sum(1).keyBy(0).sum(1), the result is
>>> also
>>> > > > >> > meaningless.
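The point about input.keyBy(0).sum(1).keyBy(0).sum(1) can be checked with a
small plain-Java model (no Flink): on an unbounded stream, sum() emits an
updated running total for every incoming record, so feeding those outputs into
another sum() double-counts:

```java
import java.util.ArrayList;
import java.util.List;

// Models one key of keyBy(0).sum(1): a running total is emitted per record.
public class RunningSum {
    public static List<Long> runningSum(List<Long> in) {
        List<Long> out = new ArrayList<>();
        long total = 0;
        for (long v : in) {
            total += v;
            out.add(total); // an updated total is emitted for every record
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> input = List.of(1L, 2L, 3L);
        List<Long> once = runningSum(input);   // [1, 3, 6]: the true sum is 6
        List<Long> twice = runningSum(once);   // [1, 4, 10]: 10, not 6
        System.out.println(once.get(2) + " vs " + twice.get(2)); // prints 6 vs 10
    }
}
```

This is why composing aggregations correctly needs partials designed for
merging, not the running final results.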
>>> > > > >> > > >> >
>>> > > > >> > > >> > 2. About the semantics of
>>> > > > >> > > >> >
>>> input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)).
>>> > > > >> > > >> >
>>> > > > >> > > >> > Good catch! I agree with you that it's not good to
>>> enable all
>>> > > > >> > > >> > functionalities for localKeyBy from KeyedStream.
>>> > > > >> > > >> > Currently, We do not support some APIs such as
>>> > > > >> > > >> > connect/join/intervalJoin/coGroup. This is because
>>> we
>>> > > force
>>> > > > >> the
>>> > > > >> > > >> > operators on LocalKeyedStreams to be chained with their inputs.
>>> > > > >> > > >> >
>>> > > > >> > > >> > Best,
>>> > > > >> > > >> > Vino
>>> > > > >> > > >> >
>>> > > > >> > > >> >
>>> > > > >> > > >> > On Wed, Jun 19, 2019 at 3:42 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>>> > > > >> > > >> >
>>> > > > >> > > >> > > Hi,
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > Thanks a lot for your great discussion and great to
>>> see
>>> > > that
>>> > > > >> some
>>> > > > >> > > >> > agreement
>>> > > > >> > > >> > > has been reached on the "local aggregate engine"!
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > ===> Considering the abstract engine,
>>> > > > >> > > >> > > I'm thinking is it valuable for us to extend the
>>> current
>>> > > > >> window to
>>> > > > >> > > >> meet
>>> > > > >> > > >> > > both demands raised by Kurt and Vino? There are some
>>> > > benefits
>>> > > > >> we
>>> > > > >> > can
>>> > > > >> > > >> get:
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > 1. The interfaces of the window are complete and
>>> clear.
>>> > > With
>>> > > > >> > > windows,
>>> > > > >> > > >> we
>>> > > > >> > > >> > > can define a lot of ways to split the data and
>>> perform
>>> > > > >> different
>>> > > > >> > > >> > > computations.
>>> > > > >> > > >> > > 2. We can also leverage the window to do miniBatch
>>> for the
>>> > > > >> global
>>> > > > >> > > >> > > aggregation, i.e, we can use the window to bundle
>>> data
>>> > > belong
>>> > > > >> to
>>> > > > >> > the
>>> > > > >> > > >> same
>>> > > > >> > > >> > > key, for every bundle we only need to read and write
>>> once
>>> > > > >> state.
>>> > > > >> > > This
>>> > > > >> > > >> can
>>> > > > >> > > >> > > greatly reduce state IO and improve performance.
>>> > > > >> > > >> > > 3. A lot of other use cases can also benefit from a
>>> > > window
>>> > > > >> based
>>> > > > >> > on
>>> > > > >> > > >> > memory
>>> > > > >> > > >> > > or on stateless operation.
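The bundling idea in point 2 above can be illustrated with a toy model
(hypothetical counters, not Flink code): grouping records that share a key
turns per-record state reads/writes into per-bundle ones, which is where the
state-IO saving comes from:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model: count state accesses (one read + one write per distinct key
// per bundle) for a stream of keys processed in bundles of a given size.
public class MiniBatch {
    public static int stateAccesses(List<String> keys, int bundleSize) {
        int accesses = 0;
        for (int start = 0; start < keys.size(); start += bundleSize) {
            Set<String> distinct = new HashSet<>(
                keys.subList(start, Math.min(start + bundleSize, keys.size())));
            accesses += 2 * distinct.size(); // one read + one write per key
        }
        return accesses;
    }

    public static void main(String[] args) {
        List<String> keys = Collections.nCopies(100, "hot-key");
        System.out.println(stateAccesses(keys, 1));  // prints 200 (per record)
        System.out.println(stateAccesses(keys, 50)); // prints 4 (per bundle)
    }
}
```

For a skewed (hot) key the saving is largest, since a whole bundle collapses
into a single read-modify-write.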
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > ===> As for the API,
>>> > > > >> > > >> > > I think it is good to make our API more flexible.
>>> However,
>>> > > we
>>> > > > >> may
>>> > > > >> > > >> need to
>>> > > > >> > > >> > > make our API meaningful.
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > Take my previous reply as an example,
>>> > > > >> > > >> > > input.localKeyBy(0).sum(1).keyBy(0).sum(1). The
>>> result may
>>> > > be
>>> > > > >> > > >> > meaningless.
>>> > > > >> > > >> > > Another example I find is the intervalJoin, e.g.,
>>> > > > >> > > >> > >
>>> input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)). In
>>> > > > >> this
>>> > > > >> > > >> case, it
>>> > > > >> > > >> > > will bring problems if input1 and input2 have
>>> different
>>> > > > >> > > parallelism.
>>> > > > >> We
>>> > > > >> > > >> > > don't know which input the join should be chained with.
>>> Even
>>> > > if
>>> > > > >> they
>>> > > > >> > > >> share
>>> > > > >> > > >> > the
>>> > > > >> > > >> > > same parallelism, it's hard to tell what the join is
>>> doing.
>>> > > > >> There
>>> > > > >> > > are
>>> > > > >> > > >> > maybe
>>> > > > >> > > >> > > some other problems.
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > From this point of view, it's at least not good to
>>> enable
>>> > > all
>>> > > > >> > > >> > > functionalities for localKeyBy from KeyedStream?
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > Great to also have your opinions.
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > Best, Hequn
>>> > > > >> > > >> > >
>>> > > > >> > > >> > >
>>> > > > >> > > >> > >
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > On Wed, Jun 19, 2019 at 10:24 AM vino yang <
>>> > > > >> yanghua1...@gmail.com <mailto:yanghua1...@gmail.com>
>>> > > > >> > >
>>> > > > >> > > >> > wrote:
>>> > > > >> > > >> > >
>>> > > > >> > > >> > > > Hi Kurt and Piotrek,
>>> > > > >> > > >> > > >
>>> > > > >> > > >> > > > Thanks for your comments.
>>> > > > >> > > >> > > >
>>> > > > >> > > >> > > > I agree that we can provide a better abstraction
>>> to be
>>> > > > >> > compatible
>>> > > > >> > > >> with
>>> > > > >> > > >> > > two
>>> > > > >> > > >> > > > different implementations.
>>> > > > >> > > >> > > >
>>> > > > >> > > >> > > > First of all, I think we should consider what kind
>>> of
>>> > > > >> scenarios
>>> > > > >> > we
>>> > > > >> > > >> need
>>> > > > >> > > >> > > to
>>> > > > >> > > >> > > > support in *API* level?
>>> > > > >> > > >> > > >
>>> > > > >> > > > We have some use cases which need a customized
>>> > > > aggregation
>>> > > > >> > > >> through
>>> > > > >> > > >> > > > KeyedProcessFunction, (in the usage of our
>>> > > > localKeyBy.window
>>> > > > >> > they
>>> > > > >> > > >> can
>>> > > > >> > > >> > use
>>> > > > >> > > >> > > > ProcessWindowFunction).
>>> > > > >> > > >> > > >
>>> > > > >> > > >> > > > Shall we support these flexible use scenarios?
>>> > > > >> > > >> > > >
>>> > > > >> > > >> > > > Best,
>>> > > > >> > > >> > > > Vino

Kurt Young <ykt...@gmail.com> wrote on Tue, Jun 18, 2019, at 8:37 PM:

Hi Piotr,

Thanks for joining the discussion. Making "local aggregation" abstract
enough sounds good to me; we could implement and verify alternative
solutions for local aggregation use cases. Maybe we will find that both
solutions are appropriate, for different scenarios.

Starting from a simple one sounds like a practical way to go. What do you
think, vino?

Best,
Kurt

On Tue, Jun 18, 2019 at 8:10 PM Piotr Nowojski <pi...@ververica.com> wrote:

Hi Kurt and Vino,

I think there is a trade-off that we need to consider for the local
aggregation.

Generally speaking, I would agree with Kurt about local
aggregation/pre-aggregation not using Flink's state and instead flushing
the operator on a checkpoint. Network IO is usually cheaper compared to
disk IO. This has, however, a couple of issues:
1. It can explode the number of in-flight records during checkpoint
barrier alignment, making checkpointing slower and decreasing the actual
throughput.
2. It trades disk IO on the local aggregation machine for CPU (and disk
IO in the case of RocksDB) on the final aggregation machine. This is fine
as long as there is no huge data skew. If there is only a handful of hot
keys (or even one single hot key), it might be better to keep the
persistent state in the LocalAggregationOperator to offload the final
aggregation as much as possible.
3. With frequent checkpointing, the effectiveness of local aggregation
would degrade.

I assume Kurt is correct that in your use cases a stateless operator
behaved better, but I could easily see other use cases as well. For
example, someone is already using RocksDB, and his job is bottlenecked on
a single window operator instance because of data skew. In that case,
stateful local aggregation would probably be a better choice.

Because of that, I think we should eventually provide both versions, and
in the initial version we should at least make the "local aggregation
engine" abstract enough that one could easily provide a different
implementation strategy.

Piotrek

On 18 Jun 2019, at 11:46, Kurt Young <ykt...@gmail.com> wrote:

Hi,

For the trigger, it depends on which operator we want to use under the
API. If we choose to use the window operator, we should also use the
window's trigger. However, I also think reusing the window operator for
this scenario may not be the best choice, for the following reasons:

1. As a lot of people have already pointed out, the window operator
relies heavily on state, and this will definitely affect performance. You
can argue that one can use a heap-based state backend, but this will
introduce extra coupling, especially since we have a chance to design a
purely stateless operator.
2. The window operator is *the most* complicated operator Flink currently
has. Maybe we only need to pick a subset of the window operator to achieve
the goal, but once a user wants to take a deep look at the
localAggregation operator, it is still hard to find out what is going on
under the window operator. For simplicity, I would also recommend we
introduce a dedicated lightweight operator, which is also much easier for
a user to learn and use.

For your question about increasing the burden in
`StreamOperator::prepareSnapshotPreBarrier()`: the only thing this
function needs to do is output all the partial results. That is purely
CPU workload and does not introduce any IO. I want to point out that even
if we pay this cost, we remove another barrier alignment cost of the
operator, namely the sync flush stage of the state, which you would have
if you introduced state. That flush actually does introduce disk IO, and
I think it is worth exchanging this cost for a purely CPU workload. And
we do have some observations of these two behaviors (as I said before, we
actually implemented both solutions): the stateless one actually performs
better in both throughput and barrier alignment time.

Best,
Kurt
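The stateless, flush-on-barrier behaviour described above can be sketched outside Flink as follows. This is a plain-Python illustration only: `LocalSumBuffer`, the count-trigger threshold, and the explicit `flush()` call standing in for `prepareSnapshotPreBarrier()` are all hypothetical names, not Flink APIs.

```python
# Sketch (not Flink code) of a stateless local aggregation operator:
# partial sums live in a plain in-memory dict, are emitted downstream when
# a count trigger fires, and are fully flushed when a checkpoint barrier
# arrives. Because the buffer is empty at snapshot time, the operator
# needs no managed state.
class LocalSumBuffer:
    def __init__(self, trigger_count, emit):
        self.trigger_count = trigger_count  # flush threshold (count trigger)
        self.emit = emit                    # downstream collector
        self.buffer = {}                    # key -> partial sum
        self.seen = 0

    def process(self, key, value):
        self.buffer[key] = self.buffer.get(key, 0) + value
        self.seen += 1
        if self.seen >= self.trigger_count:
            self.flush()

    def flush(self):
        # Also called on checkpoint pre-barrier: purely CPU work, no disk IO.
        for key, partial in self.buffer.items():
            self.emit((key, partial))
        self.buffer.clear()
        self.seen = 0

emitted = []
op = LocalSumBuffer(trigger_count=3, emit=emitted.append)
for key, value in [("a", 1), ("a", 2), ("b", 5), ("a", 4)]:
    op.process(key, value)
op.flush()  # checkpoint barrier arrives: emit the remaining partials
print(emitted)  # partial per-key sums, later merged by the global aggregation
```

The trade-off both sides discuss is visible here: the flush is pure CPU work, but everything still buffered at barrier time becomes extra in-flight records during alignment.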

On Tue, Jun 18, 2019 at 3:40 PM vino yang <yanghua1...@gmail.com> wrote:
Hi Kurt,

Thanks for your example. Now it looks much clearer to me.

From your example code snippet, I saw the localAggregate API has three
parameters:

  1. the key field
  2. PartitionAvg
  3. CountTrigger: does this trigger come from the window package?

I will compare our design and yours at the API level and at the operator
level:

*From the API level:*

As I replied to @dianfu in the old email thread, [1] the window API can
provide the second and the third parameter right now.

If you reuse a specific interface or class, such as *Trigger* or
*CountTrigger* provided by the window package, but do not use the window
API, that is not reasonable. And if you do not reuse these interfaces or
classes, you would need to introduce more things which nevertheless look
similar to the things provided by the window package.

The window package provides several types of windows and many triggers,
and lets users customize them. What's more, users are more familiar with
the window API.

This is the reason why we just provide the localKeyBy API and reuse the
window API. It avoids unnecessary components such as triggers and a
buffering mechanism (based on count or time). And it has clear,
easy-to-understand semantics.

*From the operator level:*

We reused the window operator, so we get all the benefits of state and
checkpointing.

In your design, you call the operator under the localAggregate API a
*stateless* operator. IMO, it still holds state; it is just not
Flink-managed state. About the memory buffer (I think it is still not
very clear; if you have time, can you give more detailed information or
answer my questions), I have some questions:

  - If it is just a raw JVM heap memory buffer, how do you support fault
  tolerance if the job is configured with an EXACTLY-ONCE semantic
  guarantee?
  - If you think the memory buffer (non-Flink state) has better
  performance: in our design, users can also configure the HEAP state
  backend to get performance close to your mechanism.
  - `StreamOperator::prepareSnapshotPreBarrier()` is tied to the timing
  of the snapshot. IMO, the flush action should be a synchronous action
  (if not, please point out my mistake). I still think we should not
  depend on the timing of checkpoints. Checkpoint-related operations are
  inherently performance sensitive; we should not increase their burden
  any further. Our implementation is based on the mechanism of Flink's
  checkpoints, which can benefit from async snapshots and incremental
  checkpoints. IMO, the performance is not a problem, and we have not
  found performance issues in our production.

[1]:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308

Best,
Vino
Kurt Young <ykt...@gmail.com> wrote on Tue, Jun 18, 2019, at 2:27 PM:

Yeah, sorry for not expressing myself clearly. I will try to provide more
details to make sure we are on the same page.

For the DataStream API, it shouldn't be optimized automatically. You have
to explicitly call an API to do local aggregation, as well as to set the
trigger policy of the local aggregation. Take average for example; the
user program may look like this (just a draft):

Assuming the input type is DataStream<Tuple2<String, Int>>:

ds.localAggregate(
        0,                      // the local key, i.e. the String from Tuple2
        PartitionAvg(1),        // the partial aggregation function; produces Tuple2<Long, Int>, indicating sum and count
        CountTrigger.of(1000L)  // the trigger policy; note this should be best effort, and could also be composed with time-based or memory-size-based triggers
    )                           // the return type here is the local aggregate Tuple2<String, Tuple2<Long, Int>>
    .keyBy(0)                   // further keyBy with the required key
    .aggregate(1)               // this will merge all the partial results and get the final average

(This is only a draft, only trying to explain what it looks like.)

The local aggregate operator can be stateless; we can keep a memory
buffer or another efficient data structure to improve the aggregation
performance.

Let me know if you have any other questions.

Best,
Kurt
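The two stages of the draft above can be illustrated outside Flink with plain Python. This is a sketch under the PartitionAvg semantics described in the draft (the function names `partition_avg` and `merge_avg` are hypothetical, not part of any API): the local stage folds each partition into per-key (sum, count) partials, and the downstream stage merges the partials into the final average.

```python
# Non-Flink sketch of the two-stage average from the draft above:
# stage 1 produces per-key (sum, count) partials, stage 2 merges them.
def partition_avg(records):
    """Local stage: fold each (key, value) into a (sum, count) partial."""
    partials = {}
    for key, value in records:
        s, c = partials.get(key, (0, 0))
        partials[key] = (s + value, c + 1)
    return list(partials.items())  # [(key, (sum, count)), ...]

def merge_avg(all_partials):
    """Global stage: merge partials from every local instance, then divide."""
    merged = {}
    for key, (s, c) in all_partials:
        ms, mc = merged.get(key, (0, 0))
        merged[key] = (ms + s, mc + c)
    return {key: s / c for key, (s, c) in merged.items()}

# Two parallel local instances, each pre-aggregating its own partition:
p1 = partition_avg([("a", 1), ("a", 3), ("b", 10)])
p2 = partition_avg([("a", 8)])
print(merge_avg(p1 + p2))  # {'a': 4.0, 'b': 10.0}
```

Note why (sum, count) rather than a local average: partial averages cannot be merged losslessly, which is exactly the intermediate-result-type point raised later in the thread.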

On Tue, Jun 18, 2019 at 1:29 PM vino yang <yanghua1...@gmail.com> wrote:

Hi Kurt,

Thanks for your reply.

Actually, I am not against you raising your design.

From your description so far, I can only imagine that your high-level
implementation is about SQL and that the optimization is internal to the
API. (If I understand it incorrectly, please correct this.) Is it
automatic? How is the configuration option for triggering the
pre-aggregation given?

Maybe after I get more information, it will sound more reasonable.

IMO, first of all, it would be better to make your user interface
concrete; it is the basis of the discussion.

For example, can you give an example code snippet to show how you would
help users handle data skew in jobs built with the DataStream API?

If you give more details, we can discuss further. When one design
introduces an exact interface and another does not, the implementations
differ in obvious ways. For example, we introduce an exact API in
DataStream named localKeyBy; for the pre-aggregation we need to define
the trigger mechanism of the local aggregation, so we found that reusing
the window API and operator is a good choice. This is the reasoning link
from design to implementation.

What do you think?

Best,
Vino

Kurt Young <ykt...@gmail.com> wrote on Tue, Jun 18, 2019, at 11:58 AM:

Hi Vino,

Now I feel that we may have different understandings about what kind of
problems or improvements you want to resolve. Currently, most of the
feedback focuses on *how to do proper local aggregation to improve
performance and maybe solve the data skew issue*. And my gut feeling is
that this is exactly what users want in the first place, especially those
+1s. (Sorry for trying to summarize here; please correct me if I'm wrong.)

But I still think the design has somewhat diverged from that goal. If we
want an efficient and powerful way to do local aggregation, supporting an
intermediate result type is essential, IMO. Both the runtime's
`AggregateFunction` and SQL's `UserDefinedAggregateFunction` have proper
support for an intermediate result type and can do a `merge` operation on
it.

Now we have a lightweight alternative which performs well and fits the
local aggregation requirements nicely. Most importantly, it is much less
complex because it is stateless. And it can also achieve a similar
multiple-aggregation scenario.

I am still not convinced why we shouldn't consider it as a first step.

Best,
Kurt
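The point about intermediate result types can be made concrete with the four-method contract that Flink's `AggregateFunction` follows (create accumulator / add / merge / get result). The sketch below is a plain-Python analogue of that contract, not the actual Flink interface; the class and method names are illustrative only.

```python
# Plain-Python analogue of the AggregateFunction contract referred to above:
# the accumulator (sum, count) is an intermediate type distinct from both
# the input (int) and the output (float), and partials can be merged.
class AvgAggregate:
    def create_accumulator(self):
        return (0, 0)  # (sum, count)

    def add(self, acc, value):
        return (acc[0] + value, acc[1] + 1)

    def merge(self, acc_a, acc_b):
        # This is what makes local pre-aggregation possible: partial
        # accumulators from different local operators combine losslessly.
        return (acc_a[0] + acc_b[0], acc_a[1] + acc_b[1])

    def get_result(self, acc):
        return acc[0] / acc[1]

agg = AvgAggregate()
local1 = agg.create_accumulator()
for v in (2, 4):
    local1 = agg.add(local1, v)          # one local operator saw 2 and 4
local2 = agg.add(agg.create_accumulator(), 9)  # another saw 9
print(agg.get_result(agg.merge(local1, local2)))  # 5.0
```

Without a merge-capable intermediate type, an average could not be split across local and global stages, which is the crux of Kurt's argument.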

On Tue, Jun 18, 2019 at 11:35 AM vino yang <yanghua1...@gmail.com> wrote:
>>> > > > >> > > >> > > > > > >>>>>
>>> > > > >> > > >> > > > > > >>>>>> Hi Kurt,
>>> > > > >> > > >> > > > > > >>>>>>
>>> > > > >> > > >> > > > > > >>>>>> Thanks for your comments.
>>> > > > >> > > >> > > > > > >>>>>>
>>> > > > >> > > >> > > > > > >>>>>> It seems we both implemented local
>>> aggregation
>>> > > > >> > feature
>>> > > > >> > > to
>>> > > > >> > > >> > > > optimize
>>> > > > >> > > >> > > > > > >>> the
>>> > > > >> > > >> > > > > > >>>>>> issue of data skew.
>>> > > > >> > > >> > > > > > >>>>>> However, IMHO, the API level of
>>> optimizing
>>> > > > >> revenue is
>>> > > > >> > > >> > > different.
>>> > > > >> > > >> > > > > > >>>>>>
>>> > > > >> > > >> > > > > > >>>>>> *Your optimization benefits from Flink
>>> SQL and
>>> > > > >> it's
>>> > > > >> > not
>>> > > > >> > > >> > user's
>>> > > > >> > > >> > > > > > >>>> faces.(If
>>> > > > >> > > >> > > > > > >>>>> I
>>> > > > >> > > >> > > > > > >>>>>> understand it incorrectly, please
>>> correct
>>> > > > this.)*
>>> > > > >> > > >> > > > > > >>>>>> *Our implementation employs it as an
>>> > > > optimization
>>> > > > >> > tool
>>> API for DataStream; it is just like a local version of the keyBy API.*
>>>
>>> Based on this, I want to say that supporting it as a DataStream API can
>>> provide these advantages:
>>>
>>>   - The localKeyBy API has clear semantics and it's flexible, not only
>>>   for processing data skew but also for implementing some use cases.
>>>   For example, if we want to calculate a multiple-level aggregation, we
>>>   can do the multiple-level aggregation in the local aggregation:
>>>   input.localKeyBy("a").sum(1).localKeyBy("b").window(); // here "a" is
>>>   a sub-category, while "b" is a category; here we do not need to
>>>   shuffle data over the network.
>>>   - The users of the DataStream API will benefit from this. Actually,
>>>   we have a lot of scenarios that need to use the DataStream API.
>>>   Currently, the DataStream API is the cornerstone of the physical
>>>   plan of Flink SQL. With a localKeyBy API, the optimization of SQL
>>>   may at least use this optimized API; this is a further topic.
>>>   - Based on the window operator, our state would benefit from Flink
>>>   state and checkpoints; we do not need to worry about OOM and job
>>>   failures.
>>>
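The multi-level pattern in the first bullet can be sketched as a plain-Java simulation of the intended semantics (no Flink dependency; `localKeyBy` is only proposed in FLIP-44, and the class and method names below are illustrative): values are first summed per sub-category "a", then the sub-category partials are rolled up per category "b", so only the partial results would ever need to be shuffled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simulates two-level local aggregation: sum per sub-category first,
// then roll the sub-category sums up into per-category sums.
public class MultiLevelLocalAgg {
    // each record is {subCategory, category, value}
    public static Map<String, Long> rollUp(String[][] records) {
        Map<String, Long> perSub = new LinkedHashMap<>();
        Map<String, String> subToCat = new LinkedHashMap<>();
        for (String[] r : records) {            // level 1: localKeyBy("a").sum(1)
            perSub.merge(r[0], Long.parseLong(r[2]), Long::sum);
            subToCat.put(r[0], r[1]);
        }
        Map<String, Long> perCat = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : perSub.entrySet()) {  // level 2: localKeyBy("b")
            perCat.merge(subToCat.get(e.getKey()), e.getValue(), Long::sum);
        }
        return perCat;
    }

    public static void main(String[] args) {
        String[][] records = {
            {"phones", "electronics", "3"},
            {"laptops", "electronics", "2"},
            {"phones", "electronics", "5"},
            {"novels", "books", "4"},
        };
        System.out.println(rollUp(records)); // {electronics=10, books=4}
    }
}
```

Both levels run on the same task here; in the proposed API, only the per-category partials would cross the network to the global aggregation.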
>>> Now, about your questions:
>>>
>>> 1. About our design not being able to change the data type, and about
>>> the implementation of average:
>>>
>>> Just like my reply to Hequn, localKeyBy is an API provided to the
>>> users who use the DataStream API to build their jobs.
>>> Users should know its semantics and the difference from the keyBy API,
>>> so if they want an average aggregation, they should carry the local
>>> sum result and the local count result.
>>> I admit that it would be convenient to use keyBy directly. But we need
>>> to pay a little price when we get some benefits. I think this price is
>>> reasonable, considering that the DataStream API itself is a low-level
>>> API (at least for now).
>>>
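Carrying the local sum and the local count, as described above, can be sketched without any Flink code (the class and method names are illustrative, not part of FLIP-44): each local task pre-aggregates its values into a partial (sum, count) pair, and the global side merges the pairs; merging per-task averages instead would be wrong.

```java
import java.util.List;

// Average via partial (sum, count) pairs: correct under local aggregation,
// unlike merging per-task averages directly.
public class LocalAverage {
    static long[] partial(List<Long> values) {     // local phase on one task
        long sum = 0;
        for (long v : values) sum += v;
        return new long[] {sum, values.size()};
    }

    static double merge(long[]... partials) {      // global phase
        long sum = 0, count = 0;
        for (long[] p : partials) { sum += p[0]; count += p[1]; }
        return (double) sum / count;
    }

    public static void main(String[] args) {
        long[] a = partial(List.of(1L, 2L, 3L));   // task 1: sum=6, count=3
        long[] b = partial(List.of(10L));          // task 2: sum=10, count=1
        System.out.println(merge(a, b));           // 4.0 == (1+2+3+10)/4
        // Merging the averages would give (2.0 + 10.0) / 2 = 6.0, which is wrong.
    }
}
```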
>>> 2. About the stateless operator and
>>> `StreamOperator::prepareSnapshotPreBarrier()`:
>>>
>>> Actually, I have discussed this opinion with @dianfu in the old mail
>>> thread. I will copy my opinion from there:
>>>
>>>   - For your design, you still need somewhere to let the users
>>>   configure the trigger threshold (maybe memory availability?); this
>>>   design cannot guarantee deterministic semantics (it will bring
>>>   trouble for testing and debugging).
>>>   - If the implementation depends on the timing of checkpoints, it
>>>   would affect the checkpoint's progress, and the buffered data may
>>>   cause an OOM issue. In addition, if the operator is stateless, it
>>>   cannot provide fault tolerance.
>>>
>>> Best,
>>> Vino
>>>
>>> Kurt Young <ykt...@gmail.com> wrote on Tue, Jun 18, 2019 at 9:22 AM:
>>>
>>>> Hi Vino,
>>>>
>>>> Thanks for the proposal. I like the general idea and IMO it's a very
>>>> useful feature.
>>>> But after reading through the document, I feel that we may have
>>>> over-designed the operator required for proper local aggregation. The
>>>> main reason is that we want to have a clear definition and behavior
>>>> for the "local keyed state", which in my opinion is not necessary for
>>>> local aggregation, at least for a start.
>>>>
>>>> Another issue I noticed is that the local keyBy operator cannot
>>>> change the element type; this will also restrict a lot of use cases
>>>> which could benefit from local aggregation, like "average".
>>>>
>>>> We also did similar logic in SQL, and the only thing that needs to be
>>>> done is to introduce a stateless, lightweight operator which is
>>>> *chained* before `keyBy()`. The operator flushes all buffered
>>>> elements during `StreamOperator::prepareSnapshotPreBarrier()` and
>>>> thus keeps itself stateless.
>>>> By the way, in an earlier version we also tried a similar approach by
>>>> introducing a stateful local aggregation operator, but it did not
>>>> perform as well as the latter one and also affected the barrier
>>>> alignment time. The latter one is fairly simple and more efficient.
>>>>
>>>> I would highly suggest you consider a stateless approach as the
>>>> first step.
>>>>
>>>> Best,
>>>> Kurt
>>>>
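The flush-before-barrier idea above can be sketched independently of Flink's actual operator interfaces (the class below is a hypothetical stand-in, not the SQL implementation): partial results live only in an in-memory map, and emptying that map right before the checkpoint barrier means the operator has nothing to snapshot.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a pre-aggregating operator that keeps only an in-memory buffer
// and empties it before every checkpoint barrier, so there is no operator
// state left to snapshot.
public class PreAggBuffer {
    private final Map<String, Long> buffer = new HashMap<>();
    final List<Map.Entry<String, Long>> downstream = new ArrayList<>();

    void processElement(String key, long value) {
        buffer.merge(key, value, Long::sum);   // pre-aggregate per key in memory
    }

    // Plays the role of StreamOperator::prepareSnapshotPreBarrier():
    // flush every partial result downstream, then clear the buffer.
    void prepareSnapshotPreBarrier() {
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            downstream.add(Map.entry(e.getKey(), e.getValue()));
        }
        buffer.clear();
    }

    boolean hasBufferedState() { return !buffer.isEmpty(); }

    public static void main(String[] args) {
        PreAggBuffer op = new PreAggBuffer();
        op.processElement("a", 1);
        op.processElement("a", 2);
        op.processElement("b", 5);
        op.prepareSnapshotPreBarrier();        // emits partial sums a=3, b=5
        System.out.println(op.hasBufferedState()); // false
    }
}
```

Because everything buffered is flushed before each barrier, a failure simply replays the events since the last checkpoint; correctness does not depend on the buffer surviving.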
>>>> On Mon, Jun 17, 2019 at 7:32 PM Jark Wu <imj...@gmail.com> wrote:
>>>>
>>>>> Hi Vino,
>>>>>
>>>>> Thanks for the proposal.
>>>>>
>>>>> Regarding "input.keyBy(0).sum(1)" vs
>>>>> "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)", have
>>>>> you done any benchmarks?
>>>>> Because I'm curious about how much performance improvement we can
>>>>> get by using the count window as the local operator.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Mon, 17 Jun 2019 at 17:48, vino yang <yanghua1...@gmail.com> wrote:
>>>>>
>>>>>> Hi Hequn,
>>>>>>
>>>>>> Thanks for your reply.
>>>>>>
>>>>>> The purpose of the localKeyBy API is to provide a tool which can
>>>>>> let users do pre-aggregation locally. The behavior of the
>>>>>> pre-aggregation is similar to the keyBy API.
>>>>>>
>>>>>> So the three cases are different; I will describe them one by one:
>>>>>>
>>>>>> 1. input.keyBy(0).sum(1)
>>>>>>
>>>>>> *In this case, the result is event-driven: each event can produce
>>>>>> one sum aggregation result, and it is the latest one from the
>>>>>> source start.*
>>>>>>
>>>>>> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
>>>>>>
>>>>>> *In this case, the semantics have a problem: it would do the local
>>>>>> sum aggregation and produce the latest partial result from the
>>>>>> source start for every event.*
>>>>>> *These latest partial results from the same key are hashed to one
>>>>>> node to do the global sum aggregation.*
>>>>>> *In the global aggregation, when it receives multiple partial
>>>>>> results (they are all calculated from the source start), summing
>>>>>> them will get the wrong result.*
>>>>>>
>>>>>> 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
>>>>>>
>>>>>> *In this case, it would just get a partial aggregation result for
>>>>>> the 5 records in the count window. The partial aggregation results
>>>>>> from the same key will then be aggregated globally.*
>>>>>>
>>>>>> So the first case and the third case can get the *same* result;
>>>>>> the difference is the output style and the latency.
>>>>>>
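The pitfall of case 2 can be made concrete with a small plain-Java simulation (illustrative code, not part of the FLIP): the local stage emits a running sum for every event, and a global stage that naively re-sums those emissions over-counts earlier events.

```java
import java.util.ArrayList;
import java.util.List;

// Why localKeyBy(0).sum(1).keyBy(0).sum(1) (case 2) is wrong: the local
// stage emits a *running* sum per event, and re-summing those emissions
// globally counts earlier events more than once.
public class CaseTwoPitfall {
    public static long caseTwoGlobalSum(long[] values) {
        long running = 0;
        List<Long> localEmissions = new ArrayList<>(); // one emission per event
        for (long v : values) {
            running += v;
            localEmissions.add(running);
        }
        long global = 0;                               // naive global sum(1)
        for (long e : localEmissions) global += e;
        return global;
    }

    public static void main(String[] args) {
        long[] values = {1, 2, 3};
        // Correct total (case 1 or case 3) is 6; case 2 yields 1 + 3 + 6 = 10.
        System.out.println(caseTwoGlobalSum(values)); // 10
    }
}
```

Case 3 avoids this because each count window emits a disjoint partial sum, so every raw event contributes to exactly one emission.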
>>>>>> Generally speaking, the localKeyBy API is just an optimization
>>>>>> API. We do not limit the user's usage, but the user has to
>>>>>> understand its semantics and use it correctly.
>>>>>>
>>>>>> Best,
>>>>>> Vino
>>>>>>
>>>>>> Hequn Cheng <chenghe...@gmail.com> wrote on Mon, Jun 17, 2019 at 4:18 PM:
>>>>>>
>>>>>>> Hi Vino,
>>>>>>>
>>>>>>> Thanks for the proposal, I think it is a very good feature!
>>>>>>>
>>>>>>> One thing I want to make sure of is the semantics of
>>>>>>> `localKeyBy`. From the document, the `localKeyBy` API returns an
>>>>>>> instance of `KeyedStream` which can also perform sum(), so in
>>>>>>> this case, what are the semantics of `localKeyBy()`? For example,
>>>>>>> will the following code share the same result? And what are the
>>>>>>> differences between them?
>>>>>>>
>>>>>>> 1. input.keyBy(0).sum(1)
>>>>>>> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
>>>>>>> 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
>>>>>>>
>>>>>>> It would also be great if we could add this to the document.
>>>>>>> Thank you very much.
>>>>>>>
>>>>>>> Best, Hequn
>>>>>>>
>>>>>>> On Fri, Jun 14, 2019 at 11:34 AM vino yang <yanghua1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Aljoscha,
>>>>>>>>
>>>>>>>> I have looked at the "*Process*" section of the FLIP wiki
>>>>>>>> page.[1] This mail thread indicates that it has proceeded to the
>>>>>>>> third step.
>>>>>>>>
>>>>>>>> When I looked at the fourth step (the vote step), I didn't find
>>>>>>>> the prerequisites for starting the voting process.
>>>>>>>>
>>>>>>>> Considering that the discussion of this feature has been done in
>>>>>>>> the old thread [2], can you tell me when I should start voting?
>>>>>>>> Can I start now?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Vino
>>>>>>>>
>>>>>>>> [1]:
>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
>>>>>>>> [2]:
>>>>>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
>>>>>>>>
>>>>>>>> leesf <leesf0...@gmail.com> wrote on Thu, Jun 13, 2019 at 9:19 AM:
>>>>>>>>
>>>>>>>>> +1 for the FLIP, thank you Vino for your efforts.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Leesf
>>>>>>>>>
>>>>>>>>> vino yang <yanghua1...@gmail.com> wrote on Wed, Jun 12, 2019 at 5:46 PM:
>>>>>>>>>
>>>>>>>>>> Hi folks,
>>>>>>>>>>
>>>>>>>>>> I would like to start the FLIP discussion thread about
>>>>>>>>>> supporting local aggregation in Flink.
>>>>>>>>>>
>>>>>>>>>> In short, this feature can effectively alleviate data skew.
>>>>>>>>>> This is the FLIP:
>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink
>>>>>>>>>>
>>>>>>>>>> *Motivation* (copied from the FLIP)
>>>>>>>>>>
>>>>>>>>>> Currently, keyed streams are widely used to perform
>>>>>>>>>> aggregating operations (e.g., reduce, sum and window) on the
>>>>>>>>>> elements that have the same key. When executed at runtime, the
>>>>>>>>>> elements with the same key will be sent to and aggregated by
>>>>>>>>>> the same task.
>>>>>>>>>>
>>>>>>>>>> The performance of these aggregating operations is very
>>>>>>>>>> sensitive to the distribution of keys. In cases where the
>>>>>>>>>> distribution of keys follows a power law, the performance will
>>>>>>>>>> be significantly degraded. Worse still, increasing the degree
>>>>>>>>>> of parallelism does not help when a task is overloaded by a
>>>>>>>>>> single key.
>>>>>>>>>>
>>>>>>>>>> Local aggregation is a widely-adopted method to reduce the
>>>>>>>>>> performance degradation caused by data skew. We can decompose
>>>>>>>>>> the aggregating operations into two phases. In the first
>>>>>>>>>> phase, we aggregate the elements of the same key at the sender
>>>>>>>>>> side to obtain partial results. Then, in the second phase,
>>>>>>>>>> these partial results are sent to receivers according to their
>>>>>>>>>> keys and are combined to obtain the final result. Since the
>>>>>>>>>> number of partial results received by each receiver is limited
>>>>>>>>>> by the number of senders, the imbalance among receivers can be
>>>>>>>>>> reduced. Besides, by reducing the amount of transferred data,
>>>>>>>>>> the performance can be further improved.
>>>>>>>>>>
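The receiver-side bound described above can be checked with a toy calculation in plain Java (a sketch of the argument, not FLIP-44 code): however many raw events carry the hot key, with local aggregation the hot key's receiver sees at most one partial result per sender and flush.

```java
// With N senders each pre-aggregating locally, the receiver of a hot key
// gets at most N partial results per flush, regardless of how many raw
// events carried that key.
public class SkewReduction {
    // eventsPerSender[i] = number of raw events for the hot key seen by sender i
    public static long receivedWithoutLocalAgg(long[] eventsPerSender) {
        long total = 0;
        for (long e : eventsPerSender) total += e;        // every raw event is shuffled
        return total;
    }

    public static long receivedWithLocalAgg(long[] eventsPerSender) {
        long partials = 0;
        for (long e : eventsPerSender) if (e > 0) partials++; // one partial per sender
        return partials;
    }

    public static void main(String[] args) {
        long[] skewed = {1_000_000, 1_000_000, 0, 5};
        System.out.println(receivedWithoutLocalAgg(skewed)); // 2000005
        System.out.println(receivedWithLocalAgg(skewed));    // 3
    }
}
```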
>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> *More details*:
>>> > > > >> > > >> > > > > > >>>>>>>>>>>>>
>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> Design documentation:
>>> > > > >> > > >> > > > > > >>>>>>>>>>>>>
>>> > > > >> > > >> > > > > > >>>>>>>>>>>>>
>>> > > > >> > > >> > > > > > >>>>>>>>>>>>
>>> > > > >> > > >> > > > > > >>>>>>>>>>>
>>> > > > >> > > >> > > > > > >>>>>>>>>>
>>> > > > >> > > >> > > > > > >>>>>>>>>
>>> > > > >> > > >> > > > > > >>>>>>>>
>>> > > > >> > > >> > > > > > >>>>>>>
>>> > > > >> > > >> > > > > > >>>>>>
>>> > > > >> > > >> > > > > > >>>>>
>>> > > > >> > > >> > > > > > >>>>
>>> > > > >> > > >> > > > > > >>>
>>> > > > >> > > >> > > > > > >>
>>> > > > >> > > >> > > > > >
>>> > > > >> > > >> > > > >
>>> > > > >> > > >> > > >
>>> > > > >> > > >> > >
>>> > > > >> > > >> >
>>> > > > >> > > >>
>>> > > > >> > >
>>> > > > >> >
>>> > > > >>
>>> > > >
>>> > >
>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
>>> <
>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
>>> >
>>>
>>> Old discussion thread:
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
>>>
>>> JIRA: FLINK-12786
>>> https://issues.apache.org/jira/browse/FLINK-12786
>>>
>>> We are looking forward to your feedback!
>>>
>>> Best,
>>> Vino
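The quoted announcement argues that local pre-aggregation bounds what each downstream receiver gets per key by the number of senders, which reduces skew and network traffic. Below is a minimal, self-contained Python sketch of that idea only; it is not the Flink `localKeyBy` API, and the helper names (`shuffle`, `local_pre_aggregate`) are illustrative, not from FLIP-44.

```python
from collections import Counter, defaultdict

def shuffle(records, num_receivers):
    """Hash-partition (key, value) records to receivers, as a keyBy would."""
    receivers = defaultdict(list)
    for key, value in records:
        receivers[hash(key) % num_receivers].append((key, value))
    return receivers

def local_pre_aggregate(records):
    """Combine records per key on the sender side before shuffling."""
    acc = Counter()
    for key, value in records:
        acc[key] += value
    return list(acc.items())

# One skewed key: 10,000 events for "hot", spread over 4 senders.
senders = [[("hot", 1)] * 2500 for _ in range(4)]

# Without local aggregation: the receiver owning "hot" gets every record.
naive = shuffle([r for s in senders for r in s], num_receivers=2)

# With local aggregation: each sender emits one partial sum per key, so the
# receiver gets at most len(senders) records per key, with the same total.
combined = shuffle([r for s in senders for r in local_pre_aggregate(s)],
                   num_receivers=2)

hot_receiver = hash("hot") % 2
print(len(naive[hot_receiver]))                   # 10000 records
print(len(combined[hot_receiver]))                # 4 records (one per sender)
print(sum(v for _, v in combined[hot_receiver]))  # 10000, total preserved
```

The per-key record count at the hot receiver drops from the event count to the sender count, which is the bound the announcement describes.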
