Hi Jark,

Similar questions and responses have been repeated many times.

Why didn't we spend more sections discussing the API?

Because we try to reuse the abilities of KeyedStream. The localKeyBy API
simply returns a KeyedStream. That is our design: we get all the benefits
of KeyedStream and further benefits from WindowedStream. The APIs that come
from KeyedStream and WindowedStream are long-tested and flexible. Yes, we
spent much space discussing the local keyed state, but that is not the goal
or the motivation; it is the way to implement local aggregation. It is much
more complicated than the API we introduced, so we spent more sections on
it. Of course, this is at the implementation level of the operator. We also
agreed to support the buffer+flush implementation and added related
descriptions to the documentation. This still needs to be accepted by the
community, and if the community agrees, we will add more details. What's
more, I have indicated before that we welcome people who work on state to
participate in the discussion, but it is not wise to modify the FLIP title.

About the API of local aggregation:

I don't dispute that ease of use is very important. But IMHO flexibility is
the most important thing at the DataStream API level. Otherwise, what is the
point of DataStream? The significance of the DataStream API is that it is
more flexible than Table/SQL; if it cannot provide this, then everyone
would just use Table/SQL.

The DataStream API should focus more on flexibility than on automatic
optimization, which allows users to have more possibilities to implement
complex programs and meet specific scenarios. There are a lot of programs
written using the DataStream API that are far more complex than we think.
It is very difficult to optimize at the API level and the benefit is very
low.

I want to stress that we support a more generalized local aggregation. As I
mentioned in a previous reply, aggregation is not limited to UDFs that
implement AggregateFunction. In some complex scenarios, we have to support
local aggregation through ProcessFunction and ProcessWindowFunction to
solve the data skew problem. How would you support them in the API
implementation and optimization you mentioned?

Flexible APIs can be combined arbitrarily in ways that produce erroneous
semantics, but that does not prove that flexibility is meaningless, because
the user is the decision maker. I have given examples many times: many APIs
in DataStream, if combined arbitrarily, also have little practical
significance. So users of flexible APIs need to understand what they are
doing and what the right choice is.

I think that if we keep discussing this point, we will not reach a result.

@Stephan Ewen <se...@apache.org> , @Aljoscha Krettek <aljos...@apache.org>
 and @Piotr Nowojski <pi...@ververica.com> Do you have further comments?


On Wed, Jun 26, 2019 at 11:46 AM Jark Wu <imj...@gmail.com> wrote:

> Thanks for the long discussion Vino, Kurt, Hequn, Piotr and others,
>
> It seems that we still have some different ideas about the API
> (localKeyBy()?) and implementation details (reuse window operator? local
> keyed state?).
> And the discussion is stalled and mixes motivation, API, and
> implementation discussion.
>
> In order to make some progress on this topic, I want to summarize the
> points (please correct me if I'm wrong or missing something) and would
> suggest splitting the topic into the following aspects and discussing
> them one by one.
>
> 1) What's the main purpose of this FLIP?
>  - From the title of this FLIP, it is to support local aggregate. However
> from the content of the FLIP, 80% are introducing a new state called local
> keyed state.
>  - If we mainly want to introduce local keyed state, then we should
> re-title the FLIP and involve more people who work on state.
>  - If we mainly want to support local aggregate, then we can jump to step 2
> to discuss the API design.
>
> 2) What does the API look like?
>  - Vino proposed to use "localKeyBy()" to do local process, the output of
> local process is the result type of aggregate function.
>   a) For non-windowed aggregate:
>      input.localKeyBy(..).aggregate(agg1).keyBy(..).aggregate(agg2)
>      **NOT SUPPORTED**
>   b) For windowed aggregate:
>      input.localKeyBy(..).window(w1).aggregate(agg1).keyBy(..).window(w2).aggregate(agg2)
>
> 3) What's the implementation detail?
>  - may reuse window operator or not.
>  - may introduce a new state concept or not.
>  - may not have state in local operator by flushing buffers in
> prepareSnapshotPreBarrier
>  - and so on...
>  - we can discuss these later when we reach a consensus on API
>
> --------------------
>
> Here are my thoughts:
>
> 1) Purpose of this FLIP
>  - From the motivation section in the FLIP, I think the purpose is to
> support local aggregation to solve the data skew issue.
>    Then I think we should focus on how to provide an easy-to-use and clear
> API to support **local aggregation**.
>  - Vino's point is centered around the local keyed state API (or
> localKeyBy()), and how to leverage the local keyed state API to support
> local aggregation.
>    But I'm afraid it's not a good way to design API for local aggregation.
>
> 2) local aggregation API
>  - IMO, the method call chain
>    "input.localKeyBy(..).window(w1).aggregate(agg1).keyBy(..).window(w2).aggregate(agg2)"
>    is not so easy to use.
>    Because we have to provide two implementations for an aggregation (one
>    for the partial agg, another for the final agg). And we have to take
>    care of the first window call; an inappropriate window call will break
>    the semantics.
>  - From my point of view, local aggregation is a mature concept which
> should output the intermediate accumulator (ACC) in the past period of time
> (a trigger).
>    And the downstream final aggregation will merge ACCs received from local
> side, and output the current final result.
>  - The current "AggregateFunction" API in DataStream already has the
>    accumulator type and the "merge" method. So the only thing the user
>    needs to do is enable the local aggregation optimization and set a
>    trigger.
>  - One idea that comes to mind: assume we have a windowed aggregation
>    stream, "val stream = input.keyBy().window(w).aggregate(agg)". We can
>    provide an API on the stream.
>    For example, "stream.enableLocalAggregation(Trigger)", where the trigger
>    can be "ContinuousEventTimeTrigger.of(Time.minutes(1))". Then it will be
>    optimized into a local operator + a final operator, and the local
>    operator will combine records every minute on event time.
>  - In this way, only one line is added, and the output is the same as
>    before, because it is just an optimization.
>
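A minimal sketch of the accumulator-and-merge idea described above, written in plain Java so that it runs without Flink. MergeableAgg is a hypothetical stand-in that only mirrors the method names of Flink's AggregateFunction (createAccumulator, add, merge, getResult). AvgAgg, LocalAggregationSketch, and the count-based trigger are likewise assumptions made for illustration, and enableLocalAggregation itself is only a proposal in this thread. The sketch shows that merging locally emitted accumulators gives the same result as the single-phase aggregation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring the method names of Flink's AggregateFunction,
// so the sketch compiles and runs without Flink on the classpath.
interface MergeableAgg<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC acc);
    ACC merge(ACC a, ACC b);
    OUT getResult(ACC acc);
}

// Average over long values; the accumulator (ACC) is {sum, count}.
final class AvgAgg implements MergeableAgg<Long, long[], Double> {
    public long[] createAccumulator() { return new long[] {0L, 0L}; }
    public long[] add(Long value, long[] acc) { acc[0] += value; acc[1]++; return acc; }
    public long[] merge(long[] a, long[] b) { return new long[] {a[0] + b[0], a[1] + b[1]}; }
    public Double getResult(long[] acc) { return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1]; }
}

final class LocalAggregationSketch {
    // Single phase: every record goes straight into one accumulator.
    static <IN, ACC, OUT> OUT direct(MergeableAgg<IN, ACC, OUT> agg, List<IN> input) {
        ACC acc = agg.createAccumulator();
        for (IN v : input) {
            acc = agg.add(v, acc);
        }
        return agg.getResult(acc);
    }

    // Two phases: the "local" side emits its accumulator every triggerCount records
    // (a count trigger stands in for a time-based one), the "final" side merges them.
    static <IN, ACC, OUT> OUT twoPhase(MergeableAgg<IN, ACC, OUT> agg, List<IN> input, int triggerCount) {
        List<ACC> emitted = new ArrayList<>();
        ACC local = agg.createAccumulator();
        int seen = 0;
        for (IN v : input) {
            local = agg.add(v, local);
            if (++seen == triggerCount) {
                emitted.add(local);
                local = agg.createAccumulator();
                seen = 0;
            }
        }
        if (seen > 0) {
            emitted.add(local); // flush the partially filled accumulator
        }
        ACC merged = agg.createAccumulator();
        for (ACC acc : emitted) {
            merged = agg.merge(merged, acc);
        }
        return agg.getResult(merged);
    }
}
```

For the inputs 1 to 7 and a trigger of 3 records, both direct and twoPhase return 4.0, which is the sense in which the local stage is only an optimization.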
>
> Regards,
> Jark
>
>
>
> On Tue, 25 Jun 2019 at 14:34, vino yang <yanghua1...@gmail.com> wrote:
>
> > Hi Kurt,
> >
> > To answer your questions:
> >
> > a) Sorry, I have only updated the Google doc so far and have not had time
> > to update the FLIP; I will update the FLIP as soon as possible.
> > About your description of this point, I have a question: what do you
> > mean by "how do we combine with `AggregateFunction`"?
> >
> > I have shown you the examples which Flink has supported:
> >
> >    - input.localKeyBy(0).aggregate()
> >    - input.localKeyBy(0).window().aggregate()
> >
> > Could you show me an example of how we would combine with
> > `AggregateFunction` through your localAggregate API?
> >
> > About the example, how to do the local aggregation for AVG, consider this
> > code:
> >
> > DataStream<Tuple2<String, Long>> source = null;
> > source
> >     .localKeyBy(0)
> >     .timeWindow(Time.seconds(60))
> >     .aggregate(agg1,
> >         new WindowFunction<Tuple2<Long, Long>, Tuple3<String, Long, Long>,
> >             String, TimeWindow>() {})
> >     .keyBy(0)
> >     .timeWindow(Time.seconds(60))
> >     .aggregate(agg2,
> >         new WindowFunction<Tuple2<Long, Long>, Tuple2<String, Long>,
> >             String, TimeWindow>() {});
> >
> > agg1:
> >   signature: new AggregateFunction<Tuple2<String, Long>,
> >       Tuple2<Long, Long>, Tuple2<Long, Long>>() {}
> >   input param type: Tuple2<String, Long>, f0: key; f1: value
> >   intermediate result type: Tuple2<Long, Long>, f0: local aggregated sum;
> >       f1: local aggregated count
> >   output param type: Tuple2<Long, Long>, f0: local aggregated sum;
> >       f1: local aggregated count
> >
> > agg2:
> >   signature: new AggregateFunction<Tuple3<String, Long, Long>, Long,
> >       Tuple2<String, Long>>() {}
> >   input param type: Tuple3<String, Long, Long>, f0: key; f1: local
> >       aggregated sum; f2: local aggregated count
> >   intermediate result type: Long, the avg result
> >   output param type: Tuple2<String, Long>, f0: key; f1: avg result
> >
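As a rough illustration of why the local stage for AVG emits both a sum and a count, here is a small plain-Java sketch. It does not use Flink, and PartialAvgSketch and its method names are hypothetical. It also assumes that the final stage keeps a (sum, count) pair until the end, which is a simplification and differs from the Long intermediate type listed for agg2 above. The last method is included only for contrast.

```java
import java.util.List;

final class PartialAvgSketch {
    // Local stage: reduce one partition to {sum, count}.
    static long[] localSumCount(List<Long> partition) {
        long sum = 0;
        for (long v : partition) {
            sum += v;
        }
        return new long[] {sum, partition.size()};
    }

    // Final stage: merge the {sum, count} pairs first, then divide once.
    static double finalAvg(List<long[]> partials) {
        long sum = 0;
        long count = 0;
        for (long[] p : partials) {
            sum += p[0];
            count += p[1];
        }
        return count == 0 ? 0.0 : (double) sum / count;
    }

    // For contrast only: averaging the per-partition averages weights every
    // partition equally, regardless of how many records it saw.
    // Assumes every partition is non-empty.
    static double avgOfAvgs(List<List<Long>> partitions) {
        double total = 0;
        for (List<Long> p : partitions) {
            long[] sc = localSumCount(p);
            total += (double) sc[0] / sc[1];
        }
        return total / partitions.size();
    }
}
```

With the partitions [10, 20, 30] and [100], finalAvg over the two partial results gives 40.0, while avgOfAvgs gives 60.0.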
> > For a sliding window, users just need to change the window type if
> > they want to.
> > Again, we try to provide the design and implementation at the DataStream
> > level. So I believe we can match all the requirements that come from the
> > SQL level (it's just that the implementation may be different).
> >
> > b) Yes, theoretically, your thought is right. But in reality, it cannot
> > bring many benefits.
> > If we want to get the benefits of the window API, why would we not reuse
> > the window operator, and instead copy so much duplicated code into
> > another operator?
> >
> > c) OK, I agree to let the state backend committers join this discussion.
> >
> > Best,
> > Vino
> >
> >
> > On Mon, Jun 24, 2019 at 6:53 PM Kurt Young <ykt...@gmail.com> wrote:
> >
> > > Hi vino,
> > >
> > > One thing to add: for a), I think one or two examples, such as how to
> > > do local aggregation on a sliding window and how to do local
> > > aggregation on an unbounded aggregate, would help a lot.
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Mon, Jun 24, 2019 at 6:06 PM Kurt Young <ykt...@gmail.com> wrote:
> > >
> > > > Hi vino,
> > > >
> > > > I think there are several things that still need discussion.
> > > >
> > > > a) We all agree that we should first go with a unified abstraction,
> > > > but the abstraction is not reflected in the FLIP.
> > > > If your answer is the "localKeyBy" API, then I would ask how we
> > > > combine it with `AggregateFunction`, and how we do proper local
> > > > aggregation for functions that have a different intermediate result
> > > > type, like AVG. Could you add these to the document?
> > > >
> > > > b) From the implementation side, reusing the window operator is one
> > > > of the possible solutions, but that does not mean we should base both
> > > > implementations on the window operator. My understanding is that one
> > > > of the possible implementations should not touch the window operator.
> > > >
> > > > c) 80% of your FLIP content is actually describing how we support
> > > > local keyed state. I don't know if this is necessary to introduce in
> > > > the first step, and we should also involve committers who work on
> > > > the state backend to share their thoughts.
> > > >
> > > > Best,
> > > > Kurt
> > > >
> > > >
> > > > On Mon, Jun 24, 2019 at 5:17 PM vino yang <yanghua1...@gmail.com>
> > wrote:
> > > >
> > > >> Hi Kurt,
> > > >>
> > > >> You did not raise any further objections, so I thought you had
> > > >> agreed with the design after we promised to support two kinds of
> > > >> implementation.
> > > >>
> > > >> At the API level, we have answered your question about passing an
> > > >> AggregateFunction to do the aggregation. Whether or not we introduce
> > > >> the localKeyBy API, we can support AggregateFunction.
> > > >>
> > > >> So what's your different opinion now? Can you share it with us?
> > > >>
> > > >> Best,
> > > >> Vino
> > > >>
> > > >> On Mon, Jun 24, 2019 at 4:24 PM Kurt Young <ykt...@gmail.com> wrote:
> > > >>
> > > >> > Hi vino,
> > > >> >
> > > >> > Sorry, I don't see a consensus about reusing the window operator
> > > >> > and keeping the API design of localKeyBy. But I think we should
> > > >> > definitely give more thought to this topic.
> > > >> >
> > > >> > I also try to loop in Stephan for this discussion.
> > > >> >
> > > >> > Best,
> > > >> > Kurt
> > > >> >
> > > >> >
> > > >> > On Mon, Jun 24, 2019 at 3:26 PM vino yang <yanghua1...@gmail.com>
> > > >> wrote:
> > > >> >
> > > >> > > Hi all,
> > > >> > >
> > > >> > > I am happy we have a wonderful discussion and received many
> > valuable
> > > >> > > opinions in the last few days.
> > > >> > >
> > > >> > > Now, let me try to summarize the changes to the design on which
> > > >> > > we have reached consensus:
> > > >> > >
> > > >> > >    - provide a unified abstraction to support two kinds of
> > > >> > >    implementation;
> > > >> > >    - reuse WindowOperator and try to enhance it so that the
> > > >> > >    intermediate result of the local aggregation can be buffered
> > > >> > >    and flushed, to support two kinds of implementation;
> > > >> > >    - keep the API design of localKeyBy, but declare as disabled
> > > >> > >    the APIs we cannot currently support, and provide a
> > > >> > >    configurable API for users to choose how to handle the
> > > >> > >    intermediate result;
> > > >> > >
> > > >> > > The above three points have been updated in the design doc. Any
> > > >> > > questions, please let me know.
> > > >> > >
> > > >> > > @Aljoscha Krettek <aljos...@apache.org> What do you think? Any
> > > >> further
> > > >> > > comments?
> > > >> > >
> > > >> > > Best,
> > > >> > > Vino
> > > >> > >
> > > >> > > On Thu, Jun 20, 2019 at 2:02 PM vino yang <yanghua1...@gmail.com> wrote:
> > > >> > >
> > > >> > > > Hi Kurt,
> > > >> > > >
> > > >> > > > Thanks for your comments.
> > > >> > > >
> > > >> > > > It seems we come to a consensus that we should alleviate the
> > > >> > performance
> > > >> > > > degraded by data skew with local aggregation. In this FLIP,
> our
> > > key
> > > >> > > > solution is to introduce local keyed partition to achieve this
> > > goal.
> > > >> > > >
> > > >> > > > I also agree that we can benefit a lot from the usage of
> > > >> > > > AggregateFunction. In combination with localKeyBy, We can
> easily
> > > >> use it
> > > >> > > to
> > > >> > > > achieve local aggregation:
> > > >> > > >
> > > >> > > >    - input.localKeyBy(0).aggregate()
> > > >> > > >    - input.localKeyBy(0).window().aggregate()
> > > >> > > >
> > > >> > > >
> > > >> > > > I think the only problem here is the choices between
> > > >> > > >
> > > >> > > >    - (1) Introducing a new primitive called localKeyBy and
> > > implement
> > > >> > > >    local aggregation with existing operators, or
> > > >> > > >    - (2) Introducing an operator called localAggregation which
> > is
> > > >> > > >    composed of a key selector, a window-like operator, and an
> > > >> aggregate
> > > >> > > >    function.
> > > >> > > >
> > > >> > > >
> > > >> > > > There may exist some optimization opportunities by providing a
> > > >> > composited
> > > >> > > > interface for local aggregation. But at the same time, in my
> > > >> opinion,
> > > >> > we
> > > >> > > > lose flexibility (Or we need certain efforts to achieve the
> same
> > > >> > > > flexibility).
> > > >> > > >
> > > >> > > > As said in the previous mails, we have many use cases where
> the
> > > >> > > > aggregation is very complicated and cannot be performed with
> > > >> > > > AggregateFunction. For example, users may perform windowed
> > > >> aggregations
> > > >> > > > according to time, data values, or even external storage.
> > > Typically,
> > > >> > they
> > > >> > > > now use KeyedProcessFunction or customized triggers to
> implement
> > > >> these
> > > >> > > > aggregations. It's not easy to address data skew in such cases
> > > with
> > > >> a
> > > >> > > > composited interface for local aggregation.
> > > >> > > >
> > > >> > > > Given that Data Stream API is exactly targeted at these cases
> > > where
> > > >> the
> > > >> > > > application logic is very complicated and optimization does
> not
> > > >> > matter, I
> > > >> > > > think it's a better choice to provide a relatively low-level
> and
> > > >> > > canonical
> > > >> > > > interface.
> > > >> > > >
> > > >> > > > The composited interface, on the other side, may be a good
> > choice
> > > in
> > > >> > > > declarative interfaces, including SQL and Table API, as it
> > allows
> > > >> more
> > > >> > > > optimization opportunities.
> > > >> > > >
> > > >> > > > Best,
> > > >> > > > Vino
> > > >> > > >
> > > >> > > >
> > > >> > > > On Thu, Jun 20, 2019 at 10:15 AM Kurt Young <ykt...@gmail.com> wrote:
> > > >> > > >
> > > >> > > >> Hi all,
> > > >> > > >>
> > > >> > > >> As vino said in previous emails, I think we should first
> > > >> > > >> discuss and decide what kind of use cases this FLIP wants to
> > > >> > > >> resolve, and what the API should look like. From my side, I
> > > >> > > >> think this is probably the root cause of the current
> > > >> > > >> divergence.
> > > >> > > >>
> > > >> > > >> My understanding (from the FLIP title and the motivation
> > > >> > > >> section of the document) is that we want to have proper
> > > >> > > >> support for local aggregation, or pre-aggregation. This is
> > > >> > > >> not a very new idea; most SQL engines have already made this
> > > >> > > >> improvement. And the core concept is that there should be an
> > > >> > > >> AggregateFunction, no matter whether it's the Flink runtime's
> > > >> > > >> AggregateFunction or SQL's UserDefinedAggregateFunction.
> > > >> > > >> Both kinds of aggregation have the concept of an
> > > >> > > >> intermediate data type, which we sometimes call ACC.
> > > >> > > >> I quickly went through the POC Piotr did before [1]; it also
> > > >> > > >> directly uses AggregateFunction.
> > > >> > > >>
> > > >> > > >> But the thing is, after reading the design of this FLIP, I
> > > >> > > >> can't help feeling that this FLIP is not aiming at proper
> > > >> > > >> local aggregation support. It actually wants to introduce
> > > >> > > >> another concept: LocalKeyBy, and how to split and merge local
> > > >> > > >> key groups, and how to properly support state on local keys.
> > > >> > > >> Local aggregation just happens to be one possible use case
> > > >> > > >> of LocalKeyBy. But it lacks support for the essential
> > > >> > > >> concept of local aggregation, which is the intermediate data
> > > >> > > >> type. Without this, I really don't think it is a good fit
> > > >> > > >> for local aggregation.
> > > >> > > >>
> > > >> > > >> Here I want to make sure of the scope or the goal of this
> > > >> > > >> FLIP: do we want to have a proper local aggregation engine,
> > > >> > > >> or do we just want to introduce a new concept called
> > > >> > > >> LocalKeyBy?
> > > >> > > >>
> > > >> > > >> [1]: https://github.com/apache/flink/pull/4626
> > > >> > > >>
> > > >> > > >> Best,
> > > >> > > >> Kurt
> > > >> > > >>
> > > >> > > >>
> > > >> > > >> On Wed, Jun 19, 2019 at 5:13 PM vino yang <
> > yanghua1...@gmail.com
> > > >
> > > >> > > wrote:
> > > >> > > >>
> > > >> > > >> > Hi Hequn,
> > > >> > > >> >
> > > >> > > >> > Thanks for your comments!
> > > >> > > >> >
> > > >> > > >> > I agree that letting local aggregation reuse the window API
> > > >> > > >> > and refining the window operator so that it meets both
> > > >> > > >> > requirements (ours and Kurt's) is a good decision!
> > > >> > > >> >
> > > >> > > >> > Concerning your questions:
> > > >> > > >> >
> > > >> > > >> > 1. The result of input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> > may
> > > >> be
> > > >> > > >> > meaningless.
> > > >> > > >> >
> > > >> > > >> > Yes, it does not make sense in most cases. However, I also
> > want
> > > >> to
> > > >> > > note
> > > >> > > >> > users should know the right semantics of localKeyBy and use
> > it
> > > >> > > >> correctly.
> > > >> > > >> > Because this issue also exists for the global keyBy,
> consider
> > > >> this
> > > >> > > >> example:
> > > >> > > >> > input.keyBy(0).sum(1).keyBy(0).sum(1), the result is also
> > > >> > meaningless.
> > > >> > > >> >
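To make the point about chaining two sums concrete, here is a small plain-Java sketch. It does not use Flink; RollingSumSketch is a hypothetical name, and the sketch assumes that a rolling sum on a single key emits an updated total for every incoming record.

```java
import java.util.ArrayList;
import java.util.List;

final class RollingSumSketch {
    // Emits the running total after every input record,
    // the way a rolling sum on a single key is assumed to behave here.
    static List<Long> rollingSum(List<Long> input) {
        List<Long> out = new ArrayList<>();
        long total = 0;
        for (long v : input) {
            total += v;
            out.add(total);
        }
        return out;
    }
}
```

Applied once to 1, 2, 3 this yields 1, 3, 6. Applied again to that output it yields 1, 4, 10, which no longer corresponds to any sum over the original records. The same happens whether the first stage is keyed globally or locally.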
> > > >> > > >> > 2. About the semantics of
> > > >> > > >> > input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)).
> > > >> > > >> >
> > > >> > > >> > Good catch! I agree with you that it's not good to enable
> all
> > > >> > > >> > functionalities for localKeyBy from KeyedStream.
> > > >> > > >> > Currently, we do not support some APIs such as
> > > >> > > >> > connect/join/intervalJoin/coGroup. This is because we force
> > > >> > > >> > the operators on LocalKeyedStreams to be chained with the
> > > >> > > >> > inputs.
> > > >> > > >> >
> > > >> > > >> > Best,
> > > >> > > >> > Vino
> > > >> > > >> >
> > > >> > > >> >
> > > >> > > >> > On Wed, Jun 19, 2019 at 3:42 PM Hequn Cheng <chenghe...@gmail.com> wrote:
> > > >> > > >> >
> > > >> > > >> > > Hi,
> > > >> > > >> > >
> > > >> > > >> > > Thanks a lot for your great discussion and great to see
> > that
> > > >> some
> > > >> > > >> > agreement
> > > >> > > >> > > has been reached on the "local aggregate engine"!
> > > >> > > >> > >
> > > >> > > >> > > ===> Considering the abstract engine,
> > > >> > > >> > > I'm wondering whether it is valuable for us to extend the
> > > >> > > >> > > current window to meet both demands raised by Kurt and
> > > >> > > >> > > Vino. There are some benefits we can get:
> > > >> > > >> > >
> > > >> > > >> > > 1. The interfaces of the window are complete and clear.
> > > >> > > >> > > With windows, we can define a lot of ways to split the
> > > >> > > >> > > data and perform different computations.
> > > >> > > >> > > 2. We can also leverage the window to do miniBatch for the
> > > >> > > >> > > global aggregation, i.e., we can use the window to bundle
> > > >> > > >> > > data belonging to the same key, so that for every bundle
> > > >> > > >> > > we only need to read and write state once. This can
> > > >> > > >> > > greatly reduce state IO and improve performance.
> > > >> > > >> > > 3. A lot of other use cases can also benefit from a
> > > >> > > >> > > window that is based on memory or is stateless.
> > > >> > > >> > >
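The miniBatch benefit in point 2 can be sketched in plain Java without Flink. CountingState is a hypothetical stand-in for keyed state that simply counts how often it is touched, and MiniBatchSketch with its two methods is an illustration under that assumption, not how any existing operator is implemented.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class MiniBatchSketch {
    // Hypothetical stand-in for keyed state that counts its accesses.
    static final class CountingState {
        final Map<String, Long> store = new HashMap<>();
        int reads = 0;
        int writes = 0;

        long read(String key) {
            reads++;
            return store.getOrDefault(key, 0L);
        }

        void write(String key, long value) {
            writes++;
            store.put(key, value);
        }
    }

    // Per record: one state read and one state write for every element.
    static void perRecord(CountingState state, List<String> keys) {
        for (String k : keys) {
            state.write(k, state.read(k) + 1);
        }
    }

    // Bundled: pre-combine the bundle in memory, then touch state once per distinct key.
    static void bundled(CountingState state, List<String> keys) {
        Map<String, Long> bundle = new LinkedHashMap<>();
        for (String k : keys) {
            bundle.merge(k, 1L, Long::sum);
        }
        for (Map.Entry<String, Long> e : bundle.entrySet()) {
            state.write(e.getKey(), state.read(e.getKey()) + e.getValue());
        }
    }
}
```

For the keys a, a, a, b, a the per-record path performs 5 reads and 5 writes, while the bundled path performs 2 of each and ends with the same counts in state.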
> > > >> > > >> > > ===> As for the API,
> > > >> > > >> > > I think it is good to make our API more flexible.
> > > >> > > >> > > However, we may need to make our API meaningful.
> > > >> > > >> > >
> > > >> > > >> > > Take my previous reply as an example:
> > > >> > > >> > > input.localKeyBy(0).sum(1).keyBy(0).sum(1). The result
> > > >> > > >> > > may be meaningless.
> > > >> > > >> > > Another example I found is the intervalJoin, e.g.,
> > > >> > > >> > > input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)).
> > > >> > > >> > > In this case, it will cause problems if input1 and input2
> > > >> > > >> > > have different parallelism: we don't know which input the
> > > >> > > >> > > join should be chained with. Even if they have the same
> > > >> > > >> > > parallelism, it's hard to tell what the join is doing.
> > > >> > > >> > > There may be other problems as well.
> > > >> > > >> > >
> > > >> > > >> > > From this point of view, it's at least not good to enable
> > > >> > > >> > > all functionalities of KeyedStream for localKeyBy?
> > > >> > > >> > >
> > > >> > > >> > > Great to also have your opinions.
> > > >> > > >> > >
> > > >> > > >> > > Best, Hequn
> > > >> > > >> > >
> > > >> > > >> > >
> > > >> > > >> > >
> > > >> > > >> > >
> > > >> > > >> > > On Wed, Jun 19, 2019 at 10:24 AM vino yang <
> > > >> yanghua1...@gmail.com
> > > >> > >
> > > >> > > >> > wrote:
> > > >> > > >> > >
> > > >> > > >> > > > Hi Kurt and Piotrek,
> > > >> > > >> > > >
> > > >> > > >> > > > Thanks for your comments.
> > > >> > > >> > > >
> > > >> > > >> > > > I agree that we can provide a better abstraction to be
> > > >> > compatible
> > > >> > > >> with
> > > >> > > >> > > two
> > > >> > > >> > > > different implementations.
> > > >> > > >> > > >
> > > >> > > >> > > > First of all, I think we should consider what kinds of
> > > >> > > >> > > > scenarios we need to support at the *API* level.
> > > >> > > >> > > >
> > > >> > > >> > > > We have some use cases which need customized
> > > >> > > >> > > > aggregation through KeyedProcessFunction (with our
> > > >> > > >> > > > localKeyBy.window, they can use ProcessWindowFunction).
> > > >> > > >> > > >
> > > >> > > >> > > > Shall we support these flexible use scenarios?
> > > >> > > >> > > >
> > > >> > > >> > > > Best,
> > > >> > > >> > > > Vino
> > > >> > > >> > > >
> > > >> > > >> > > > On Tue, Jun 18, 2019 at 8:37 PM Kurt Young <ykt...@gmail.com> wrote:
> > > >> > > >> > > >
> > > >> > > >> > > > > Hi Piotr,
> > > >> > > >> > > > >
> > > >> > > >> > > > > Thanks for joining the discussion. Making "local
> > > >> > > >> > > > > aggregation" abstract enough sounds good to me; we
> > > >> > > >> > > > > could implement and verify alternative solutions for
> > > >> > > >> > > > > the use cases of local aggregation. Maybe we will find
> > > >> > > >> > > > > that both solutions are appropriate for different
> > > >> > > >> > > > > scenarios.
> > > >> > > >> > > > >
> > > >> > > >> > > > > Starting from a simple one sounds like a practical
> > > >> > > >> > > > > way to go. What do you think, vino?
> > > >> > > >> > > > >
> > > >> > > >> > > > > Best,
> > > >> > > >> > > > > Kurt
> > > >> > > >> > > > >
> > > >> > > >> > > > >
> > > >> > > >> > > > > On Tue, Jun 18, 2019 at 8:10 PM Piotr Nowojski <
> > > >> > > >> pi...@ververica.com>
> > > >> > > >> > > > > wrote:
> > > >> > > >> > > > >
> > > >> > > >> > > > > > Hi Kurt and Vino,
> > > >> > > >> > > > > >
> > > >> > > >> > > > > > I think there is a trade-off that we need to consider
> > > >> > > >> > > > > > for the local aggregation.
> > > >> > > >> > > > > >
> > > >> > > >> > > > > > Generally speaking, I would agree with Kurt about local
> > > >> > > >> > > > > > aggregation/pre-aggregation not using Flink's state and
> > > >> > > >> > > > > > instead flushing the operator on a checkpoint. Network
> > > >> > > >> > > > > > IO is usually cheaper than disk IO. This has, however,
> > > >> > > >> > > > > > a couple of issues:
> > > >> > > >> > > > > > 1. It can explode the number of in-flight records
> > > >> > > >> > > > > > during checkpoint barrier alignment, making
> > > >> > > >> > > > > > checkpointing slower and decreasing the actual
> > > >> > > >> > > > > > throughput.
> > > >> > > >> > > > > > 2. This trades disk IO on the local aggregation machine
> > > >> > > >> > > > > > for CPU (and disk IO in the case of RocksDB) on the
> > > >> > > >> > > > > > final aggregation machine. This is fine as long as
> > > >> > > >> > > > > > there is no huge data skew. If there are only a handful
> > > >> > > >> > > > > > of hot keys (or even a single one), it might be better
> > > >> > > >> > > > > > to keep the persistent state in the
> > > >> > > >> > > > > > LocalAggregationOperator to offload the final
> > > >> > > >> > > > > > aggregation as much as possible.
> > > >> > > >> > > > > > 3. With frequent checkpointing, local aggregation
> > > >> > > >> > > > > > effectiveness would degrade.
> > > >> > > >> > > > > >
> > > >> > > >> > > > > > I assume Kurt is correct that in your use cases the
> > > >> > > >> > > > > > stateless operator was behaving better, but I could
> > > >> > > >> > > > > > easily see other use cases as well. For example,
> > > >> > > >> > > > > > someone is already using RocksDB, and their job is
> > > >> > > >> > > > > > bottlenecked on a single window operator instance
> > > >> > > >> > > > > > because of data skew. In that case stateful local
> > > >> > > >> > > > > > aggregation would probably be a better choice.
> > > >> > > >> > > > > >
> > > >> > > >> > > > > > Because of that, I think we should eventually provide
> > > >> > > >> > > > > > both versions, and in the initial version we should at
> > > >> > > >> > > > > > least make the "local aggregation engine" abstract
> > > >> > > >> > > > > > enough that one could easily provide a different
> > > >> > > >> > > > > > implementation strategy.
> > > >> > > >> > > > > >
> > > >> > > >> > > > > > Piotrek
> > > >> > > >> > > > > >
> > > >> > > >> > > > > > > On 18 Jun 2019, at 11:46, Kurt Young <
> > > ykt...@gmail.com
> > > >> >
> > > >> > > >> wrote:
> > > >> > > >> > > > > > >
> > > >> > > >> > > > > > > Hi,
> > > >> > > >> > > > > > >
> > > >> > > >> > > > > > > For the trigger, it depends on what operator we want
> > > >> > > >> > > > > > > to use under the API. If we choose to use the window
> > > >> > > >> > > > > > > operator, we should also use the window's trigger.
> > > >> > > >> > > > > > > However, I also think reusing the window operator for
> > > >> > > >> > > > > > > this scenario may not be the best choice. The reasons
> > > >> > > >> > > > > > > are the following:
> > > >> > > >> > > > > > >
> > > >> > > >> > > > > > > 1. As a lot of people already pointed out, window
> > > >> > > >> > > > > > > relies heavily on state and it will definitely affect
> > > >> > > >> > > > > > > performance. You can argue that one can use a
> > > >> > > >> > > > > > > heap-based state backend, but this will introduce
> > > >> > > >> > > > > > > extra coupling, especially when we have a chance to
> > > >> > > >> > > > > > > design a purely stateless operator.
> > > >> > > >> > > > > > > 2. The window operator is *the most* complicated
> > > >> > > >> > > > > > > operator Flink currently has. Maybe we only need to
> > > >> > > >> > > > > > > pick a subset of the window operator to achieve the
> > > >> > > >> > > > > > > goal, but once the user wants to have a deep look at
> > > >> > > >> > > > > > > the localAggregation operator, it's still hard to
> > > >> > > >> > > > > > > find out what's going on under the window operator.
> > > >> > > >> > > > > > > For simplicity, I would also recommend we introduce a
> > > >> > > >> > > > > > > dedicated lightweight operator, which is also much
> > > >> > > >> > > > > > > easier for a user to learn and use.
> > > >> > > >> > > > > > >
> > > >> > > >> > > > > > > For your question about increasing the burden in
> > > >> > > >> > > > > > > `StreamOperator::prepareSnapshotPreBarrier()`, the
> > > >> > > >> > > > > > > only thing this function needs to do is output all
> > > >> > > >> > > > > > > the partial results; it's purely CPU workload and
> > > >> > > >> > > > > > > does not introduce any IO. I want to point out that
> > > >> > > >> > > > > > > even if we have this cost, we remove another barrier
> > > >> > > >> > > > > > > alignment cost of the operator, which is the sync
> > > >> > > >> > > > > > > flush stage of the state, if you introduced state.
> > > >> > > >> > > > > > > This flush actually introduces disk IO, and I think
> > > >> > > >> > > > > > > it's worth exchanging this cost for a purely CPU
> > > >> > > >> > > > > > > workload. And we do have some observations about
> > > >> > > >> > > > > > > these two behaviors (as I said before, we actually
> > > >> > > >> > > > > > > implemented both solutions): the stateless one
> > > >> > > >> > > > > > > actually performs better both in performance and in
> > > >> > > >> > > > > > > barrier alignment time.
> > > >> > > >> > > > > > >
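To make the buffer-and-flush idea in the paragraph above concrete, here is a minimal plain-Java sketch. It is not a Flink operator and not the implementation being discussed: the class `BufferFlushSketch`, its count threshold, and the method `flushBeforeBarrier()` are hypothetical names used only for illustration. `flushBeforeBarrier()` stands in for the role `StreamOperator::prepareSnapshotPreBarrier()` would play, i.e. emitting every buffered partial result so that nothing is left to snapshot.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration only; this class is hypothetical, not a Flink operator. */
public class BufferFlushSketch {
    private final int maxBufferedRecords;                    // hypothetical count threshold
    private final Map<String, Long> buffer = new HashMap<>(); // in-memory partial sums per key
    private final List<String> emitted = new ArrayList<>();   // stands in for downstream output
    private int bufferedRecords = 0;

    public BufferFlushSketch(int maxBufferedRecords) {
        this.maxBufferedRecords = maxBufferedRecords;
    }

    /** Fold one record into the in-memory partial sum of its key. */
    public void processElement(String key, long value) {
        buffer.merge(key, value, Long::sum);
        bufferedRecords++;
        if (bufferedRecords >= maxBufferedRecords) {
            flush();
        }
    }

    /** Stand-in for the pre-barrier hook: emit everything, keep nothing. */
    public void flushBeforeBarrier() {
        flush();
    }

    /** Emit one "key=partialSum" entry per buffered key, then reset the buffer. */
    private void flush() {
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            emitted.add(e.getKey() + "=" + e.getValue());
        }
        buffer.clear();
        bufferedRecords = 0;
    }

    public List<String> emitted() {
        return emitted;
    }

    public int bufferedKeys() {
        return buffer.size();
    }
}
```

In this sketch the flush is pure in-memory work, which is the property claimed for the stateless approach. The sketch says nothing about what happens to records buffered between two flushes if the task fails, which is the fault-tolerance question raised elsewhere in this thread.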
> > > >> > > >> > > > > > > Best,
> > > >> > > >> > > > > > > Kurt
> > > >> > > >> > > > > > >
> > > >> > > >> > > > > > >
> > > >> > > >> > > > > > > On Tue, Jun 18, 2019 at 3:40 PM vino yang
> > > >> > > >> > > > > > > <yanghua1...@gmail.com> wrote:
> > > >> > > >> > > > > > >
> > > >> > > >> > > > > > >> Hi Kurt,
> > > >> > > >> > > > > > >>
> > > >> > > >> > > > > > >> Thanks for your example. Now it looks clearer to
> > > >> > > >> > > > > > >> me.
> > > >> > > >> > > > > > >>
> > > >> > > >> > > > > > >> From your example code snippet, I saw that the
> > > >> > > >> > > > > > >> localAggregate API has three parameters:
> > > >> > > >> > > > > > >>
> > > >> > > >> > > > > > >>   1. key field
> > > >> > > >> > > > > > >>   2. PartitionAvg
> > > >> > > >> > > > > > >>   3. CountTrigger: does this trigger come from the
> > > >> > > >> > > > > > >>      window package?
> > > >> > > >> > > > > > >>
> > > >> > > >> > > > > > >> I will compare our design and yours at the API and
> > > >> > > >> > > > > > >> operator levels:
> > > >> > > >> > > > > > >>
> > > >> > > >> > > > > > >> *From the API level:*
> > > >> > > >> > > > > > >>
> > > >> > > >> > > > > > >> As I replied to @dianfu in the old email thread
> > > >> > > >> > > > > > >> [1], the Window API can already provide the second
> > > >> > > >> > > > > > >> and the third parameters.
> > > >> > > >> > > > > > >>
> > > >> > > >> > > > > > >> If you reuse a specific interface or class provided
> > > >> > > >> > > > > > >> by the window package, such as *Trigger* or
> > > >> > > >> > > > > > >> *CountTrigger*, but do not use the window API, that
> > > >> > > >> > > > > > >> is not reasonable.
> > > >> > > >> > > > > > >> And if you do not reuse these interfaces or
> > > >> > > >> > > > > > >> classes, you would need to introduce new things
> > > >> > > >> > > > > > >> that nevertheless look similar to what the window
> > > >> > > >> > > > > > >> package provides.
> > > >> > > >> > > > > > >>
> > > >> > > >> > > > > > >> The window package provides several window types
> > > >> > > >> > > > > > >> and many triggers, and lets users customize them.
> > > >> > > >> > > > > > >> What's more, users are more familiar with the
> > > >> > > >> > > > > > >> Window API.
> > > >> > > >> > > > > > >>
> > > >> > > >> > > > > > >> This is the reason why we just provide the
> > > >> > > >> > > > > > >> localKeyBy API and reuse the window API. It avoids
> > > >> > > >> > > > > > >> unnecessary new components such as triggers and the
> > > >> > > >> > > > > > >> buffering mechanism (based on count or time).
> > > >> > > >> > > > > > >> And it has clear, easy-to-understand semantics.
> > > >> > > >> > > > > > >>
> > > >> > > >> > > > > > >> *From the operator level:*
> > > >> > > >> > > > > > >>
> > > >> > > >> > > > > > >> We reused the window operator, so we get all the
> > > >> > > >> > > > > > >> benefits of state and checkpointing.
> > > >> > > >> > > > > > >>
> > > >> > > >> > > > > > >> In your design, you call the operator under the
> > > >> > > >> > > > > > >> localAggregate API a *stateless* operator. IMO, it
> > > >> > > >> > > > > > >> still holds state; it is just not Flink-managed
> > > >> > > >> > > > > > >> state.
> > > >> > > >> > > > > > >> About the memory buffer (I think it's still not
> > > >> > > >> > > > > > >> very clear; if you have time, can you give more
> > > >> > > >> > > > > > >> detailed information or answer my questions), I
> > > >> > > >> > > > > > >> have some questions:
> > > >> > > >> > > > > > >>
> > > >> > > >> > > > > > >>   - If it is just a raw JVM heap memory buffer, how
> > > >> > > >> > > > > > >>   does it support fault tolerance when the job is
> > > >> > > >> > > > > > >>   configured with the EXACTLY-ONCE semantic
> > > >> > > >> > > > > > >>   guarantee?
> > > >> > > >> > > > > > >>   - If you think the memory buffer (non-Flink
> > > >> > > >> > > > > > >>   state) has better performance: in our design,
> > > >> > > >> > > > > > >>   users can also configure the HEAP state backend
> > > >> > > >> > > > > > >>   to get performance close to your mechanism.
> > > >> > > >> > > > > > >>   - `StreamOperator::prepareSnapshotPreBarrier()`
> > > >> > > >> > > > > > >>   is tied to the timing of the snapshot. IMO, the
> > > >> > > >> > > > > > >>   flush action should be a synchronous action? (If
> > > >> > > >> > > > > > >>   not, please point out my mistake.) I still think
> > > >> > > >> > > > > > >>   we should not depend on the timing of the
> > > >> > > >> > > > > > >>   checkpoint. Checkpoint-related operations are
> > > >> > > >> > > > > > >>   inherently performance sensitive, so we should
> > > >> > > >> > > > > > >>   not increase their burden any further. Our
> > > >> > > >> > > > > > >>   implementation is based on Flink's checkpoint
> > > >> > > >> > > > > > >>   mechanism, which can benefit from asynchronous
> > > >> > > >> > > > > > >>   snapshots and incremental checkpoints. IMO,
> > > >> > > >> > > > > > >>   performance is not a problem, and we have not
> > > >> > > >> > > > > > >>   found performance issues in our production.
> > > >> > > >> > > > > > >>
> > > >> > > >> > > > > > >> [1]:
> > > >> > > >> > > > > > >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > > >> > > >> > > > > > >>
> > > >> > > >> > > > > > >> Best,
> > > >> > > >> > > > > > >> Vino
> > > >> > > >> > > > > > >>
> > > >> > > >> > > > > > >> On Tue, Jun 18, 2019 at 2:27 PM Kurt Young
> > > >> > > >> > > > > > >> <ykt...@gmail.com> wrote:
> > > >> > > >> > > > > > >>
> > > >> > > >> > > > > > >>> Yeah, sorry for not expressing myself clearly. I
> > > >> > > >> > > > > > >>> will try to provide more details to make sure we
> > > >> > > >> > > > > > >>> are on the same page.
> > > >> > > >> > > > > > >>>
> > > >> > > >> > > > > > >>> For the DataStream API, it shouldn't be optimized
> > > >> > > >> > > > > > >>> automatically. You have to explicitly call an API
> > > >> > > >> > > > > > >>> to do local aggregation and to specify the trigger
> > > >> > > >> > > > > > >>> policy of the local aggregation. Take average for
> > > >> > > >> > > > > > >>> example; the user program may look like this (just
> > > >> > > >> > > > > > >>> a draft):
> > > >> > > >> > > > > > >>>
> > > >> > > >> > > > > > >>> assuming the input type is
> > > >> > > >> > > > > > >>> DataStream<Tuple2<String, Int>>
> > > >> > > >> > > > > > >>>
> > > >> > > >> > > > > > >>> ds.localAggregate(
> > > >> > > >> > > > > > >>>        // The local key, which is the String from
> > > >> > > >> > > > > > >>>        // Tuple2
> > > >> > > >> > > > > > >>>        0,
> > > >> > > >> > > > > > >>>        // The partial aggregation function,
> > > >> > > >> > > > > > >>>        // produces Tuple2<Long, Int>, indicating
> > > >> > > >> > > > > > >>>        // sum and count
> > > >> > > >> > > > > > >>>        PartitionAvg(1),
> > > >> > > >> > > > > > >>>        // Trigger policy; note this should be best
> > > >> > > >> > > > > > >>>        // effort, and can also be combined with a
> > > >> > > >> > > > > > >>>        // time-based or memory-size-based trigger
> > > >> > > >> > > > > > >>>        CountTrigger.of(1000L)
> > > >> > > >> > > > > > >>>    )
> > > >> > > >> > > > > > >>>    // The return type is the local aggregate
> > > >> > > >> > > > > > >>>    // Tuple2<String, Tuple2<Long, Int>>
> > > >> > > >> > > > > > >>>    .keyBy(0)      // Further keyBy it with the
> > > >> > > >> > > > > > >>>                   // required key
> > > >> > > >> > > > > > >>>    .aggregate(1)  // This will merge all the
> > > >> > > >> > > > > > >>>                   // partial results and get the
> > > >> > > >> > > > > > >>>                   // final average.
> > > >> > > >> > > > > > >>>
> > > >> > > >> > > > > > >>> (This is only a draft, only trying to explain
> > what
> > > it
> > > >> > > looks
> > > >> > > >> > > like. )
> > > >> > > >> > > > > > >>>
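The draft above relies on a partial result of (sum, count) that can be merged downstream. As a minimal sketch of just that idea, here is plain Java with no Flink dependency. The names `LocalAvgSketch`, `SumCount`, `localAggregate` and `globalAverage` are hypothetical and only mirror the shape of the draft; they are not the proposed API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration only; none of these names are Flink APIs. */
public class LocalAvgSketch {

    /** Partial result for one key: sum and count, as in the draft's Tuple2<Long, Int>. */
    public static final class SumCount {
        public final long sum;
        public final int count;

        public SumCount(long sum, int count) {
            this.sum = sum;
            this.count = count;
        }

        /** Merging two partials; this is what the final aggregation has to do. */
        public SumCount merge(SumCount other) {
            return new SumCount(sum + other.sum, count + other.count);
        }

        public double average() {
            return (double) sum / count;
        }
    }

    /** "Local" step: pre-aggregate the records seen by one subtask, per key. */
    public static Map<String, SumCount> localAggregate(String[] keys, int[] values) {
        Map<String, SumCount> partials = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            partials.merge(keys[i], new SumCount(values[i], 1), SumCount::merge);
        }
        return partials;
    }

    /** "Global" step: merge the partials from every subtask and compute the average. */
    public static Map<String, Double> globalAverage(List<Map<String, SumCount>> partialsPerSubtask) {
        Map<String, SumCount> merged = new HashMap<>();
        for (Map<String, SumCount> partials : partialsPerSubtask) {
            partials.forEach((key, partial) -> merged.merge(key, partial, SumCount::merge));
        }
        Map<String, Double> averages = new HashMap<>();
        merged.forEach((key, partial) -> averages.put(key, partial.average()));
        return averages;
    }
}
```

The point of the sketch is only that the element type changes between the two steps: the input is (String, Int), the partial is (String, (Long, Int)), and the average is computed only after all partials for a key have been merged.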
> > > >> > > >> > > > > > >>> The local aggregate operator can be stateless; we
> > > >> > > >> > > > > > >>> can keep a memory buffer or another efficient data
> > > >> > > >> > > > > > >>> structure to improve the aggregation performance.
> > > >> > > >> > > > > > >>>
> > > >> > > >> > > > > > >>> Let me know if you have any other questions.
> > > >> > > >> > > > > > >>>
> > > >> > > >> > > > > > >>> Best,
> > > >> > > >> > > > > > >>> Kurt
> > > >> > > >> > > > > > >>>
> > > >> > > >> > > > > > >>>
> > > >> > > >> > > > > > >>> On Tue, Jun 18, 2019 at 1:29 PM vino yang
> > > >> > > >> > > > > > >>> <yanghua1...@gmail.com> wrote:
> > > >> > > >> > > > > > >>>
> > > >> > > >> > > > > > >>>> Hi Kurt,
> > > >> > > >> > > > > > >>>>
> > > >> > > >> > > > > > >>>> Thanks for your reply.
> > > >> > > >> > > > > > >>>>
> > > >> > > >> > > > > > >>>> Actually, I am not against you raising your
> > > >> > > >> > > > > > >>>> design.
> > > >> > > >> > > > > > >>>>
> > > >> > > >> > > > > > >>>> From your description before, I could only
> > > >> > > >> > > > > > >>>> imagine that your high-level implementation is
> > > >> > > >> > > > > > >>>> about SQL and that the optimization is internal
> > > >> > > >> > > > > > >>>> to the API. Is it automatic? How does the user
> > > >> > > >> > > > > > >>>> configure the trigger for pre-aggregation?
> > > >> > > >> > > > > > >>>>
> > > >> > > >> > > > > > >>>> Maybe after I get more information, it will
> > > >> > > >> > > > > > >>>> sound more reasonable.
> > > >> > > >> > > > > > >>>>
> > > >> > > >> > > > > > >>>> IMO, first of all, it would be better to make
> > > >> > > >> > > > > > >>>> your user interface concrete; it's the basis of
> > > >> > > >> > > > > > >>>> the discussion.
> > > >> > > >> > > > > > >>>>
> > > >> > > >> > > > > > >>>> For example, can you give an example code
> > > >> > > >> > > > > > >>>> snippet to show how to help users deal with data
> > > >> > > >> > > > > > >>>> skew in jobs built with the DataStream API?
> > > >> > > >> > > > > > >>>>
> > > >> > > >> > > > > > >>>> If you give more details we can discuss further.
> > > >> > > >> > > > > > >>>> I think that if one design introduces an exact
> > > >> > > >> > > > > > >>>> interface and another does not, the
> > > >> > > >> > > > > > >>>> implementations will differ noticeably. For
> > > >> > > >> > > > > > >>>> example, we introduce an exact API in DataStream
> > > >> > > >> > > > > > >>>> named localKeyBy; for the pre-aggregation we need
> > > >> > > >> > > > > > >>>> to define the trigger mechanism of local
> > > >> > > >> > > > > > >>>> aggregation, so we found that reusing the window
> > > >> > > >> > > > > > >>>> API and operator is a good choice. This is a
> > > >> > > >> > > > > > >>>> chain of reasoning from design to implementation.
> > > >> > > >> > > > > > >>>>
> > > >> > > >> > > > > > >>>> What do you think?
> > > >> > > >> > > > > > >>>>
> > > >> > > >> > > > > > >>>> Best,
> > > >> > > >> > > > > > >>>> Vino
> > > >> > > >> > > > > > >>>>
> > > >> > > >> > > > > > >>>>
> > > >> > > >> > > > > > >>>> On Tue, Jun 18, 2019 at 11:58 AM Kurt Young
> > > >> > > >> > > > > > >>>> <ykt...@gmail.com> wrote:
> > > >> > > >> > > > > > >>>>
> > > >> > > >> > > > > > >>>>> Hi Vino,
> > > >> > > >> > > > > > >>>>>
> > > >> > > >> > > > > > >>>>> Now I feel that we may have different
> > > >> > > >> > > > > > >>>>> understandings about what kind of problems or
> > > >> > > >> > > > > > >>>>> improvements you want to resolve. Currently,
> > > >> > > >> > > > > > >>>>> most of the feedback is focusing on *how to do a
> > > >> > > >> > > > > > >>>>> proper local aggregation to improve performance
> > > >> > > >> > > > > > >>>>> and maybe solve the data skew issue*. And my gut
> > > >> > > >> > > > > > >>>>> feeling is that this is exactly what users want
> > > >> > > >> > > > > > >>>>> in the first place, especially those +1s. (Sorry
> > > >> > > >> > > > > > >>>>> for trying to summarize here; please correct me
> > > >> > > >> > > > > > >>>>> if I'm wrong.)
> > > >> > > >> > > > > > >>>>>
> > > >> > > >> > > > > > >>>>> But I still think the design has somehow
> > > >> > > >> > > > > > >>>>> diverged from the goal. If we want to have an
> > > >> > > >> > > > > > >>>>> efficient and powerful way to do local
> > > >> > > >> > > > > > >>>>> aggregation, supporting an intermediate result
> > > >> > > >> > > > > > >>>>> type is essential IMO. Both the runtime's
> > > >> > > >> > > > > > >>>>> `AggregateFunction` and SQL's
> > > >> > > >> > > > > > >>>>> `UserDefinedAggregateFunction` have proper
> > > >> > > >> > > > > > >>>>> support for an intermediate result type and can
> > > >> > > >> > > > > > >>>>> do a `merge` operation on them.
> > > >> > > >> > > > > > >>>>>
> > > >> > > >> > > > > > >>>>> Now, we have a lightweight alternative which
> > > >> > > >> > > > > > >>>>> performs well and fits nicely with the local
> > > >> > > >> > > > > > >>>>> aggregation requirements. Most importantly, it's
> > > >> > > >> > > > > > >>>>> much less complex because it's stateless. And it
> > > >> > > >> > > > > > >>>>> can also achieve a similar multiple-aggregation
> > > >> > > >> > > > > > >>>>> scenario.
> > > >> > > >> > > > > > >>>>>
> > > >> > > >> > > > > > >>>>> I am still not convinced why we shouldn't
> > > >> > > >> > > > > > >>>>> consider it as a first step.
> > > >> > > >> > > > > > >>>>>
> > > >> > > >> > > > > > >>>>> Best,
> > > >> > > >> > > > > > >>>>> Kurt
> > > >> > > >> > > > > > >>>>>
> > > >> > > >> > > > > > >>>>>
> > > >> > > >> > > > > > >>>>> On Tue, Jun 18, 2019 at 11:35 AM vino yang
> > > >> > > >> > > > > > >>>>> <yanghua1...@gmail.com> wrote:
> > > >> > > >> > > > > > >>>>>
> > > >> > > >> > > > > > >>>>>> Hi Kurt,
> > > >> > > >> > > > > > >>>>>>
> > > >> > > >> > > > > > >>>>>> Thanks for your comments.
> > > >> > > >> > > > > > >>>>>>
> > > >> > > >> > > > > > >>>>>> It seems we both implemented a local
> > > >> > > >> > > > > > >>>>>> aggregation feature to address the issue of
> > > >> > > >> > > > > > >>>>>> data skew.
> > > >> > > >> > > > > > >>>>>> However, IMHO, the API level at which the
> > > >> > > >> > > > > > >>>>>> optimization delivers its benefit is different.
> > > >> > > >> > > > > > >>>>>>
> > > >> > > >> > > > > > >>>>>> *Your optimization benefits from Flink SQL and
> > > >> > > >> > > > > > >>>>>> it's not user-facing. (If I understand it
> > > >> > > >> > > > > > >>>>>> incorrectly, please correct me.)*
> > > >> > > >> > > > > > >>>>>> *Our implementation offers it as an
> > > >> > > >> > > > > > >>>>>> optimization tool API for DataStream; it is
> > > >> > > >> > > > > > >>>>>> just like a local version of the keyBy API.*
> > > >> > > >> > > > > > >>>>>>
> > > >> > > >> > > > > > >>>>>> Based on this, I want to say that supporting it
> > > >> > > >> > > > > > >>>>>> as a DataStream API can provide these
> > > >> > > >> > > > > > >>>>>> advantages:
> > > >> > > >> > > > > > >>>>>>
> > > >> > > >> > > > > > >>>>>>   - The localKeyBy API has clear semantics and
> > > >> > > >> > > > > > >>>>>>   it's flexible, not only for handling data
> > > >> > > >> > > > > > >>>>>>   skew but also for implementing some use
> > > >> > > >> > > > > > >>>>>>   cases. For example, if we want to calculate a
> > > >> > > >> > > > > > >>>>>>   multiple-level aggregation, we can do the
> > > >> > > >> > > > > > >>>>>>   multiple-level aggregation in the local
> > > >> > > >> > > > > > >>>>>>   aggregation:
> > > >> > > >> > > > > > >>>>>>   input.localKeyBy("a").sum(1).localKeyBy("b").window();
> > > >> > > >> > > > > > >>>>>>   // here "a" is a sub-category, while "b" is a
> > > >> > > >> > > > > > >>>>>>   category; here we do not need to shuffle data
> > > >> > > >> > > > > > >>>>>>   over the network.
> > > >> > > >> > > > > > >>>>>>   - The users of the DataStream API will
> > > >> > > >> > > > > > >>>>>>   benefit from this. Actually, we have a lot of
> > > >> > > >> > > > > > >>>>>>   scenarios that need to use the DataStream
> > > >> > > >> > > > > > >>>>>>   API. Currently, the DataStream API is the
> > > >> > > >> > > > > > >>>>>>   cornerstone of the physical plan of Flink
> > > >> > > >> > > > > > >>>>>>   SQL. With a localKeyBy API, the optimization
> > > >> > > >> > > > > > >>>>>>   of SQL may at least use this optimized API;
> > > >> > > >> > > > > > >>>>>>   this is a further topic.
> > > >> > > >> > > > > > >>>>>>   - Based on the window operator, our state
> > > >> > > >> > > > > > >>>>>>   would benefit from Flink state and
> > > >> > > >> > > > > > >>>>>>   checkpointing; we do not need to worry about
> > > >> > > >> > > > > > >>>>>>   OOM and job failures.
> > > >> > > >> > > > > > >>>>>>
> > > >> > > >> > > > > > >>>>>> Now, about your questions:
> > > >> > > >> > > > > > >>>>>>
> > > >> > > >> > > > > > >>>>>> 1. About our design not being able to change
> > > >> > > >> > > > > > >>>>>> the data type, and about the implementation of
> > > >> > > >> > > > > > >>>>>> average:
> > > >> > > >> > > > > > >>>>>>
> > > >> > > >> > > > > > >>>>>> Just like my reply to Hequn, localKeyBy is an
> > > >> > > >> > > > > > >>>>>> API provided to users who use the DataStream
> > > >> > > >> > > > > > >>>>>> API to build their jobs.
> > > >> > > >> > > > > > >>>>>> Users should know its semantics and the
> > > >> > > >> > > > > > >>>>>> difference from the keyBy API, so if they want
> > > >> > > >> > > > > > >>>>>> the average aggregation, they should carry the
> > > >> > > >> > > > > > >>>>>> local sum result and the local count result.
> > > >> > > >> > > > > > >>>>>> I admit that it would be more convenient to use
> > > >> > > >> > > > > > >>>>>> keyBy directly. But we need to pay a small
> > > >> > > >> > > > > > >>>>>> price when we get some benefits. I think this
> > > >> > > >> > > > > > >>>>>> price is reasonable, considering that the
> > > >> > > >> > > > > > >>>>>> DataStream API itself is a low-level API (at
> > > >> > > >> > > > > > >>>>>> least for now).
> > > >> > > >> > > > > > >>>>>>
> > > >> > > >> > > > > > >>>>>> 2. About the stateless operator and
> > > >> > > >> > > > > > >>>>>> `StreamOperator::prepareSnapshotPreBarrier()`:
> > > >> > > >> > > > > > >>>>>>
> > > >> > > >> > > > > > >>>>>> Actually, I have discussed this opinion with
> > > >> > > >> > > > > > >>>>>> @dianfu in the old mail thread. I will copy my
> > > >> > > >> > > > > > >>>>>> opinion from there:
> > > >> > > >> > > > > > >>>>>>
> > > >> > > >> > > > > > >>>>>>   - For your design, you still need somewhere
> > > >> > > >> > > > > > >>>>>>   to let users configure the trigger threshold
> > > >> > > >> > > > > > >>>>>>   (maybe memory availability?); this design
> > > >> > > >> > > > > > >>>>>>   cannot guarantee deterministic semantics (it
> > > >> > > >> > > > > > >>>>>>   will bring trouble for testing and
> > > >> > > >> > > > > > >>>>>>   debugging).
> > > >> > > >> > > > > > >>>>>>   - If the implementation depends on the timing
> > > >> > > >> > > > > > >>>>>>   of the checkpoint, it would affect the
> > > >> > > >> > > > > > >>>>>>   checkpoint's progress, and the buffered data
> > > >> > > >> > > > > > >>>>>>   may cause OOM issues. In addition, if the
> > > >> > > >> > > > > > >>>>>>   operator is stateless, it cannot provide
> > > >> > > >> > > > > > >>>>>>   fault tolerance.
> > > >> > > >> > > > > > >>>>>>
> > > >> > > >> > > > > > >>>>>> Best,
> > > >> > > >> > > > > > >>>>>> Vino
> > > >> > > >> > > > > > >>>>>>
> > > >> > > >> > > > > > >>>>>> On Tue, Jun 18, 2019 at 9:22 AM Kurt Young
> > > >> > > >> > > > > > >>>>>> <ykt...@gmail.com> wrote:
> > > >> > > >> > > > > > >>>>>>
> > > >> > > >> > > > > > >>>>>>> Hi Vino,
> > > >> > > >> > > > > > >>>>>>>
> > > >> > > >> > > > > > >>>>>>> Thanks for the proposal, I like the general
> > > >> > > >> > > > > > >>>>>>> idea and IMO it's a very useful feature.
> > > >> > > >> > > > > > >>>>>>> But after reading through the document, I feel
> > > >> > > >> > > > > > >>>>>>> that we may be over-designing the operator
> > > >> > > >> > > > > > >>>>>>> required for proper local aggregation. The
> > > >> > > >> > > > > > >>>>>>> main reason is that we want to have a clear
> > > >> > > >> > > > > > >>>>>>> definition and behavior of the "local keyed
> > > >> > > >> > > > > > >>>>>>> state", which in my opinion is not necessary
> > > >> > > >> > > > > > >>>>>>> for local aggregation, at least for a start.
> > > >> > > >> > > > > > >>>>>>>
> > > >> > > >> > > > > > >>>>>>> Another issue I noticed is that the local key
> > > >> > > >> > > > > > >>>>>>> by operator cannot change the element type;
> > > >> > > >> > > > > > >>>>>>> this will also restrict a lot of use cases
> > > >> > > >> > > > > > >>>>>>> which could benefit from local aggregation,
> > > >> > > >> > > > > > >>>>>>> like "average".
> > > >> > > >> > > > > > >>>>>>>
> > > >> > > >> > > > > > >>>>>>> We also did similar logic in SQL, and the only
> > > >> > > >> > > > > > >>>>>>> thing that needs to be done is to introduce a
> > > >> > > >> > > > > > >>>>>>> stateless lightweight operator which is
> > > >> > > >> > > > > > >>>>>>> *chained* before `keyby()`. The operator will
> > > >> > > >> > > > > > >>>>>>> flush all buffered elements during
> > > >> > > >> > > > > > >>>>>>> `StreamOperator::prepareSnapshotPreBarrier()`
> > > >> > > >> > > > > > >>>>>>> and so make itself stateless.
> > > >> > > >> > > > > > >>>>>>> By the way, in an earlier version we also took
> > > >> > > >> > > > > > >>>>>>> a similar approach by introducing a stateful
> > > >> > > >> > > > > > >>>>>>> local aggregation operator, but it did not
> > > >> > > >> > > > > > >>>>>>> perform as well as the later one, and it also
> > > >> > > >> > > > > > >>>>>>> affected the barrier alignment time. The later
> > > >> > > >> > > > > > >>>>>>> one is fairly simple and more efficient.
> > > >> > > >> > > > > > >>>>>>>
> > > >> > > >> > > > > > >>>>>>> I would highly suggest you consider having a
> > > >> > > >> > > > > > >>>>>>> stateless approach as the first step.
> > > >> > > >> > > > > > >>>>>>>
> > > >> > > >> > > > > > >>>>>>> Best,
> > > >> > > >> > > > > > >>>>>>> Kurt
> > > >> > > >> > > > > > >>>>>>>
> > > >> > > >> > > > > > >>>>>>>
> > > >> > > >> > > > > > >>>>>>> On Mon, Jun 17, 2019 at 7:32 PM Jark Wu
> > > >> > > >> > > > > > >>>>>>> <imj...@gmail.com> wrote:
> > > >> > > >> > > > > > >>>>>>>
> > > >> > > >> > > > > > >>>>>>>> Hi Vino,
> > > >> > > >> > > > > > >>>>>>>>
> > > >> > > >> > > > > > >>>>>>>> Thanks for the proposal.
> > > >> > > >> > > > > > >>>>>>>>
> > > >> > > >> > > > > > >>>>>>>> Regarding "input.keyBy(0).sum(1)" vs
> > > >> > > >> > > > > > >>>>>>>> "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)",
> > > >> > > >> > > > > > >>>>>>>> have you done any benchmarks?
> > > >> > > >> > > > > > >>>>>>>> I'm curious about how much performance
> > > >> > > >> > > > > > >>>>>>>> improvement we can get by using count window
> > > >> > > >> > > > > > >>>>>>>> as the local operator.
> > > >> > > >> > > > > > >>>>>>>>
> > > >> > > >> > > > > > >>>>>>>> Best,
> > > >> > > >> > > > > > >>>>>>>> Jark
> > > >> > > >> > > > > > >>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>
> > > >> > > >> > > > > > >>>>>>>> On Mon, 17 Jun 2019 at 17:48, vino yang
> > > >> > > >> > > > > > >>>>>>>> <yanghua1...@gmail.com> wrote:
> > > >> > > >> > > > > > >>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>> Hi Hequn,
> > > >> > > >> > > > > > >>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>> Thanks for your reply.
> > > >> > > >> > > > > > >>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>> The purpose of the localKeyBy API is to
> > > >> > > >> > > > > > >>>>>>>>> provide a tool which lets users do
> > > >> > > >> > > > > > >>>>>>>>> pre-aggregation locally. The behavior of
> > > >> > > >> > > > > > >>>>>>>>> the pre-aggregation is similar to the keyBy
> > > >> > > >> > > > > > >>>>>>>>> API.
> > > >> > > >> > > > > > >>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>> So the three cases are different; I will
> > > >> > > >> > > > > > >>>>>>>>> describe them one by one:
> > > >> > > >> > > > > > >>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>> 1. input.keyBy(0).sum(1)
> > > >> > > >> > > > > > >>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>> *In this case, the result is event-driven: each event produces one sum
> > > >> > > >> > > > > > >>>>>>>>> aggregation result, and it is the latest running total since the
> > > >> > > >> > > > > > >>>>>>>>> source started.*
> > > >> > > >> > > > > > >>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> > > >> > > >> > > > > > >>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>> *In this case, the semantics may be problematic: the local sum
> > > >> > > >> > > > > > >>>>>>>>> aggregation produces, for every event, the latest partial result
> > > >> > > >> > > > > > >>>>>>>>> since the source started.*
> > > >> > > >> > > > > > >>>>>>>>> *These latest partial results for the same key are hashed to one node
> > > >> > > >> > > > > > >>>>>>>>> for the global sum aggregation.*
> > > >> > > >> > > > > > >>>>>>>>> *When the global aggregation receives multiple partial results (each
> > > >> > > >> > > > > > >>>>>>>>> of them calculated from the source start) and sums them, it gets the
> > > >> > > >> > > > > > >>>>>>>>> wrong result.*
> > > >> > > >> > > > > > >>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>> 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> > > >> > > >> > > > > > >>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>> *In this case, it just produces a partial aggregation result for the
> > > >> > > >> > > > > > >>>>>>>>> 5 records in the count window. The partial aggregation results for
> > > >> > > >> > > > > > >>>>>>>>> the same key are then aggregated globally.*
> > > >> > > >> > > > > > >>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>> So the first case and the third case can get the *same* result; the
> > > >> > > >> > > > > > >>>>>>>>> difference is the output style and the latency.
> > > >> > > >> > > > > > >>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>> Generally speaking, the local key API is just an optimization API. We
> > > >> > > >> > > > > > >>>>>>>>> do not limit how users use it, but users have to understand its
> > > >> > > >> > > > > > >>>>>>>>> semantics and use it correctly.
> > > >> > > >> > > > > > >>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>> Best,
> > > >> > > >> > > > > > >>>>>>>>> Vino
> > > >> > > >> > > > > > >>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>> Hequn Cheng <chenghe...@gmail.com> wrote on Mon, Jun 17, 2019 at 4:18 PM:
> > > >> > > >> > > > > > >>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>> Hi Vino,
> > > >> > > >> > > > > > >>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>> Thanks for the proposal, I think it is a very good feature!
> > > >> > > >> > > > > > >>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>> One thing I want to make sure of is the semantics of `localKeyBy`.
> > > >> > > >> > > > > > >>>>>>>>>> From the document, the `localKeyBy` API returns an instance of
> > > >> > > >> > > > > > >>>>>>>>>> `KeyedStream`, which can also perform sum(), so in this case, what
> > > >> > > >> > > > > > >>>>>>>>>> are the semantics of `localKeyBy()`? For example, will the following
> > > >> > > >> > > > > > >>>>>>>>>> code produce the same result, and what are the differences between
> > > >> > > >> > > > > > >>>>>>>>>> them?
> > > >> > > >> > > > > > >>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>> 1. input.keyBy(0).sum(1)
> > > >> > > >> > > > > > >>>>>>>>>> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> > > >> > > >> > > > > > >>>>>>>>>> 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> > > >> > > >> > > > > > >>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>> It would also be great if we could add this to the document. Thank
> > > >> > > >> > > > > > >>>>>>>>>> you very much.
> > > >> > > >> > > > > > >>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>> Best, Hequn
> > > >> > > >> > > > > > >>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>> On Fri, Jun 14, 2019 at 11:34 AM vino yang <yanghua1...@gmail.com> wrote:
> > > >> > > >> > > > > > >>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>> Hi Aljoscha,
> > > >> > > >> > > > > > >>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>> I have looked at the "*Process*" section of the FLIP wiki page [1].
> > > >> > > >> > > > > > >>>>>>>>>>> This mail thread indicates that it has proceeded to the third step.
> > > >> > > >> > > > > > >>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>> When I looked at the fourth step (the vote step), I didn't find the
> > > >> > > >> > > > > > >>>>>>>>>>> prerequisites for starting the voting process.
> > > >> > > >> > > > > > >>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>> The discussion of this feature has already taken place in the old
> > > >> > > >> > > > > > >>>>>>>>>>> thread [2]. So can you tell me when I should start voting? Can I
> > > >> > > >> > > > > > >>>>>>>>>>> start now?
> > > >> > > >> > > > > > >>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>> Best,
> > > >> > > >> > > > > > >>>>>>>>>>> Vino
> > > >> > > >> > > > > > >>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>> [1]: https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
> > > >> > > >> > > > > > >>>>>>>>>>> [2]: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > > >> > > >> > > > > > >>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>> leesf <leesf0...@gmail.com> wrote on Thu, Jun 13, 2019 at 9:19 AM:
> > > >> > > >> > > > > > >>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>> +1 for the FLIP. Thanks, Vino, for your efforts.
> > > >> > > >> > > > > > >>>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>> Best,
> > > >> > > >> > > > > > >>>>>>>>>>>> Leesf
> > > >> > > >> > > > > > >>>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>> vino yang <yanghua1...@gmail.com> wrote on Wed, Jun 12, 2019 at 5:46 PM:
> > > >> > > >> > > > > > >>>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>>> Hi folks,
> > > >> > > >> > > > > > >>>>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>>> I would like to start the FLIP discussion thread about supporting
> > > >> > > >> > > > > > >>>>>>>>>>>>> local aggregation in Flink.
> > > >> > > >> > > > > > >>>>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>>> In short, this feature can effectively alleviate data skew. This is
> > > >> > > >> > > > > > >>>>>>>>>>>>> the FLIP:
> > > >> > > >> > > > > > >>>>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink
> > > >> > > >> > > > > > >>>>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>>> *Motivation* (copied from FLIP)
> > > >> > > >> > > > > > >>>>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>>> Currently, keyed streams are widely used to perform aggregating
> > > >> > > >> > > > > > >>>>>>>>>>>>> operations (e.g., reduce, sum and window) on the elements that have
> > > >> > > >> > > > > > >>>>>>>>>>>>> the same key. When executed at runtime, the elements with the same
> > > >> > > >> > > > > > >>>>>>>>>>>>> key will be sent to and aggregated by the same task.
> > > >> > > >> > > > > > >>>>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>>> The performance of these aggregating operations is very sensitive to
> > > >> > > >> > > > > > >>>>>>>>>>>>> the distribution of keys. In cases where the distribution of keys
> > > >> > > >> > > > > > >>>>>>>>>>>>> follows a power law, the performance will be significantly degraded.
> > > >> > > >> > > > > > >>>>>>>>>>>>> Worse, increasing the degree of parallelism does not help when a
> > > >> > > >> > > > > > >>>>>>>>>>>>> task is overloaded by a single key.
> > > >> > > >> > > > > > >>>>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>>> Local aggregation is a widely adopted method to reduce the
> > > >> > > >> > > > > > >>>>>>>>>>>>> performance degradation caused by data skew. We can decompose the
> > > >> > > >> > > > > > >>>>>>>>>>>>> aggregating operations into two phases. In the first phase, we
> > > >> > > >> > > > > > >>>>>>>>>>>>> aggregate the elements of the same key at the sender side to obtain
> > > >> > > >> > > > > > >>>>>>>>>>>>> partial results. Then, in the second phase, these partial results
> > > >> > > >> > > > > > >>>>>>>>>>>>> are sent to receivers according to their keys and are combined to
> > > >> > > >> > > > > > >>>>>>>>>>>>> obtain the final result. Since the number of partial results
> > > >> > > >> > > > > > >>>>>>>>>>>>> received by each receiver is limited by the number of senders, the
> > > >> > > >> > > > > > >>>>>>>>>>>>> imbalance among receivers can be reduced. Besides, by reducing the
> > > >> > > >> > > > > > >>>>>>>>>>>>> amount of transferred data, the performance can be further improved.
> > > >> > > >> > > > > > >>>>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>>> *More details*:
> > > >> > > >> > > > > > >>>>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>>> Design documentation:
> > > >> > > >> > > > > > >>>>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> > > >> > > >> > > > > > >>>>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>>> Old discussion thread:
> > > >> > > >> > > > > > >>>>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > > >> > > >> > > > > > >>>>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>>> JIRA: FLINK-12786 <https://issues.apache.org/jira/browse/FLINK-12786>
> > > >> > > >> > > > > > >>>>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>>> We are looking forward to your feedback!
> > > >> > > >> > > > > > >>>>>>>>>>>>>
> > > >> > > >> > > > > > >>>>>>>>>>>>> Best,
> > > >> > > >> > > > > > >>>>>>>>>>>>> Vino
> > > >> > > >> > > > > > >>>>>>>>>>>>>
>
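
To make the three cases from the quoted thread concrete, here is a small
plain-Java simulation. It is only a sketch under stated assumptions: it does
not use Flink at all, the class and method names (LocalAggSketch, rollingSum,
countWindowSum) are made up for illustration, and it models a single local
task that sees ten records (a, 1). The local stage feeds its output directly
into the global stage.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation, no Flink dependency. All names are hypothetical.
class LocalAggSketch {

    // A (key, value) pair, standing in for tuple fields 0 and 1.
    static final class Rec {
        final String key;
        final long value;

        Rec(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Rolling sum per key: emits the running total after every input record.
    static List<Rec> rollingSum(List<Rec> in) {
        Map<String, Long> totals = new HashMap<>();
        List<Rec> out = new ArrayList<>();
        for (Rec r : in) {
            long total = totals.merge(r.key, r.value, Long::sum);
            out.add(new Rec(r.key, total));
        }
        return out;
    }

    // Count window per key: emits one partial sum per `size` records and then
    // starts over. Records in an unfilled window are not emitted in this sketch.
    static List<Rec> countWindowSum(List<Rec> in, int size) {
        Map<String, long[]> windows = new HashMap<>(); // key -> {count, sum}
        List<Rec> out = new ArrayList<>();
        for (Rec r : in) {
            long[] w = windows.computeIfAbsent(r.key, k -> new long[2]);
            w[0] += 1;
            w[1] += r.value;
            if (w[0] == size) {
                out.add(new Rec(r.key, w[1]));
                windows.remove(r.key);
            }
        }
        return out;
    }

    // Ten records (a, 1), as seen by a single local task.
    static List<Rec> sampleInput() {
        List<Rec> input = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            input.add(new Rec("a", 1L));
        }
        return input;
    }

    // Value of the last record emitted, i.e. the latest global result.
    static long lastValue(List<Rec> out) {
        return out.get(out.size() - 1).value;
    }

    public static void main(String[] args) {
        List<Rec> input = sampleInput();
        // Case 1: keyBy + sum.
        System.out.println("case 1: " + lastValue(rollingSum(input)));                    // 10
        // Case 2: local rolling sum, then global rolling sum over its outputs.
        System.out.println("case 2: " + lastValue(rollingSum(rollingSum(input))));        // 55
        // Case 3: local count window of 5, then global rolling sum.
        System.out.println("case 3: " + lastValue(rollingSum(countWindowSum(input, 5)))); // 10
    }
}
```

In this sketch, case 2 ends at 55 because the global stage adds up the running
totals 1, 2, ..., 10 instead of the raw values, while case 3 ends at 10, the
same as case 1, but only after each window of 5 records has filled.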
