Hi vino,

One thing to add,  for a), I think use one or two examples like how to do
local aggregation on a sliding window,
and how do we do local aggregation on an unbounded aggregate, will do a lot
help.

Best,
Kurt


On Mon, Jun 24, 2019 at 6:06 PM Kurt Young <ykt...@gmail.com> wrote:

> Hi vino,
>
> I think there are several things still need discussion.
>
> a) We all agree that we should first go with a unified abstraction, but
> the abstraction is not reflected by the FLIP.
> If your answer is "locakKeyBy" API, then I would ask how do we combine
> with `AggregateFunction`, and how do
> we do proper local aggregation for those have different intermediate
> result type, like AVG. Could you add these
> to the document?
>
> b) From implementation side, reusing window operator is one of the
> possible solutions, but not we base on window
> operator to have two different implementations. What I understanding is,
> one of the possible implementations should
> not touch window operator.
>
> c) 80% of your FLIP content is actually describing how do we support local
> keyed state. I don't know if this is necessary
> to introduce at the first step and we should also involve committers work
> on state backend to share their thoughts.
>
> Best,
> Kurt
>
>
> On Mon, Jun 24, 2019 at 5:17 PM vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Kurt,
>>
>> You did not give more further different opinions, so I thought you have
>> agreed with the design after we promised to support two kinds of
>> implementation.
>>
>> In API level, we have answered your question about pass an
>> AggregateFunction to do the aggregation. No matter introduce localKeyBy
>> API
>> or not, we can support AggregateFunction.
>>
>> So what's your different opinion now? Can you share it with us?
>>
>> Best,
>> Vino
>>
>> Kurt Young <ykt...@gmail.com> 于2019年6月24日周一 下午4:24写道:
>>
>> > Hi vino,
>> >
>> > Sorry I don't see the consensus about reusing window operator and keep
>> the
>> > API design of localKeyBy. But I think we should definitely more thoughts
>> > about this topic.
>> >
>> > I also try to loop in Stephan for this discussion.
>> >
>> > Best,
>> > Kurt
>> >
>> >
>> > On Mon, Jun 24, 2019 at 3:26 PM vino yang <yanghua1...@gmail.com>
>> wrote:
>> >
>> > > Hi all,
>> > >
>> > > I am happy we have a wonderful discussion and received many valuable
>> > > opinions in the last few days.
>> > >
>> > > Now, let me try to summarize what we have reached consensus about the
>> > > changes in the design.
>> > >
>> > >    - provide a unified abstraction to support two kinds of
>> > implementation;
>> > >    - reuse WindowOperator and try to enhance it so that we can make
>> the
>> > >    intermediate result of the local aggregation can be buffered and
>> > > flushed to
>> > >    support two kinds of implementation;
>> > >    - keep the API design of localKeyBy, but declare the disabled some
>> > APIs
>> > >    we cannot support currently, and provide a configurable API for
>> users
>> > to
>> > >    choose how to handle intermediate result;
>> > >
>> > > The above three points have been updated in the design doc. Any
>> > > questions, please let me know.
>> > >
>> > > @Aljoscha Krettek <aljos...@apache.org> What do you think? Any
>> further
>> > > comments?
>> > >
>> > > Best,
>> > > Vino
>> > >
>> > > vino yang <yanghua1...@gmail.com> 于2019年6月20日周四 下午2:02写道:
>> > >
>> > > > Hi Kurt,
>> > > >
>> > > > Thanks for your comments.
>> > > >
>> > > > It seems we come to a consensus that we should alleviate the
>> > performance
>> > > > degraded by data skew with local aggregation. In this FLIP, our key
>> > > > solution is to introduce local keyed partition to achieve this goal.
>> > > >
>> > > > I also agree that we can benefit a lot from the usage of
>> > > > AggregateFunction. In combination with localKeyBy, We can easily
>> use it
>> > > to
>> > > > achieve local aggregation:
>> > > >
>> > > >    - input.localKeyBy(0).aggregate()
>> > > >    - input.localKeyBy(0).window().aggregate()
>> > > >
>> > > >
>> > > > I think the only problem here is the choices between
>> > > >
>> > > >    - (1) Introducing a new primitive called localKeyBy and implement
>> > > >    local aggregation with existing operators, or
>> > > >    - (2) Introducing an operator called localAggregation which is
>> > > >    composed of a key selector, a window-like operator, and an
>> aggregate
>> > > >    function.
>> > > >
>> > > >
>> > > > There may exist some optimization opportunities by providing a
>> > composited
>> > > > interface for local aggregation. But at the same time, in my
>> opinion,
>> > we
>> > > > lose flexibility (Or we need certain efforts to achieve the same
>> > > > flexibility).
>> > > >
>> > > > As said in the previous mails, we have many use cases where the
>> > > > aggregation is very complicated and cannot be performed with
>> > > > AggregateFunction. For example, users may perform windowed
>> aggregations
>> > > > according to time, data values, or even external storage. Typically,
>> > they
>> > > > now use KeyedProcessFunction or customized triggers to implement
>> these
>> > > > aggregations. It's not easy to address data skew in such cases with
>> a
>> > > > composited interface for local aggregation.
>> > > >
>> > > > Given that Data Stream API is exactly targeted at these cases where
>> the
>> > > > application logic is very complicated and optimization does not
>> > matter, I
>> > > > think it's a better choice to provide a relatively low-level and
>> > > canonical
>> > > > interface.
>> > > >
>> > > > The composited interface, on the other side, may be a good choice in
>> > > > declarative interfaces, including SQL and Table API, as it allows
>> more
>> > > > optimization opportunities.
>> > > >
>> > > > Best,
>> > > > Vino
>> > > >
>> > > >
>> > > > Kurt Young <ykt...@gmail.com> 于2019年6月20日周四 上午10:15写道:
>> > > >
>> > > >> Hi all,
>> > > >>
>> > > >> As vino said in previous emails, I think we should first discuss
>> and
>> > > >> decide
>> > > >> what kind of use cases this FLIP want to
>> > > >> resolve, and what the API should look like. From my side, I think
>> this
>> > > is
>> > > >> probably the root cause of current divergence.
>> > > >>
>> > > >> My understand is (from the FLIP title and motivation section of the
>> > > >> document), we want to have a proper support of
>> > > >> local aggregation, or pre aggregation. This is not a very new idea,
>> > most
>> > > >> SQL engine already did this improvement. And
>> > > >> the core concept about this is, there should be an
>> AggregateFunction,
>> > no
>> > > >> matter it's a Flink runtime's AggregateFunction or
>> > > >> SQL's UserDefinedAggregateFunction. Both aggregation have concept
>> of
>> > > >> intermediate data type, sometimes we call it ACC.
>> > > >> I quickly went through the POC piotr did before [1], it also
>> directly
>> > > uses
>> > > >> AggregateFunction.
>> > > >>
>> > > >> But the thing is, after reading the design of this FLIP, I can't
>> help
>> > > >> myself feeling that this FLIP is not targeting to have a proper
>> > > >> local aggregation support. It actually want to introduce another
>> > > concept:
>> > > >> LocalKeyBy, and how to split and merge local key groups,
>> > > >> and how to properly support state on local key. Local aggregation
>> just
>> > > >> happened to be one possible use case of LocalKeyBy.
>> > > >> But it lacks supporting the essential concept of local aggregation,
>> > > which
>> > > >> is intermediate data type. Without this, I really don't thing
>> > > >> it is a good fit of local aggregation.
>> > > >>
>> > > >> Here I want to make sure of the scope or the goal about this FLIP,
>> do
>> > we
>> > > >> want to have a proper local aggregation engine, or we
>> > > >> just want to introduce a new concept called LocalKeyBy?
>> > > >>
>> > > >> [1]: https://github.com/apache/flink/pull/4626
>> > > >>
>> > > >> Best,
>> > > >> Kurt
>> > > >>
>> > > >>
>> > > >> On Wed, Jun 19, 2019 at 5:13 PM vino yang <yanghua1...@gmail.com>
>> > > wrote:
>> > > >>
>> > > >> > Hi Hequn,
>> > > >> >
>> > > >> > Thanks for your comments!
>> > > >> >
>> > > >> > I agree that allowing local aggregation reusing window API and
>> > > refining
>> > > >> > window operator to make it match both requirements (come from our
>> > and
>> > > >> Kurt)
>> > > >> > is a good decision!
>> > > >> >
>> > > >> > Concerning your questions:
>> > > >> >
>> > > >> > 1. The result of input.localKeyBy(0).sum(1).keyBy(0).sum(1) may
>> be
>> > > >> > meaningless.
>> > > >> >
>> > > >> > Yes, it does not make sense in most cases. However, I also want
>> to
>> > > note
>> > > >> > users should know the right semantics of localKeyBy and use it
>> > > >> correctly.
>> > > >> > Because this issue also exists for the global keyBy, consider
>> this
>> > > >> example:
>> > > >> > input.keyBy(0).sum(1).keyBy(0).sum(1), the result is also
>> > meaningless.
>> > > >> >
>> > > >> > 2. About the semantics of
>> > > >> > input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)).
>> > > >> >
>> > > >> > Good catch! I agree with you that it's not good to enable all
>> > > >> > functionalities for localKeyBy from KeyedStream.
>> > > >> > Currently, We do not support some APIs such as
>> > > >> > connect/join/intervalJoin/coGroup. This is due to that we force
>> the
>> > > >> > operators on LocalKeyedStreams chained with the inputs.
>> > > >> >
>> > > >> > Best,
>> > > >> > Vino
>> > > >> >
>> > > >> >
>> > > >> > Hequn Cheng <chenghe...@gmail.com> 于2019年6月19日周三 下午3:42写道:
>> > > >> >
>> > > >> > > Hi,
>> > > >> > >
>> > > >> > > Thanks a lot for your great discussion and great to see that
>> some
>> > > >> > agreement
>> > > >> > > has been reached on the "local aggregate engine"!
>> > > >> > >
>> > > >> > > ===> Considering the abstract engine,
>> > > >> > > I'm thinking is it valuable for us to extend the current
>> window to
>> > > >> meet
>> > > >> > > both demands raised by Kurt and Vino? There are some benefits
>> we
>> > can
>> > > >> get:
>> > > >> > >
>> > > >> > > 1. The interfaces of the window are complete and clear. With
>> > > windows,
>> > > >> we
>> > > >> > > can define a lot of ways to split the data and perform
>> different
>> > > >> > > computations.
>> > > >> > > 2. We can also leverage the window to do miniBatch for the
>> global
>> > > >> > > aggregation, i.e, we can use the window to bundle data belong
>> to
>> > the
>> > > >> same
>> > > >> > > key, for every bundle we only need to read and write once
>> state.
>> > > This
>> > > >> can
>> > > >> > > greatly reduce state IO and improve performance.
>> > > >> > > 3. A lot of other use cases can also benefit from the window
>> base
>> > on
>> > > >> > memory
>> > > >> > > or stateless.
>> > > >> > >
>> > > >> > > ===> As for the API,
>> > > >> > > I think it is good to make our API more flexible. However, we
>> may
>> > > >> need to
>> > > >> > > make our API meaningful.
>> > > >> > >
>> > > >> > > Take my previous reply as an example,
>> > > >> > > input.localKeyBy(0).sum(1).keyBy(0).sum(1). The result may be
>> > > >> > meaningless.
>> > > >> > > Another example I find is the intervalJoin, e.g.,
>> > > >> > > input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)). In
>> this
>> > > >> case, it
>> > > >> > > will bring problems if input1 and input2 share different
>> > > parallelism.
>> > > >> We
>> > > >> > > don't know which input should the join chained with? Even if
>> they
>> > > >> share
>> > > >> > the
>> > > >> > > same parallelism, it's hard to tell what the join is doing.
>> There
>> > > are
>> > > >> > maybe
>> > > >> > > some other problems.
>> > > >> > >
>> > > >> > > From this point of view, it's at least not good to enable all
>> > > >> > > functionalities for localKeyBy from KeyedStream?
>> > > >> > >
>> > > >> > > Great to also have your opinions.
>> > > >> > >
>> > > >> > > Best, Hequn
>> > > >> > >
>> > > >> > >
>> > > >> > >
>> > > >> > >
>> > > >> > > On Wed, Jun 19, 2019 at 10:24 AM vino yang <
>> yanghua1...@gmail.com
>> > >
>> > > >> > wrote:
>> > > >> > >
>> > > >> > > > Hi Kurt and Piotrek,
>> > > >> > > >
>> > > >> > > > Thanks for your comments.
>> > > >> > > >
>> > > >> > > > I agree that we can provide a better abstraction to be
>> > compatible
>> > > >> with
>> > > >> > > two
>> > > >> > > > different implementations.
>> > > >> > > >
>> > > >> > > > First of all, I think we should consider what kind of
>> scenarios
>> > we
>> > > >> need
>> > > >> > > to
>> > > >> > > > support in *API* level?
>> > > >> > > >
>> > > >> > > > We have some use cases which need to a customized aggregation
>> > > >> through
>> > > >> > > > KeyedProcessFunction, (in the usage of our localKeyBy.window
>> > they
>> > > >> can
>> > > >> > use
>> > > >> > > > ProcessWindowFunction).
>> > > >> > > >
>> > > >> > > > Shall we support these flexible use scenarios?
>> > > >> > > >
>> > > >> > > > Best,
>> > > >> > > > Vino
>> > > >> > > >
>> > > >> > > > Kurt Young <ykt...@gmail.com> 于2019年6月18日周二 下午8:37写道:
>> > > >> > > >
>> > > >> > > > > Hi Piotr,
>> > > >> > > > >
>> > > >> > > > > Thanks for joining the discussion. Make “local aggregation"
>> > > >> abstract
>> > > >> > > > enough
>> > > >> > > > > sounds good to me, we could
>> > > >> > > > > implement and verify alternative solutions for use cases of
>> > > local
>> > > >> > > > > aggregation. Maybe we will find both solutions
>> > > >> > > > > are appropriate for different scenarios.
>> > > >> > > > >
>> > > >> > > > > Starting from a simple one sounds a practical way to go.
>> What
>> > do
>> > > >> you
>> > > >> > > > think,
>> > > >> > > > > vino?
>> > > >> > > > >
>> > > >> > > > > Best,
>> > > >> > > > > Kurt
>> > > >> > > > >
>> > > >> > > > >
>> > > >> > > > > On Tue, Jun 18, 2019 at 8:10 PM Piotr Nowojski <
>> > > >> pi...@ververica.com>
>> > > >> > > > > wrote:
>> > > >> > > > >
>> > > >> > > > > > Hi Kurt and Vino,
>> > > >> > > > > >
>> > > >> > > > > > I think there is a trade of hat we need to consider for
>> the
>> > > >> local
>> > > >> > > > > > aggregation.
>> > > >> > > > > >
>> > > >> > > > > > Generally speaking I would agree with Kurt about local
>> > > >> > > aggregation/pre
>> > > >> > > > > > aggregation not using Flink's state flush the operator
>> on a
>> > > >> > > checkpoint.
>> > > >> > > > > > Network IO is usually cheaper compared to Disks IO. This
>> has
>> > > >> > however
>> > > >> > > > > couple
>> > > >> > > > > > of issues:
>> > > >> > > > > > 1. It can explode number of in-flight records during
>> > > checkpoint
>> > > >> > > barrier
>> > > >> > > > > > alignment, making checkpointing slower and decrease the
>> > actual
>> > > >> > > > > throughput.
>> > > >> > > > > > 2. This trades Disks IO on the local aggregation machine
>> > with
>> > > >> CPU
>> > > >> > > (and
>> > > >> > > > > > Disks IO in case of RocksDB) on the final aggregation
>> > machine.
>> > > >> This
>> > > >> > > is
>> > > >> > > > > > fine, as long there is no huge data skew. If there is
>> only a
>> > > >> > handful
>> > > >> > > > (or
>> > > >> > > > > > even one single) hot keys, it might be better to keep the
>> > > >> > persistent
>> > > >> > > > > state
>> > > >> > > > > > in the LocalAggregationOperator to offload final
>> aggregation
>> > > as
>> > > >> > much
>> > > >> > > as
>> > > >> > > > > > possible.
>> > > >> > > > > > 3. With frequent checkpointing local aggregation
>> > effectiveness
>> > > >> > would
>> > > >> > > > > > degrade.
>> > > >> > > > > >
>> > > >> > > > > > I assume Kurt is correct, that in your use cases
>> stateless
>> > > >> operator
>> > > >> > > was
>> > > >> > > > > > behaving better, but I could easily see other use cases
>> as
>> > > well.
>> > > >> > For
>> > > >> > > > > > example someone is already using RocksDB, and his job is
>> > > >> > bottlenecked
>> > > >> > > > on
>> > > >> > > > > a
>> > > >> > > > > > single window operator instance because of the data
>> skew. In
>> > > >> that
>> > > >> > > case
>> > > >> > > > > > stateful local aggregation would be probably a better
>> > choice.
>> > > >> > > > > >
>> > > >> > > > > > Because of that, I think we should eventually provide
>> both
>> > > >> versions
>> > > >> > > and
>> > > >> > > > > in
>> > > >> > > > > > the initial version we should at least make the “local
>> > > >> aggregation
>> > > >> > > > > engine”
>> > > >> > > > > > abstract enough, that one could easily provide different
>> > > >> > > implementation
>> > > >> > > > > > strategy.
>> > > >> > > > > >
>> > > >> > > > > > Piotrek
>> > > >> > > > > >
>> > > >> > > > > > > On 18 Jun 2019, at 11:46, Kurt Young <ykt...@gmail.com
>> >
>> > > >> wrote:
>> > > >> > > > > > >
>> > > >> > > > > > > Hi,
>> > > >> > > > > > >
>> > > >> > > > > > > For the trigger, it depends on what operator we want to
>> > use
>> > > >> under
>> > > >> > > the
>> > > >> > > > > > API.
>> > > >> > > > > > > If we choose to use window operator,
>> > > >> > > > > > > we should also use window's trigger. However, I also
>> think
>> > > >> reuse
>> > > >> > > > window
>> > > >> > > > > > > operator for this scenario may not be
>> > > >> > > > > > > the best choice. The reasons are the following:
>> > > >> > > > > > >
>> > > >> > > > > > > 1. As a lot of people already pointed out, window
>> relies
>> > > >> heavily
>> > > >> > on
>> > > >> > > > > state
>> > > >> > > > > > > and it will definitely effect performance. You can
>> > > >> > > > > > > argue that one can use heap based statebackend, but
>> this
>> > > will
>> > > >> > > > introduce
>> > > >> > > > > > > extra coupling. Especially we have a chance to
>> > > >> > > > > > > design a pure stateless operator.
>> > > >> > > > > > > 2. The window operator is *the most* complicated
>> operator
>> > > >> Flink
>> > > >> > > > > currently
>> > > >> > > > > > > have. Maybe we only need to pick a subset of
>> > > >> > > > > > > window operator to achieve the goal, but once the user
>> > wants
>> > > >> to
>> > > >> > > have
>> > > >> > > > a
>> > > >> > > > > > deep
>> > > >> > > > > > > look at the localAggregation operator, it's still
>> > > >> > > > > > > hard to find out what's going on under the window
>> > operator.
>> > > >> For
>> > > >> > > > > > simplicity,
>> > > >> > > > > > > I would also recommend we introduce a dedicated
>> > > >> > > > > > > lightweight operator, which also much easier for a
>> user to
>> > > >> learn
>> > > >> > > and
>> > > >> > > > > use.
>> > > >> > > > > > >
>> > > >> > > > > > > For your question about increasing the burden in
>> > > >> > > > > > > `StreamOperator::prepareSnapshotPreBarrier()`, the only
>> > > thing
>> > > >> > this
>> > > >> > > > > > function
>> > > >> > > > > > > need
>> > > >> > > > > > > to do is output all the partial results, it's purely
>> cpu
>> > > >> > workload,
>> > > >> > > > not
>> > > >> > > > > > > introducing any IO. I want to point out that even if we
>> > have
>> > > >> this
>> > > >> > > > > > > cost, we reduced another barrier align cost of the
>> > operator,
>> > > >> > which
>> > > >> > > is
>> > > >> > > > > the
>> > > >> > > > > > > sync flush stage of the state, if you introduced state.
>> > This
>> > > >> > > > > > > flush actually will introduce disk IO, and I think it's
>> > > >> worthy to
>> > > >> > > > > > exchange
>> > > >> > > > > > > this cost with purely CPU workload. And we do have some
>> > > >> > > > > > > observations about these two behavior (as i said
>> before,
>> > we
>> > > >> > > actually
>> > > >> > > > > > > implemented both solutions), the stateless one actually
>> > > >> performs
>> > > >> > > > > > > better both in performance and barrier align time.
>> > > >> > > > > > >
>> > > >> > > > > > > Best,
>> > > >> > > > > > > Kurt
>> > > >> > > > > > >
>> > > >> > > > > > >
>> > > >> > > > > > > On Tue, Jun 18, 2019 at 3:40 PM vino yang <
>> > > >> yanghua1...@gmail.com
>> > > >> > >
>> > > >> > > > > wrote:
>> > > >> > > > > > >
>> > > >> > > > > > >> Hi Kurt,
>> > > >> > > > > > >>
>> > > >> > > > > > >> Thanks for your example. Now, it looks more clearly
>> for
>> > me.
>> > > >> > > > > > >>
>> > > >> > > > > > >> From your example code snippet, I saw the
>> localAggregate
>> > > API
>> > > >> has
>> > > >> > > > three
>> > > >> > > > > > >> parameters:
>> > > >> > > > > > >>
>> > > >> > > > > > >>   1. key field
>> > > >> > > > > > >>   2. PartitionAvg
>> > > >> > > > > > >>   3. CountTrigger: Does this trigger comes from window
>> > > >> package?
>> > > >> > > > > > >>
>> > > >> > > > > > >> I will compare our and your design from API and
>> operator
>> > > >> level:
>> > > >> > > > > > >>
>> > > >> > > > > > >> *From the API level:*
>> > > >> > > > > > >>
>> > > >> > > > > > >> As I replied to @dianfu in the old email thread,[1]
>> the
>> > > >> Window
>> > > >> > API
>> > > >> > > > can
>> > > >> > > > > > >> provide the second and the third parameter right now.
>> > > >> > > > > > >>
>> > > >> > > > > > >> If you reuse specified interface or class, such as
>> > > *Trigger*
>> > > >> or
>> > > >> > > > > > >> *CounterTrigger* provided by window package, but do
>> not
>> > use
>> > > >> > window
>> > > >> > > > > API,
>> > > >> > > > > > >> it's not reasonable.
>> > > >> > > > > > >> And if you do not reuse these interface or class, you
>> > would
>> > > >> need
>> > > >> > > to
>> > > >> > > > > > >> introduce more things however they are looked similar
>> to
>> > > the
>> > > >> > > things
>> > > >> > > > > > >> provided by window package.
>> > > >> > > > > > >>
>> > > >> > > > > > >> The window package has provided several types of the
>> > window
>> > > >> and
>> > > >> > > many
>> > > >> > > > > > >> triggers and let users customize it. What's more, the
>> > user
>> > > is
>> > > >> > more
>> > > >> > > > > > familiar
>> > > >> > > > > > >> with Window API.
>> > > >> > > > > > >>
>> > > >> > > > > > >> This is the reason why we just provide localKeyBy API
>> and
>> > > >> reuse
>> > > >> > > the
>> > > >> > > > > > window
>> > > >> > > > > > >> API. It reduces unnecessary components such as
>> triggers
>> > and
>> > > >> the
>> > > >> > > > > > mechanism
>> > > >> > > > > > >> of buffer (based on count num or time).
>> > > >> > > > > > >> And it has a clear and easy to understand semantics.
>> > > >> > > > > > >>
>> > > >> > > > > > >> *From the operator level:*
>> > > >> > > > > > >>
>> > > >> > > > > > >> We reused window operator, so we can get all the
>> benefits
>> > > >> from
>> > > >> > > state
>> > > >> > > > > and
>> > > >> > > > > > >> checkpoint.
>> > > >> > > > > > >>
>> > > >> > > > > > >> From your design, you named the operator under
>> > > localAggregate
>> > > >> > API
>> > > >> > > > is a
>> > > >> > > > > > >> *stateless* operator. IMO, it is still a state, it is
>> > just
>> > > >> not
>> > > >> > > Flink
>> > > >> > > > > > >> managed state.
>> > > >> > > > > > >> About the memory buffer (I think it's still not very
>> > clear,
>> > > >> if
>> > > >> > you
>> > > >> > > > > have
>> > > >> > > > > > >> time, can you give more detail information or answer
>> my
>> > > >> > > questions),
>> > > >> > > > I
>> > > >> > > > > > have
>> > > >> > > > > > >> some questions:
>> > > >> > > > > > >>
>> > > >> > > > > > >>   - if it just a raw JVM heap memory buffer, how to
>> > support
>> > > >> > fault
>> > > >> > > > > > >>   tolerance, if the job is configured EXACTLY-ONCE
>> > semantic
>> > > >> > > > guarantee?
>> > > >> > > > > > >>   - if you thought the memory buffer(non-Flink state),
>> > has
>> > > >> > better
>> > > >> > > > > > >>   performance. In our design, users can also config
>> HEAP
>> > > >> state
>> > > >> > > > backend
>> > > >> > > > > > to
>> > > >> > > > > > >>   provide the performance close to your mechanism.
>> > > >> > > > > > >>   - `StreamOperator::prepareSnapshotPreBarrier()`
>> related
>> > > to
>> > > >> the
>> > > >> > > > > timing
>> > > >> > > > > > of
>> > > >> > > > > > >>   snapshot. IMO, the flush action should be a
>> > synchronized
>> > > >> > action?
>> > > >> > > > (if
>> > > >> > > > > > >> not,
>> > > >> > > > > > >>   please point out my mistake) I still think we should
>> > not
>> > > >> > depend
>> > > >> > > on
>> > > >> > > > > the
>> > > >> > > > > > >>   timing of checkpoint. Checkpoint related operations
>> are
>> > > >> > inherent
>> > > >> > > > > > >>   performance sensitive, we should not increase its
>> > burden
>> > > >> > > anymore.
>> > > >> > > > > Our
>> > > >> > > > > > >>   implementation based on the mechanism of Flink's
>> > > >> checkpoint,
>> > > >> > > which
>> > > >> > > > > can
>> > > >> > > > > > >>   benefit from the asnyc snapshot and incremental
>> > > checkpoint.
>> > > >> > IMO,
>> > > >> > > > the
>> > > >> > > > > > >>   performance is not a problem, and we also do not
>> find
>> > the
>> > > >> > > > > performance
>> > > >> > > > > > >> issue
>> > > >> > > > > > >>   in our production.
>> > > >> > > > > > >>
>> > > >> > > > > > >> [1]:
>> > > >> > > > > > >>
>> > > >> > > > > > >>
>> > > >> > > > > >
>> > > >> > > > >
>> > > >> > > >
>> > > >> > >
>> > > >> >
>> > > >>
>> > >
>> >
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
>> > > >> > > > > > >>
>> > > >> > > > > > >> Best,
>> > > >> > > > > > >> Vino
>> > > >> > > > > > >>
>> > > >> > > > > > >> Kurt Young <ykt...@gmail.com> 于2019年6月18日周二 下午2:27写道:
>> > > >> > > > > > >>
>> > > >> > > > > > >>> Yeah, sorry for not expressing myself clearly. I will
>> > try
>> > > to
>> > > >> > > > provide
>> > > >> > > > > > more
>> > > >> > > > > > >>> details to make sure we are on the same page.
>> > > >> > > > > > >>>
>> > > >> > > > > > >>> For DataStream API, it shouldn't be optimized
>> > > automatically.
>> > > >> > You
>> > > >> > > > have
>> > > >> > > > > > to
>> > > >> > > > > > >>> explicitly call API to do local aggregation
>> > > >> > > > > > >>> as well as the trigger policy of the local
>> aggregation.
>> > > Take
>> > > >> > > > average
>> > > >> > > > > > for
>> > > >> > > > > > >>> example, the user program may look like this (just a
>> > > draft):
>> > > >> > > > > > >>>
>> > > >> > > > > > >>> assuming the input type is DataStream<Tupl2<String,
>> > Int>>
>> > > >> > > > > > >>>
>> > > >> > > > > > >>> ds.localAggregate(
>> > > >> > > > > > >>>        0,                                       //
>> The
>> > > local
>> > > >> > key,
>> > > >> > > > > which
>> > > >> > > > > > >> is
>> > > >> > > > > > >>> the String from Tuple2
>> > > >> > > > > > >>>        PartitionAvg(1),                 // The
>> partial
>> > > >> > > aggregation
>> > > >> > > > > > >>> function, produces Tuple2<Long, Int>, indicating sum
>> and
>> > > >> count
>> > > >> > > > > > >>>        CountTrigger.of(1000L)    // Trigger policy,
>> note
>> > > >> this
>> > > >> > > > should
>> > > >> > > > > be
>> > > >> > > > > > >>> best effort, and also be composited with time based
>> or
>> > > >> memory
>> > > >> > > size
>> > > >> > > > > > based
>> > > >> > > > > > >>> trigger
>> > > >> > > > > > >>>    )                                           // The
>> > > return
>> > > >> > type
>> > > >> > > > is
>> > > >> > > > > > >> local
>> > > >> > > > > > >>> aggregate Tuple2<String, Tupl2<Long, Int>>
>> > > >> > > > > > >>>    .keyBy(0)                             // Further
>> > keyby
>> > > it
>> > > >> > with
>> > > >> > > > > > >> required
>> > > >> > > > > > >>> key
>> > > >> > > > > > >>>    .aggregate(1)                      // This will
>> merge
>> > > all
>> > > >> > the
>> > > >> > > > > > partial
>> > > >> > > > > > >>> results and get the final average.
>> > > >> > > > > > >>>
>> > > >> > > > > > >>> (This is only a draft, only trying to explain what it
>> > > looks
>> > > >> > > like. )
>> > > >> > > > > > >>>
>> > > >> > > > > > >>> The local aggregate operator can be stateless, we can
>> > > keep a
>> > > >> > > memory
>> > > >> > > > > > >> buffer
>> > > >> > > > > > >>> or other efficient data structure to improve the
>> > aggregate
>> > > >> > > > > performance.
>> > > >> > > > > > >>>
>> > > >> > > > > > >>> Let me know if you have any other questions.
>> > > >> > > > > > >>>
>> > > >> > > > > > >>> Best,
>> > > >> > > > > > >>> Kurt
>> > > >> > > > > > >>>
>> > > >> > > > > > >>>
>> > > >> > > > > > >>> On Tue, Jun 18, 2019 at 1:29 PM vino yang <
>> > > >> > yanghua1...@gmail.com
>> > > >> > > >
>> > > >> > > > > > wrote:
>> > > >> > > > > > >>>
>> > > >> > > > > > >>>> Hi Kurt,
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> Thanks for your reply.
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> Actually, I am not against you to raise your design.
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> From your description before, I just can imagine
>> your
>> > > >> > high-level
>> > > >> > > > > > >>>> implementation is about SQL and the optimization is
>> > inner
>> > > >> of
>> > > >> > the
>> > > >> > > > > API.
>> > > >> > > > > > >> Is
>> > > >> > > > > > >>> it
>> > > >> > > > > > >>>> automatically? how to give the configuration option
>> > about
>> > > >> > > trigger
>> > > >> > > > > > >>>> pre-aggregation?
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> Maybe after I get more information, it sounds more
>> > > >> reasonable.
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> IMO, first of all, it would be better to make your
>> user
>> > > >> > > interface
>> > > >> > > > > > >>> concrete,
>> > > >> > > > > > >>>> it's the basis of the discussion.
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> For example, can you give an example code snippet to
>> > > >> introduce
>> > > >> > > how
>> > > >> > > > > to
>> > > >> > > > > > >>> help
>> > > >> > > > > > >>>> users to process data skew caused by the jobs which
>> > built
>> > > >> with
>> > > >> > > > > > >> DataStream
>> > > >> > > > > > >>>> API?
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> If you give more details we can discuss further
>> more. I
>> > > >> think
>> > > >> > if
>> > > >> > > > one
>> > > >> > > > > > >>> design
>> > > >> > > > > > >>>> introduces an exact interface and another does not.
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> The implementation has an obvious difference. For
>> > > example,
>> > > >> we
>> > > >> > > > > > introduce
>> > > >> > > > > > >>> an
>> > > >> > > > > > >>>> exact API in DataStream named localKeyBy, about the
>> > > >> > > > pre-aggregation
>> > > >> > > > > we
>> > > >> > > > > > >>> need
>> > > >> > > > > > >>>> to define the trigger mechanism of local
>> aggregation,
>> > so
>> > > we
>> > > >> > find
>> > > >> > > > > > reused
>> > > >> > > > > > >>>> window API and operator is a good choice. This is a
>> > > >> reasoning
>> > > >> > > link
>> > > >> > > > > > from
>> > > >> > > > > > >>>> design to implementation.
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> What do you think?
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> Best,
>> > > >> > > > > > >>>> Vino
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>> Kurt Young <ykt...@gmail.com> 于2019年6月18日周二
>> 上午11:58写道:
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>>> Hi Vino,
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>> Now I feel that we may have different
>> understandings
>> > > about
>> > > >> > what
>> > > >> > > > > kind
>> > > >> > > > > > >> of
>> > > >> > > > > > >>>>> problems or improvements you want to
>> > > >> > > > > > >>>>> resolve. Currently, most of the feedback are
>> focusing
>> > on
>> > > >> *how
>> > > >> > > to
>> > > >> > > > > do a
>> > > >> > > > > > >>>>> proper local aggregation to improve performance
>> > > >> > > > > > >>>>> and maybe solving the data skew issue*. And my gut
>> > > >> feeling is
>> > > >> > > > this
>> > > >> > > > > is
>> > > >> > > > > > >>>>> exactly what users want at the first place,
>> > > >> > > > > > >>>>> especially those +1s. (Sorry to try to summarize
>> here,
>> > > >> please
>> > > >> > > > > correct
>> > > >> > > > > > >>> me
>> > > >> > > > > > >>>> if
>> > > >> > > > > > >>>>> i'm wrong).
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>> But I still think the design is somehow diverged
>> from
>> > > the
>> > > >> > goal.
>> > > >> > > > If
>> > > >> > > > > we
>> > > >> > > > > > >>>> want
>> > > >> > > > > > >>>>> to have an efficient and powerful way to
>> > > >> > > > > > >>>>> have local aggregation, supporting intermedia
>> result
>> > > type
>> > > >> is
>> > > >> > > > > > >> essential
>> > > >> > > > > > >>>> IMO.
>> > > >> > > > > > >>>>> Both runtime's `AggregateFunction` and
>> > > >> > > > > > >>>>> SQL`s `UserDefinedAggregateFunction` have a proper
>> > > >> support of
>> > > >> > > > > > >>>> intermediate
>> > > >> > > > > > >>>>> result type and can do `merge` operation
>> > > >> > > > > > >>>>> on them.
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>> Now, we have a lightweight alternatives which
>> performs
>> > > >> well,
>> > > >> > > and
>> > > >> > > > > > >> have a
>> > > >> > > > > > >>>>> nice fit with the local aggregate requirements.
>> > > >> > > > > > >>>>> Mostly importantly,  it's much less complex because
>> > it's
>> > > >> > > > stateless.
>> > > >> > > > > > >> And
>> > > >> > > > > > >>>> it
>> > > >> > > > > > >>>>> can also achieve the similar multiple-aggregation
>> > > >> > > > > > >>>>> scenario.
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>> I still not convinced why we shouldn't consider it
>> as
>> > a
>> > > >> first
>> > > >> > > > step.
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>> Best,
>> > > >> > > > > > >>>>> Kurt
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>> On Tue, Jun 18, 2019 at 11:35 AM vino yang <
>> > > >> > > > yanghua1...@gmail.com>
>> > > >> > > > > > >>>> wrote:
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>>> Hi Kurt,
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>> Thanks for your comments.
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>> It seems we both implemented local aggregation
>> > feature
>> > > to
>> > > >> > > > optimize
>> > > >> > > > > > >>> the
>> > > >> > > > > > >>>>>> issue of data skew.
>> > > >> > > > > > >>>>>> However, IMHO, the API level of optimizing
>> revenue is
>> > > >> > > different.
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>> *Your optimization benefits from Flink SQL and
>> it's
>> > not
>> > > >> > user's
>> > > >> > > > > > >>>> faces.(If
>> > > >> > > > > > >>>>> I
>> > > >> > > > > > >>>>>> understand it incorrectly, please correct this.)*
>> > > >> > > > > > >>>>>> *Our implementation employs it as an optimization
>> > tool
>> > > >> API
>> > > >> > for
>> > > >> > > > > > >>>>> DataStream,
>> > > >> > > > > > >>>>>> it just like a local version of the keyBy API.*
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>> Based on this, I want to say support it as a
>> > DataStream
>> > > >> API
>> > > >> > > can
>> > > >> > > > > > >>> provide
>> > > >> > > > > > >>>>>> these advantages:
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>>   - The localKeyBy API has a clear semantic and
>> it's
>> > > >> > flexible
>> > > >> > > > not
>> > > >> > > > > > >>> only
>> > > >> > > > > > >>>>> for
>> > > >> > > > > > >>>>>>   processing data skew but also for implementing
>> some
>> > > >> user
>> > > >> > > > cases,
>> > > >> > > > > > >>> for
>> > > >> > > > > > >>>>>>   example, if we want to calculate the
>> multiple-level
>> > > >> > > > aggregation,
>> > > >> > > > > > >>> we
>> > > >> > > > > > >>>>> can
>> > > >> > > > > > >>>>>> do
>> > > >> > > > > > >>>>>>   multiple-level aggregation in the local
>> > aggregation:
>> > > >> > > > > > >>>>>>
>> > >  input.localKeyBy("a").sum(1).localKeyBy("b").window();
>> > > >> //
>> > > >> > > here
>> > > >> > > > > > >> "a"
>> > > >> > > > > > >>>> is
>> > > >> > > > > > >>>>> a
>> > > >> > > > > > >>>>>>   sub-category, while "b" is a category, here we
>> do
>> > not
>> > > >> need
>> > > >> > > to
>> > > >> > > > > > >>>> shuffle
>> > > >> > > > > > >>>>>> data
>> > > >> > > > > > >>>>>>   in the network.
>> > > >> > > > > > >>>>>>   - The users of DataStream API will benefit from
>> > this.
>> > > >> > > > Actually,
>> > > >> > > > > > >> we
>> > > >> > > > > > >>>>> have
>> > > >> > > > > > >>>>>>   a lot of scenes need to use DataStream API.
>> > > Currently,
>> > > >> > > > > > >> DataStream
>> > > >> > > > > > >>>> API
>> > > >> > > > > > >>>>> is
>> > > >> > > > > > >>>>>>   the cornerstone of the physical plan of Flink
>> SQL.
>> > > >> With a
>> > > >> > > > > > >>> localKeyBy
>> > > >> > > > > > >>>>>> API,
>> > > >> > > > > > >>>>>>   the optimization of SQL at least may use this
>> > > optimized
>> > > >> > API,
>> > > >> > > > > > >> this
>> > > >> > > > > > >>>> is a
>> > > >> > > > > > >>>>>>   further topic.
>> > > >> > > > > > >>>>>>   - Based on the window operator, our state would
>> > > benefit
>> > > >> > from
>> > > >> > > > > > >> Flink
>> > > >> > > > > > >>>>> State
>> > > >> > > > > > >>>>>>   and checkpoint, we do not need to worry about
>> OOM
>> > and
>> > > >> job
>> > > >> > > > > > >> failed.
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>> Now, about your questions:
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>> 1. About our design cannot change the data type
>> and
>> > > about
>> > > >> > the
>> > > >> > > > > > >>>>>> implementation of average:
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>> Just like my reply to Hequn, the localKeyBy is an
>> API
>> > > >> > provides
>> > > >> > > > to
>> > > >> > > > > > >> the
>> > > >> > > > > > >>>>> users
>> > > >> > > > > > >>>>>> who use DataStream API to build their jobs.
>> > > >> > > > > > >>>>>> Users should know its semantics and the difference
>> > with
>> > > >> > keyBy
>> > > >> > > > API,
>> > > >> > > > > > >> so
>> > > >> > > > > > >>>> if
>> > > >> > > > > > >>>>>> they want to the average aggregation, they should
>> > carry
>> > > >> > local
>> > > >> > > > sum
>> > > >> > > > > > >>>> result
>> > > >> > > > > > >>>>>> and local count result.
>> > > >> > > > > > >>>>>> I admit that it will be convenient to use keyBy
>> > > directly.
>> > > >> > But
>> > > >> > > we
>> > > >> > > > > > >> need
>> > > >> > > > > > >>>> to
>> > > >> > > > > > >>>>>> pay a little price when we get some benefits. I
>> think
>> > > >> this
>> > > >> > > price
>> > > >> > > > > is
>> > > >> > > > > > >>>>>> reasonable. Considering that the DataStream API
>> > itself
>> > > >> is a
>> > > >> > > > > > >> low-level
>> > > >> > > > > > >>>> API
>> > > >> > > > > > >>>>>> (at least for now).
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>> 2. About stateless operator and
>> > > >> > > > > > >>>>>> `StreamOperator::prepareSnapshotPreBarrier()`:
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>> Actually, I have discussed this opinion with
>> @dianfu
>> > in
>> > > >> the
>> > > >> > > old
>> > > >> > > > > > >> mail
>> > > >> > > > > > >>>>>> thread. I will copy my opinion from there:
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>>   - for your design, you still need somewhere to
>> give
>> > > the
>> > > >> > > users
>> > > >> > > > > > >>>>> configure
>> > > >> > > > > > >>>>>>   the trigger threshold (maybe memory
>> availability?),
>> > > >> this
>> > > >> > > > design
>> > > >> > > > > > >>>> cannot
>> > > >> > > > > > >>>>>>   guarantee a deterministic semantics (it will
>> bring
>> > > >> trouble
>> > > >> > > for
>> > > >> > > > > > >>>> testing
>> > > >> > > > > > >>>>>> and
>> > > >> > > > > > >>>>>>   debugging).
>> > > >> > > > > > >>>>>>   - if the implementation depends on the timing of
>> > > >> > checkpoint,
>> > > >> > > > it
>> > > >> > > > > > >>>> would
>> > > >> > > > > > >>>>>>   affect the checkpoint's progress, and the
>> buffered
>> > > data
>> > > >> > may
>> > > >> > > > > > >> cause
>> > > >> > > > > > >>>> OOM
>> > > >> > > > > > >>>>>>   issue. In addition, if the operator is
>> stateless,
>> > it
>> > > >> can
>> > > >> > not
>> > > >> > > > > > >>> provide
>> > > >> > > > > > >>>>>> fault
>> > > >> > > > > > >>>>>>   tolerance.
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>> Best,
>> > > >> > > > > > >>>>>> Vino
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>> Kurt Young <ykt...@gmail.com> 于2019年6月18日周二
>> > 上午9:22写道:
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>>> Hi Vino,
>> > > >> > > > > > >>>>>>>
>> > > >> > > > > > >>>>>>> Thanks for the proposal, I like the general idea
>> and
>> > > IMO
>> > > >> > it's
>> > > >> > > > > > >> very
>> > > >> > > > > > >>>>> useful
>> > > >> > > > > > >>>>>>> feature.
>> > > >> > > > > > >>>>>>> But after reading through the document, I feel
>> that
>> > we
>> > > >> may
>> > > >> > > over
>> > > >> > > > > > >>>> design
>> > > >> > > > > > >>>>>> the
>> > > >> > > > > > >>>>>>> required
>> > > >> > > > > > >>>>>>> operator for proper local aggregation. The main
>> > reason
>> > > >> is
>> > > >> > we
>> > > >> > > > want
>> > > >> > > > > > >>> to
>> > > >> > > > > > >>>>>> have a
>> > > >> > > > > > >>>>>>> clear definition and behavior about the "local
>> keyed
>> > > >> state"
>> > > >> > > > which
>> > > >> > > > > > >>> in
>> > > >> > > > > > >>>> my
>> > > >> > > > > > >>>>>>> opinion is not
>> > > >> > > > > > >>>>>>> necessary for local aggregation, at least for
>> start.
>> > > >> > > > > > >>>>>>>
>> > > >> > > > > > >>>>>>> Another issue I noticed is the local key by
>> operator
>> > > >> cannot
>> > > >> > > > > > >> change
>> > > >> > > > > > >>>>>> element
>> > > >> > > > > > >>>>>>> type, it will
>> > > >> > > > > > >>>>>>> also restrict a lot of use cases which can be
>> > benefit
>> > > >> from
>> > > >> > > > local
>> > > >> > > > > > >>>>>>> aggregation, like "average".
>> > > >> > > > > > >>>>>>>
>> > > >> > > > > > >>>>>>> We also did similar logic in SQL and the only
>> thing
>> > > >> need to
>> > > >> > > be
>> > > >> > > > > > >> done
>> > > >> > > > > > >>>> is
>> > > >> > > > > > >>>>>>> introduce
>> > > >> > > > > > >>>>>>> a stateless lightweight operator which is
>> *chained*
>> > > >> before
>> > > >> > > > > > >>> `keyby()`.
>> > > >> > > > > > >>>>> The
>> > > >> > > > > > >>>>>>> operator will flush all buffered
>> > > >> > > > > > >>>>>>> elements during
>> > > >> > `StreamOperator::prepareSnapshotPreBarrier()`
>> > > >> > > > and
>> > > >> > > > > > >>>> make
>> > > >> > > > > > >>>>>>> himself stateless.
>> > > >> > > > > > >>>>>>> By the way, in the earlier version we also did
>> the
>> > > >> similar
>> > > >> > > > > > >> approach
>> > > >> > > > > > >>>> by
>> > > >> > > > > > >>>>>>> introducing a stateful
>> > > >> > > > > > >>>>>>> local aggregation operator but it's not
>> performed as
>> > > >> well
>> > > >> > as
>> > > >> > > > the
>> > > >> > > > > > >>>> later
>> > > >> > > > > > >>>>>> one,
>> > > >> > > > > > >>>>>>> and also effect the barrie
>> > > >> > > > > > >>>>>>> alignment time. The later one is fairly simple
>> and
>> > > more
>> > > >> > > > > > >> efficient.
>> > > >> > > > > > >>>>>>>
>> > > >> > > > > > >>>>>>> I would highly suggest you to consider to have a
>> > > >> stateless
>> > > >> > > > > > >> approach
>> > > >> > > > > > >>>> at
>> > > >> > > > > > >>>>>> the
>> > > >> > > > > > >>>>>>> first step.
>> > > >> > > > > > >>>>>>>
>> > > >> > > > > > >>>>>>> Best,
>> > > >> > > > > > >>>>>>> Kurt
>> > > >> > > > > > >>>>>>>
>> > > >> > > > > > >>>>>>>
>> > > >> > > > > > >>>>>>> On Mon, Jun 17, 2019 at 7:32 PM Jark Wu <
>> > > >> imj...@gmail.com>
>> > > >> > > > > > >> wrote:
>> > > >> > > > > > >>>>>>>
>> > > >> > > > > > >>>>>>>> Hi Vino,
>> > > >> > > > > > >>>>>>>>
>> > > >> > > > > > >>>>>>>> Thanks for the proposal.
>> > > >> > > > > > >>>>>>>>
>> > > >> > > > > > >>>>>>>> Regarding to the "input.keyBy(0).sum(1)" vs
>> > > >> > > > > > >>>>>>>>
>> > > >> > "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)",
>> > > >> > > > > > >> have
>> > > >> > > > > > >>>> you
>> > > >> > > > > > >>>>>>> done
>> > > >> > > > > > >>>>>>>> some benchmark?
>> > > >> > > > > > >>>>>>>> Because I'm curious about how much performance
>> > > >> improvement
>> > > >> > > can
>> > > >> > > > > > >> we
>> > > >> > > > > > >>>> get
>> > > >> > > > > > >>>>>> by
>> > > >> > > > > > >>>>>>>> using count window as the local operator.
>> > > >> > > > > > >>>>>>>>
>> > > >> > > > > > >>>>>>>> Best,
>> > > >> > > > > > >>>>>>>> Jark
>> > > >> > > > > > >>>>>>>>
>> > > >> > > > > > >>>>>>>>
>> > > >> > > > > > >>>>>>>>
>> > > >> > > > > > >>>>>>>> On Mon, 17 Jun 2019 at 17:48, vino yang <
>> > > >> > > > yanghua1...@gmail.com
>> > > >> > > > > > >>>
>> > > >> > > > > > >>>>> wrote:
>> > > >> > > > > > >>>>>>>>
>> > > >> > > > > > >>>>>>>>> Hi Hequn,
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>> Thanks for your reply.
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>> The purpose of localKeyBy API is to provide a
>> tool
>> > > >> which
>> > > >> > > can
>> > > >> > > > > > >>> let
>> > > >> > > > > > >>>>>> users
>> > > >> > > > > > >>>>>>> do
>> > > >> > > > > > >>>>>>>>> pre-aggregation in the local. The behavior of
>> the
>> > > >> > > > > > >>> pre-aggregation
>> > > >> > > > > > >>>>> is
>> > > >> > > > > > >>>>>>>>> similar to keyBy API.
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>> So the three cases are different, I will
>> describe
>> > > them
>> > > >> > one
>> > > >> > > by
>> > > >> > > > > > >>>> one:
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>> 1. input.keyBy(0).sum(1)
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>> *In this case, the result is event-driven, each
>> > > event
>> > > >> can
>> > > >> > > > > > >>> produce
>> > > >> > > > > > >>>>> one
>> > > >> > > > > > >>>>>>> sum
>> > > >> > > > > > >>>>>>>>> aggregation result and it is the latest one
>> from
>> > the
>> > > >> > source
>> > > >> > > > > > >>>> start.*
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>> *In this case, the semantic may have a
>> problem, it
>> > > >> would
>> > > >> > do
>> > > >> > > > > > >> the
>> > > >> > > > > > >>>>> local
>> > > >> > > > > > >>>>>>> sum
>> > > >> > > > > > >>>>>>>>> aggregation and will produce the latest partial
>> > > result
>> > > >> > from
>> > > >> > > > > > >> the
>> > > >> > > > > > >>>>>> source
>> > > >> > > > > > >>>>>>>>> start for every event. *
>> > > >> > > > > > >>>>>>>>> *These latest partial results from the same key
>> > are
>> > > >> > hashed
>> > > >> > > to
>> > > >> > > > > > >>> one
>> > > >> > > > > > >>>>>> node
>> > > >> > > > > > >>>>>>> to
>> > > >> > > > > > >>>>>>>>> do the global sum aggregation.*
>> > > >> > > > > > >>>>>>>>> *In the global aggregation, when it received
>> > > multiple
>> > > >> > > partial
>> > > >> > > > > > >>>>> results
>> > > >> > > > > > >>>>>>>> (they
>> > > >> > > > > > >>>>>>>>> are all calculated from the source start) and
>> sum
>> > > them
>> > > >> > will
>> > > >> > > > > > >> get
>> > > >> > > > > > >>>> the
>> > > >> > > > > > >>>>>>> wrong
>> > > >> > > > > > >>>>>>>>> result.*
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>> 3.
>> > > >> > > input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>> *In this case, it would just get a partial
>> > > aggregation
>> > > >> > > result
>> > > >> > > > > > >>> for
>> > > >> > > > > > >>>>>> the 5
>> > > >> > > > > > >>>>>>>>> records in the count window. The partial
>> > aggregation
>> > > >> > > results
>> > > >> > > > > > >>> from
>> > > >> > > > > > >>>>> the
>> > > >> > > > > > >>>>>>>> same
>> > > >> > > > > > >>>>>>>>> key will be aggregated globally.*
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>> So the first case and the third case can get
>> the
>> > > >> *same*
>> > > >> > > > > > >> result,
>> > > >> > > > > > >>>> the
>> > > >> > > > > > >>>>>>>>> difference is the output-style and the latency.
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>> Generally speaking, the local key API is just
>> an
>> > > >> > > optimization
>> > > >> > > > > > >>>> API.
>> > > >> > > > > > >>>>> We
>> > > >> > > > > > >>>>>>> do
>> > > >> > > > > > >>>>>>>>> not limit the user's usage, but the user has to
>> > > >> > understand
>> > > >> > > > > > >> its
>> > > >> > > > > > >>>>>>> semantics
>> > > >> > > > > > >>>>>>>>> and use it correctly.
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>> Best,
>> > > >> > > > > > >>>>>>>>> Vino
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>> Hequn Cheng <chenghe...@gmail.com>
>> 于2019年6月17日周一
>> > > >> > 下午4:18写道:
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>>> Hi Vino,
>> > > >> > > > > > >>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>> Thanks for the proposal, I think it is a very
>> > good
>> > > >> > > feature!
>> > > >> > > > > > >>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>> One thing I want to make sure is the semantics
>> > for
>> > > >> the
>> > > >> > > > > > >>>>>> `localKeyBy`.
>> > > >> > > > > > >>>>>>>> From
>> > > >> > > > > > >>>>>>>>>> the document, the `localKeyBy` API returns an
>> > > >> instance
>> > > >> > of
>> > > >> > > > > > >>>>>>> `KeyedStream`
>> > > >> > > > > > >>>>>>>>>> which can also perform sum(), so in this case,
>> > > what's
>> > > >> > the
>> > > >> > > > > > >>>>> semantics
>> > > >> > > > > > >>>>>>> for
>> > > >> > > > > > >>>>>>>>>> `localKeyBy()`. For example, will the
>> following
>> > > code
>> > > >> > share
>> > > >> > > > > > >>> the
>> > > >> > > > > > >>>>> same
>> > > >> > > > > > >>>>>>>>> result?
>> > > >> > > > > > >>>>>>>>>> and what're the differences between them?
>> > > >> > > > > > >>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>> 1. input.keyBy(0).sum(1)
>> > > >> > > > > > >>>>>>>>>> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
>> > > >> > > > > > >>>>>>>>>> 3.
>> > > >> > > > > > >>
>> input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
>> > > >> > > > > > >>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>> Would also be great if we can add this into
>> the
>> > > >> > document.
>> > > >> > > > > > >>> Thank
>> > > >> > > > > > >>>>> you
>> > > >> > > > > > >>>>>>>> very
>> > > >> > > > > > >>>>>>>>>> much.
>> > > >> > > > > > >>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>> Best, Hequn
>> > > >> > > > > > >>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>> On Fri, Jun 14, 2019 at 11:34 AM vino yang <
>> > > >> > > > > > >>>>> yanghua1...@gmail.com>
>> > > >> > > > > > >>>>>>>>> wrote:
>> > > >> > > > > > >>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>> Hi Aljoscha,
>> > > >> > > > > > >>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>> I have looked at the "*Process*" section of
>> FLIP
>> > > >> wiki
>> > > >> > > > > > >>>> page.[1]
>> > > >> > > > > > >>>>>> This
>> > > >> > > > > > >>>>>>>>> mail
>> > > >> > > > > > >>>>>>>>>>> thread indicates that it has proceeded to the
>> > > third
>> > > >> > step.
>> > > >> > > > > > >>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>> When I looked at the fourth step(vote step),
>> I
>> > > >> didn't
>> > > >> > > > > > >> find
>> > > >> > > > > > >>>> the
>> > > >> > > > > > >>>>>>>>>>> prerequisites for starting the voting
>> process.
>> > > >> > > > > > >>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>> Considering that the discussion of this
>> feature
>> > > has
>> > > >> > been
>> > > >> > > > > > >>> done
>> > > >> > > > > > >>>>> in
>> > > >> > > > > > >>>>>>> the
>> > > >> > > > > > >>>>>>>>> old
>> > > >> > > > > > >>>>>>>>>>> thread. [2] So can you tell me when should I
>> > start
>> > > >> > > > > > >> voting?
>> > > >> > > > > > >>>> Can
>> > > >> > > > > > >>>>> I
>> > > >> > > > > > >>>>>>>> start
>> > > >> > > > > > >>>>>>>>>> now?
>> > > >> > > > > > >>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>> Best,
>> > > >> > > > > > >>>>>>>>>>> Vino
>> > > >> > > > > > >>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>> [1]:
>> > > >> > > > > > >>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>
>> > > >> > > > > > >>>>>>>
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>
>> > > >> > > > > > >>
>> > > >> > > > > >
>> > > >> > > > >
>> > > >> > > >
>> > > >> > >
>> > > >> >
>> > > >>
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
>> > > >> > > > > > >>>>>>>>>>> [2]:
>> > > >> > > > > > >>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>
>> > > >> > > > > > >>>>>>>
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>
>> > > >> > > > > > >>
>> > > >> > > > > >
>> > > >> > > > >
>> > > >> > > >
>> > > >> > >
>> > > >> >
>> > > >>
>> > >
>> >
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
>> > > >> > > > > > >>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>> leesf <leesf0...@gmail.com> 于2019年6月13日周四
>> > > 上午9:19写道:
>> > > >> > > > > > >>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>> +1 for the FLIP, thank vino for your
>> efforts.
>> > > >> > > > > > >>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>> Best,
>> > > >> > > > > > >>>>>>>>>>>> Leesf
>> > > >> > > > > > >>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>> vino yang <yanghua1...@gmail.com>
>> > 于2019年6月12日周三
>> > > >> > > > > > >>> 下午5:46写道:
>> > > >> > > > > > >>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>> Hi folks,
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>> I would like to start the FLIP discussion
>> > thread
>> > > >> > > > > > >> about
>> > > >> > > > > > >>>>>>> supporting
>> > > >> > > > > > >>>>>>>>>> local
>> > > >> > > > > > >>>>>>>>>>>>> aggregation in Flink.
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>> In short, this feature can effectively
>> > alleviate
>> > > >> data
>> > > >> > > > > > >>>> skew.
>> > > >> > > > > > >>>>>>> This
>> > > >> > > > > > >>>>>>>> is
>> > > >> > > > > > >>>>>>>>>> the
>> > > >> > > > > > >>>>>>>>>>>>> FLIP:
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>
>> > > >> > > > > > >>>>>>>
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>
>> > > >> > > > > > >>
>> > > >> > > > > >
>> > > >> > > > >
>> > > >> > > >
>> > > >> > >
>> > > >> >
>> > > >>
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>> *Motivation* (copied from FLIP)
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>> Currently, keyed streams are widely used to
>> > > >> perform
>> > > >> > > > > > >>>>>> aggregating
>> > > >> > > > > > >>>>>>>>>>>> operations
>> > > >> > > > > > >>>>>>>>>>>>> (e.g., reduce, sum and window) on the
>> elements
>> > > >> that
>> > > >> > > > > > >>> have
>> > > >> > > > > > >>>>> the
>> > > >> > > > > > >>>>>>> same
>> > > >> > > > > > >>>>>>>>>> key.
>> > > >> > > > > > >>>>>>>>>>>> When
>> > > >> > > > > > >>>>>>>>>>>>> executed at runtime, the elements with the
>> > same
>> > > >> key
>> > > >> > > > > > >>> will
>> > > >> > > > > > >>>> be
>> > > >> > > > > > >>>>>>> sent
>> > > >> > > > > > >>>>>>>> to
>> > > >> > > > > > >>>>>>>>>> and
>> > > >> > > > > > >>>>>>>>>>>>> aggregated by the same task.
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>> The performance of these aggregating
>> > operations
>> > > is
>> > > >> > > > > > >> very
>> > > >> > > > > > >>>>>>> sensitive
>> > > >> > > > > > >>>>>>>>> to
>> > > >> > > > > > >>>>>>>>>>> the
>> > > >> > > > > > >>>>>>>>>>>>> distribution of keys. In the cases where
>> the
>> > > >> > > > > > >>> distribution
>> > > >> > > > > > >>>>> of
>> > > >> > > > > > >>>>>>> keys
>> > > >> > > > > > >>>>>>>>>>>> follows a
>> > > >> > > > > > >>>>>>>>>>>>> powerful law, the performance will be
>> > > >> significantly
>> > > >> > > > > > >>>>>> downgraded.
>> > > >> > > > > > >>>>>>>>> More
>> > > >> > > > > > >>>>>>>>>>>>> unluckily, increasing the degree of
>> > parallelism
>> > > >> does
>> > > >> > > > > > >>> not
>> > > >> > > > > > >>>>> help
>> > > >> > > > > > >>>>>>>> when
>> > > >> > > > > > >>>>>>>>> a
>> > > >> > > > > > >>>>>>>>>>> task
>> > > >> > > > > > >>>>>>>>>>>>> is overloaded by a single key.
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>> Local aggregation is a widely-adopted
>> method
>> > to
>> > > >> > > > > > >> reduce
>> > > >> > > > > > >>>> the
>> > > >> > > > > > >>>>>>>>>> performance
>> > > >> > > > > > >>>>>>>>>>>>> degraded by data skew. We can decompose the
>> > > >> > > > > > >> aggregating
>> > > >> > > > > > >>>>>>>> operations
>> > > >> > > > > > >>>>>>>>>> into
>> > > >> > > > > > >>>>>>>>>>>> two
>> > > >> > > > > > >>>>>>>>>>>>> phases. In the first phase, we aggregate
>> the
>> > > >> elements
>> > > >> > > > > > >>> of
>> > > >> > > > > > >>>>> the
>> > > >> > > > > > >>>>>>> same
>> > > >> > > > > > >>>>>>>>> key
>> > > >> > > > > > >>>>>>>>>>> at
>> > > >> > > > > > >>>>>>>>>>>>> the sender side to obtain partial results.
>> > Then
>> > > at
>> > > >> > > > > > >> the
>> > > >> > > > > > >>>>> second
>> > > >> > > > > > >>>>>>>>> phase,
>> > > >> > > > > > >>>>>>>>>>>> these
>> > > >> > > > > > >>>>>>>>>>>>> partial results are sent to receivers
>> > according
>> > > to
>> > > >> > > > > > >>> their
>> > > >> > > > > > >>>>> keys
>> > > >> > > > > > >>>>>>> and
>> > > >> > > > > > >>>>>>>>> are
>> > > >> > > > > > >>>>>>>>>>>>> combined to obtain the final result. Since
>> the
>> > > >> number
>> > > >> > > > > > >>> of
>> > > >> > > > > > >>>>>>> partial
>> > > >> > > > > > >>>>>>>>>>> results
>> > > >> > > > > > >>>>>>>>>>>>> received by each receiver is limited by the
>> > > >> number of
>> > > >> > > > > > >>>>>> senders,
>> > > >> > > > > > >>>>>>>> the
>> > > >> > > > > > >>>>>>>>>>>>> imbalance among receivers can be reduced.
>> > > >> Besides, by
>> > > >> > > > > > >>>>>> reducing
>> > > >> > > > > > >>>>>>>> the
>> > > >> > > > > > >>>>>>>>>>> amount
>> > > >> > > > > > >>>>>>>>>>>>> of transferred data the performance can be
>> > > further
>> > > >> > > > > > >>>>> improved.
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>> *More details*:
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>> Design documentation:
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>
>> > > >> > > > > > >>>>>>>
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>
>> > > >> > > > > > >>
>> > > >> > > > > >
>> > > >> > > > >
>> > > >> > > >
>> > > >> > >
>> > > >> >
>> > > >>
>> > >
>> >
>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>> Old discussion thread:
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>
>> > > >> > > > > > >>>>>>>
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>
>> > > >> > > > > > >>
>> > > >> > > > > >
>> > > >> > > > >
>> > > >> > > >
>> > > >> > >
>> > > >> >
>> > > >>
>> > >
>> >
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>> JIRA: FLINK-12786 <
>> > > >> > > > > > >>>>>>>>>
>> https://issues.apache.org/jira/browse/FLINK-12786
>> > > >> > > > > > >>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>> We are looking forwards to your feedback!
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>> Best,
>> > > >> > > > > > >>>>>>>>>>>>> Vino
>> > > >> > > > > > >>>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>>
>> > > >> > > > > > >>>>>>>>>
>> > > >> > > > > > >>>>>>>>
>> > > >> > > > > > >>>>>>>
>> > > >> > > > > > >>>>>>
>> > > >> > > > > > >>>>>
>> > > >> > > > > > >>>>
>> > > >> > > > > > >>>
>> > > >> > > > > > >>
>> > > >> > > > > >
>> > > >> > > > > >
>> > > >> > > > >
>> > > >> > > >
>> > > >> > >
>> > > >> >
>> > > >>
>> > > >
>> > >
>> >
>>
>

Reply via email to