Hi Kurt,

Thanks for your reply.

If you do not depend on the window operator, that means you need to provide
many Trigger related implementations like window operator.
What's more, you worry about the complexity of the window operator but
ignore the flexible which window operator provided for the business logic.

I assume that we introduce a LocalAggregateOperator, we need these
functions:

   - we need a timer to trigger local aggregation, we should introduce
   Flink's timer service;
   - some aggregation depend on time, we may also need to provide a
   mechanism like ProcessWindowFunction;

Anyway, If we need to give a complete implementation and consider
flexibility it would look like a window operator finally. Unless you do not
support these features.

The window operator used Flink's state-related logic may make you feel it
is heavily.

However, based on your design, the state is in memory may cause these
problems:


   1. your buffer is in memory, how to avoid OOM? With Flink state, we need
   not consider this problem;
   2. when the checkpoint interval is short and the volume of data is
   large, I think the buffer flush action will also cause performance issue;
   3. `StreamOperator::prepareSnapshotPreBarrier()` may not purely CPU
   workload, actually, it depends on the downstream operator, if an operator
   which send remote requests and chained with LocalAggregateOperator, the
   workload will very large unless we don't allow it to follow with other
   operators, but obviously, it is not reasonable. I just want to say that
   depending on the timing of checkpoint has the risk to slow down its
   performance. However, our design does not change anything of the
   state/checkpointing/operator, we can get all the benefit from any further
   optimization about them;

 I admit use window operator and Flink's state may look a little
complexity, but it's stable, flexible and long-tested.
However, the lightweight operator and localAggregate API may scene
specific(just like I provide these examples above). If it leaves these
specific scenes, the benefit will be lost.

Best,
Vino



Kurt Young <ykt...@gmail.com> 于2019年6月18日周二 下午5:46写道:

> Hi,
>
> For the trigger, it depends on what operator we want to use under the API.
> If we choose to use window operator,
> we should also use window's trigger. However, I also think reuse window
> operator for this scenario may not be
> the best choice. The reasons are the following:
>
> 1. As a lot of people already pointed out, window relies heavily on state
> and it will definitely effect performance. You can
>  argue that one can use heap based statebackend, but this will introduce
> extra coupling. Especially we have a chance to
> design a pure stateless operator.
> 2. The window operator is *the most* complicated operator Flink currently
> have. Maybe we only need to pick a subset of
> window operator to achieve the goal, but once the user wants to have a deep
> look at the localAggregation operator, it's still
> hard to find out what's going on under the window operator. For simplicity,
> I would also recommend we introduce a dedicated
> lightweight operator, which also much easier for a user to learn and use.
>
> For your question about increasing the burden in
> `StreamOperator::prepareSnapshotPreBarrier()`, the only thing this function
> need
> to do is output all the partial results, it's purely cpu workload, not
> introducing any IO. I want to point out that even if we have this
> cost, we reduced another barrier align cost of the operator, which is the
> sync flush stage of the state, if you introduced state. This
> flush actually will introduce disk IO, and I think it's worthy to exchange
> this cost with purely CPU workload. And we do have some
> observations about these two behavior (as i said before, we actually
> implemented both solutions), the stateless one actually performs
> better both in performance and barrier align time.
>
> Best,
> Kurt
>
>
> On Tue, Jun 18, 2019 at 3:40 PM vino yang <yanghua1...@gmail.com> wrote:
>
> > Hi Kurt,
> >
> > Thanks for your example. Now, it looks more clearly for me.
> >
> > From your example code snippet, I saw the localAggregate API has three
> > parameters:
> >
> >    1. key field
> >    2. PartitionAvg
> >    3. CountTrigger: Does this trigger comes from window package?
> >
> > I will compare our and your design from API and operator level:
> >
> > *From the API level:*
> >
> > As I replied to @dianfu in the old email thread,[1] the Window API can
> > provide the second and the third parameter right now.
> >
> > If you reuse specified interface or class, such as *Trigger* or
> > *CounterTrigger* provided by window package, but do not use window API,
> > it's not reasonable.
> > And if you do not reuse these interface or class, you would need to
> > introduce more things however they are looked similar to the things
> > provided by window package.
> >
> > The window package has provided several types of the window and many
> > triggers and let users customize it. What's more, the user is more
> familiar
> > with Window API.
> >
> > This is the reason why we just provide localKeyBy API and reuse the
> window
> > API. It reduces unnecessary components such as triggers and the mechanism
> > of buffer (based on count num or time).
> > And it has a clear and easy to understand semantics.
> >
> > *From the operator level:*
> >
> > We reused window operator, so we can get all the benefits from state and
> > checkpoint.
> >
> > From your design, you named the operator under localAggregate API is a
> > *stateless* operator. IMO, it is still a state, it is just not Flink
> > managed state.
> > About the memory buffer (I think it's still not very clear, if you have
> > time, can you give more detail information or answer my questions), I
> have
> > some questions:
> >
> >    - if it just a raw JVM heap memory buffer, how to support fault
> >    tolerance, if the job is configured EXACTLY-ONCE semantic guarantee?
> >    - if you thought the memory buffer(non-Flink state), has better
> >    performance. In our design, users can also config HEAP state backend
> to
> >    provide the performance close to your mechanism.
> >    - `StreamOperator::prepareSnapshotPreBarrier()` related to the timing
> of
> >    snapshot. IMO, the flush action should be a synchronized action? (if
> > not,
> >    please point out my mistake) I still think we should not depend on the
> >    timing of checkpoint. Checkpoint related operations are inherent
> >    performance sensitive, we should not increase its burden anymore. Our
> >    implementation based on the mechanism of Flink's checkpoint, which can
> >    benefit from the asnyc snapshot and incremental checkpoint. IMO, the
> >    performance is not a problem, and we also do not find the performance
> > issue
> >    in our production.
> >
> > [1]:
> >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> >
> > Best,
> > Vino
> >
> > Kurt Young <ykt...@gmail.com> 于2019年6月18日周二 下午2:27写道:
> >
> > > Yeah, sorry for not expressing myself clearly. I will try to provide
> more
> > > details to make sure we are on the same page.
> > >
> > > For DataStream API, it shouldn't be optimized automatically. You have
> to
> > > explicitly call API to do local aggregation
> > > as well as the trigger policy of the local aggregation. Take average
> for
> > > example, the user program may look like this (just a draft):
> > >
> > > assuming the input type is DataStream<Tupl2<String, Int>>
> > >
> > > ds.localAggregate(
> > >         0,                                       // The local key,
> which
> > is
> > > the String from Tuple2
> > >         PartitionAvg(1),                 // The partial aggregation
> > > function, produces Tuple2<Long, Int>, indicating sum and count
> > >         CountTrigger.of(1000L)    // Trigger policy, note this should
> be
> > > best effort, and also be composited with time based or memory size
> based
> > > trigger
> > >     )                                           // The return type is
> > local
> > > aggregate Tuple2<String, Tupl2<Long, Int>>
> > >     .keyBy(0)                             // Further keyby it with
> > required
> > > key
> > >     .aggregate(1)                      // This will merge all the
> partial
> > > results and get the final average.
> > >
> > > (This is only a draft, only trying to explain what it looks like. )
> > >
> > > The local aggregate operator can be stateless, we can keep a memory
> > buffer
> > > or other efficient data structure to improve the aggregate performance.
> > >
> > > Let me know if you have any other questions.
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Tue, Jun 18, 2019 at 1:29 PM vino yang <yanghua1...@gmail.com>
> wrote:
> > >
> > > > Hi Kurt,
> > > >
> > > > Thanks for your reply.
> > > >
> > > > Actually, I am not against you to raise your design.
> > > >
> > > > From your description before, I just can imagine your high-level
> > > > implementation is about SQL and the optimization is inner of the API.
> > Is
> > > it
> > > > automatically? how to give the configuration option about trigger
> > > > pre-aggregation?
> > > >
> > > > Maybe after I get more information, it sounds more reasonable.
> > > >
> > > > IMO, first of all, it would be better to make your user interface
> > > concrete,
> > > > it's the basis of the discussion.
> > > >
> > > > For example, can you give an example code snippet to introduce how to
> > > help
> > > > users to process data skew caused by the jobs which built with
> > DataStream
> > > > API?
> > > >
> > > > If you give more details we can discuss further more. I think if one
> > > design
> > > > introduces an exact interface and another does not.
> > > >
> > > > The implementation has an obvious difference. For example, we
> introduce
> > > an
> > > > exact API in DataStream named localKeyBy, about the pre-aggregation
> we
> > > need
> > > > to define the trigger mechanism of local aggregation, so we find
> reused
> > > > window API and operator is a good choice. This is a reasoning link
> from
> > > > design to implementation.
> > > >
> > > > What do you think?
> > > >
> > > > Best,
> > > > Vino
> > > >
> > > >
> > > > Kurt Young <ykt...@gmail.com> 于2019年6月18日周二 上午11:58写道:
> > > >
> > > > > Hi Vino,
> > > > >
> > > > > Now I feel that we may have different understandings about what
> kind
> > of
> > > > > problems or improvements you want to
> > > > > resolve. Currently, most of the feedback are focusing on *how to
> do a
> > > > > proper local aggregation to improve performance
> > > > > and maybe solving the data skew issue*. And my gut feeling is this
> is
> > > > > exactly what users want at the first place,
> > > > > especially those +1s. (Sorry to try to summarize here, please
> correct
> > > me
> > > > if
> > > > > i'm wrong).
> > > > >
> > > > > But I still think the design is somehow diverged from the goal. If
> we
> > > > want
> > > > > to have an efficient and powerful way to
> > > > > have local aggregation, supporting intermedia result type is
> > essential
> > > > IMO.
> > > > > Both runtime's `AggregateFunction` and
> > > > > SQL`s `UserDefinedAggregateFunction` have a proper support of
> > > > intermediate
> > > > > result type and can do `merge` operation
> > > > > on them.
> > > > >
> > > > > Now, we have a lightweight alternatives which performs well, and
> > have a
> > > > > nice fit with the local aggregate requirements.
> > > > > Mostly importantly,  it's much less complex because it's stateless.
> > And
> > > > it
> > > > > can also achieve the similar multiple-aggregation
> > > > > scenario.
> > > > >
> > > > > I still not convinced why we shouldn't consider it as a first step.
> > > > >
> > > > > Best,
> > > > > Kurt
> > > > >
> > > > >
> > > > > On Tue, Jun 18, 2019 at 11:35 AM vino yang <yanghua1...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi Kurt,
> > > > > >
> > > > > > Thanks for your comments.
> > > > > >
> > > > > > It seems we both implemented local aggregation feature to
> optimize
> > > the
> > > > > > issue of data skew.
> > > > > > However, IMHO, the API level of optimizing revenue is different.
> > > > > >
> > > > > > *Your optimization benefits from Flink SQL and it's not user's
> > > > faces.(If
> > > > > I
> > > > > > understand it incorrectly, please correct this.)*
> > > > > > *Our implementation employs it as an optimization tool API for
> > > > > DataStream,
> > > > > > it just like a local version of the keyBy API.*
> > > > > >
> > > > > > Based on this, I want to say support it as a DataStream API can
> > > provide
> > > > > > these advantages:
> > > > > >
> > > > > >
> > > > > >    - The localKeyBy API has a clear semantic and it's flexible
> not
> > > only
> > > > > for
> > > > > >    processing data skew but also for implementing some user
> cases,
> > > for
> > > > > >    example, if we want to calculate the multiple-level
> aggregation,
> > > we
> > > > > can
> > > > > > do
> > > > > >    multiple-level aggregation in the local aggregation:
> > > > > >    input.localKeyBy("a").sum(1).localKeyBy("b").window(); // here
> > "a"
> > > > is
> > > > > a
> > > > > >    sub-category, while "b" is a category, here we do not need to
> > > > shuffle
> > > > > > data
> > > > > >    in the network.
> > > > > >    - The users of DataStream API will benefit from this.
> Actually,
> > we
> > > > > have
> > > > > >    a lot of scenes need to use DataStream API. Currently,
> > DataStream
> > > > API
> > > > > is
> > > > > >    the cornerstone of the physical plan of Flink SQL. With a
> > > localKeyBy
> > > > > > API,
> > > > > >    the optimization of SQL at least may use this optimized API,
> > this
> > > > is a
> > > > > >    further topic.
> > > > > >    - Based on the window operator, our state would benefit from
> > Flink
> > > > > State
> > > > > >    and checkpoint, we do not need to worry about OOM and job
> > failed.
> > > > > >
> > > > > > Now, about your questions:
> > > > > >
> > > > > > 1. About our design cannot change the data type and about the
> > > > > > implementation of average:
> > > > > >
> > > > > > Just like my reply to Hequn, the localKeyBy is an API provides to
> > the
> > > > > users
> > > > > > who use DataStream API to build their jobs.
> > > > > > Users should know its semantics and the difference with keyBy
> API,
> > so
> > > > if
> > > > > > they want to the average aggregation, they should carry local sum
> > > > result
> > > > > > and local count result.
> > > > > > I admit that it will be convenient to use keyBy directly. But we
> > need
> > > > to
> > > > > > pay a little price when we get some benefits. I think this price
> is
> > > > > > reasonable. Considering that the DataStream API itself is a
> > low-level
> > > > API
> > > > > > (at least for now).
> > > > > >
> > > > > > 2. About stateless operator and
> > > > > > `StreamOperator::prepareSnapshotPreBarrier()`:
> > > > > >
> > > > > > Actually, I have discussed this opinion with @dianfu in the old
> > mail
> > > > > > thread. I will copy my opinion from there:
> > > > > >
> > > > > >    - for your design, you still need somewhere to give the users
> > > > > configure
> > > > > >    the trigger threshold (maybe memory availability?), this
> design
> > > > cannot
> > > > > >    guarantee a deterministic semantics (it will bring trouble for
> > > > testing
> > > > > > and
> > > > > >    debugging).
> > > > > >    - if the implementation depends on the timing of checkpoint,
> it
> > > > would
> > > > > >    affect the checkpoint's progress, and the buffered data may
> > cause
> > > > OOM
> > > > > >    issue. In addition, if the operator is stateless, it can not
> > > provide
> > > > > > fault
> > > > > >    tolerance.
> > > > > >
> > > > > > Best,
> > > > > > Vino
> > > > > >
> > > > > > Kurt Young <ykt...@gmail.com> 于2019年6月18日周二 上午9:22写道:
> > > > > >
> > > > > > > Hi Vino,
> > > > > > >
> > > > > > > Thanks for the proposal, I like the general idea and IMO it's
> > very
> > > > > useful
> > > > > > > feature.
> > > > > > > But after reading through the document, I feel that we may over
> > > > design
> > > > > > the
> > > > > > > required
> > > > > > > operator for proper local aggregation. The main reason is we
> want
> > > to
> > > > > > have a
> > > > > > > clear definition and behavior about the "local keyed state"
> which
> > > in
> > > > my
> > > > > > > opinion is not
> > > > > > > necessary for local aggregation, at least for start.
> > > > > > >
> > > > > > > Another issue I noticed is the local key by operator cannot
> > change
> > > > > > element
> > > > > > > type, it will
> > > > > > > also restrict a lot of use cases which can be benefit from
> local
> > > > > > > aggregation, like "average".
> > > > > > >
> > > > > > > We also did similar logic in SQL and the only thing need to be
> > done
> > > > is
> > > > > > > introduce
> > > > > > > a stateless lightweight operator which is *chained* before
> > > `keyby()`.
> > > > > The
> > > > > > > operator will flush all buffered
> > > > > > > elements during `StreamOperator::prepareSnapshotPreBarrier()`
> and
> > > > make
> > > > > > > himself stateless.
> > > > > > > By the way, in the earlier version we also did the similar
> > approach
> > > > by
> > > > > > > introducing a stateful
> > > > > > > local aggregation operator but it's not performed as well as
> the
> > > > later
> > > > > > one,
> > > > > > > and also effect the barrie
> > > > > > > alignment time. The later one is fairly simple and more
> > efficient.
> > > > > > >
> > > > > > > I would highly suggest you to consider to have a stateless
> > approach
> > > > at
> > > > > > the
> > > > > > > first step.
> > > > > > >
> > > > > > > Best,
> > > > > > > Kurt
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Jun 17, 2019 at 7:32 PM Jark Wu <imj...@gmail.com>
> > wrote:
> > > > > > >
> > > > > > > > Hi Vino,
> > > > > > > >
> > > > > > > > Thanks for the proposal.
> > > > > > > >
> > > > > > > > Regarding to the "input.keyBy(0).sum(1)" vs
> > > > > > > > "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)",
> > have
> > > > you
> > > > > > > done
> > > > > > > > some benchmark?
> > > > > > > > Because I'm curious about how much performance improvement
> can
> > we
> > > > get
> > > > > > by
> > > > > > > > using count window as the local operator.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Jark
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, 17 Jun 2019 at 17:48, vino yang <
> yanghua1...@gmail.com
> > >
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Hequn,
> > > > > > > > >
> > > > > > > > > Thanks for your reply.
> > > > > > > > >
> > > > > > > > > The purpose of localKeyBy API is to provide a tool which
> can
> > > let
> > > > > > users
> > > > > > > do
> > > > > > > > > pre-aggregation in the local. The behavior of the
> > > pre-aggregation
> > > > > is
> > > > > > > > > similar to keyBy API.
> > > > > > > > >
> > > > > > > > > So the three cases are different, I will describe them one
> by
> > > > one:
> > > > > > > > >
> > > > > > > > > 1. input.keyBy(0).sum(1)
> > > > > > > > >
> > > > > > > > > *In this case, the result is event-driven, each event can
> > > produce
> > > > > one
> > > > > > > sum
> > > > > > > > > aggregation result and it is the latest one from the source
> > > > start.*
> > > > > > > > >
> > > > > > > > > 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> > > > > > > > >
> > > > > > > > > *In this case, the semantic may have a problem, it would do
> > the
> > > > > local
> > > > > > > sum
> > > > > > > > > aggregation and will produce the latest partial result from
> > the
> > > > > > source
> > > > > > > > > start for every event. *
> > > > > > > > > *These latest partial results from the same key are hashed
> to
> > > one
> > > > > > node
> > > > > > > to
> > > > > > > > > do the global sum aggregation.*
> > > > > > > > > *In the global aggregation, when it received multiple
> partial
> > > > > results
> > > > > > > > (they
> > > > > > > > > are all calculated from the source start) and sum them will
> > get
> > > > the
> > > > > > > wrong
> > > > > > > > > result.*
> > > > > > > > >
> > > > > > > > > 3.
> input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> > > > > > > > >
> > > > > > > > > *In this case, it would just get a partial aggregation
> result
> > > for
> > > > > > the 5
> > > > > > > > > records in the count window. The partial aggregation
> results
> > > from
> > > > > the
> > > > > > > > same
> > > > > > > > > key will be aggregated globally.*
> > > > > > > > >
> > > > > > > > > So the first case and the third case can get the *same*
> > result,
> > > > the
> > > > > > > > > difference is the output-style and the latency.
> > > > > > > > >
> > > > > > > > > Generally speaking, the local key API is just an
> optimization
> > > > API.
> > > > > We
> > > > > > > do
> > > > > > > > > not limit the user's usage, but the user has to understand
> > its
> > > > > > > semantics
> > > > > > > > > and use it correctly.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Vino
> > > > > > > > >
> > > > > > > > > Hequn Cheng <chenghe...@gmail.com> 于2019年6月17日周一 下午4:18写道:
> > > > > > > > >
> > > > > > > > > > Hi Vino,
> > > > > > > > > >
> > > > > > > > > > Thanks for the proposal, I think it is a very good
> feature!
> > > > > > > > > >
> > > > > > > > > > One thing I want to make sure is the semantics for the
> > > > > > `localKeyBy`.
> > > > > > > > From
> > > > > > > > > > the document, the `localKeyBy` API returns an instance of
> > > > > > > `KeyedStream`
> > > > > > > > > > which can also perform sum(), so in this case, what's the
> > > > > semantics
> > > > > > > for
> > > > > > > > > > `localKeyBy()`. For example, will the following code
> share
> > > the
> > > > > same
> > > > > > > > > result?
> > > > > > > > > > and what're the differences between them?
> > > > > > > > > >
> > > > > > > > > > 1. input.keyBy(0).sum(1)
> > > > > > > > > > 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> > > > > > > > > > 3.
> > input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> > > > > > > > > >
> > > > > > > > > > Would also be great if we can add this into the document.
> > > Thank
> > > > > you
> > > > > > > > very
> > > > > > > > > > much.
> > > > > > > > > >
> > > > > > > > > > Best, Hequn
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Jun 14, 2019 at 11:34 AM vino yang <
> > > > > yanghua1...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Aljoscha,
> > > > > > > > > > >
> > > > > > > > > > > I have looked at the "*Process*" section of FLIP wiki
> > > > page.[1]
> > > > > > This
> > > > > > > > > mail
> > > > > > > > > > > thread indicates that it has proceeded to the third
> step.
> > > > > > > > > > >
> > > > > > > > > > > When I looked at the fourth step(vote step), I didn't
> > find
> > > > the
> > > > > > > > > > > prerequisites for starting the voting process.
> > > > > > > > > > >
> > > > > > > > > > > Considering that the discussion of this feature has
> been
> > > done
> > > > > in
> > > > > > > the
> > > > > > > > > old
> > > > > > > > > > > thread. [2] So can you tell me when should I start
> > voting?
> > > > Can
> > > > > I
> > > > > > > > start
> > > > > > > > > > now?
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Vino
> > > > > > > > > > >
> > > > > > > > > > > [1]:
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
> > > > > > > > > > > [2]:
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > > > > > > > > > >
> > > > > > > > > > > leesf <leesf0...@gmail.com> 于2019年6月13日周四 上午9:19写道:
> > > > > > > > > > >
> > > > > > > > > > > > +1 for the FLIP, thank vino for your efforts.
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Leesf
> > > > > > > > > > > >
> > > > > > > > > > > > vino yang <yanghua1...@gmail.com> 于2019年6月12日周三
> > > 下午5:46写道:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi folks,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I would like to start the FLIP discussion thread
> > about
> > > > > > > supporting
> > > > > > > > > > local
> > > > > > > > > > > > > aggregation in Flink.
> > > > > > > > > > > > >
> > > > > > > > > > > > > In short, this feature can effectively alleviate
> data
> > > > skew.
> > > > > > > This
> > > > > > > > is
> > > > > > > > > > the
> > > > > > > > > > > > > FLIP:
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > *Motivation* (copied from FLIP)
> > > > > > > > > > > > >
> > > > > > > > > > > > > Currently, keyed streams are widely used to perform
> > > > > > aggregating
> > > > > > > > > > > > operations
> > > > > > > > > > > > > (e.g., reduce, sum and window) on the elements that
> > > have
> > > > > the
> > > > > > > same
> > > > > > > > > > key.
> > > > > > > > > > > > When
> > > > > > > > > > > > > executed at runtime, the elements with the same key
> > > will
> > > > be
> > > > > > > sent
> > > > > > > > to
> > > > > > > > > > and
> > > > > > > > > > > > > aggregated by the same task.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The performance of these aggregating operations is
> > very
> > > > > > > sensitive
> > > > > > > > > to
> > > > > > > > > > > the
> > > > > > > > > > > > > distribution of keys. In the cases where the
> > > distribution
> > > > > of
> > > > > > > keys
> > > > > > > > > > > > follows a
> > > > > > > > > > > > > powerful law, the performance will be significantly
> > > > > > downgraded.
> > > > > > > > > More
> > > > > > > > > > > > > unluckily, increasing the degree of parallelism
> does
> > > not
> > > > > help
> > > > > > > > when
> > > > > > > > > a
> > > > > > > > > > > task
> > > > > > > > > > > > > is overloaded by a single key.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Local aggregation is a widely-adopted method to
> > reduce
> > > > the
> > > > > > > > > > performance
> > > > > > > > > > > > > degraded by data skew. We can decompose the
> > aggregating
> > > > > > > > operations
> > > > > > > > > > into
> > > > > > > > > > > > two
> > > > > > > > > > > > > phases. In the first phase, we aggregate the
> elements
> > > of
> > > > > the
> > > > > > > same
> > > > > > > > > key
> > > > > > > > > > > at
> > > > > > > > > > > > > the sender side to obtain partial results. Then at
> > the
> > > > > second
> > > > > > > > > phase,
> > > > > > > > > > > > these
> > > > > > > > > > > > > partial results are sent to receivers according to
> > > their
> > > > > keys
> > > > > > > and
> > > > > > > > > are
> > > > > > > > > > > > > combined to obtain the final result. Since the
> number
> > > of
> > > > > > > partial
> > > > > > > > > > > results
> > > > > > > > > > > > > received by each receiver is limited by the number
> of
> > > > > > senders,
> > > > > > > > the
> > > > > > > > > > > > > imbalance among receivers can be reduced. Besides,
> by
> > > > > > reducing
> > > > > > > > the
> > > > > > > > > > > amount
> > > > > > > > > > > > > of transferred data the performance can be further
> > > > > improved.
> > > > > > > > > > > > >
> > > > > > > > > > > > > *More details*:
> > > > > > > > > > > > >
> > > > > > > > > > > > > Design documentation:
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> > > > > > > > > > > > >
> > > > > > > > > > > > > Old discussion thread:
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > > > > > > > > > > > >
> > > > > > > > > > > > > JIRA: FLINK-12786 <
> > > > > > > > > https://issues.apache.org/jira/browse/FLINK-12786
> > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > We are looking forwards to your feedback!
> > > > > > > > > > > > >
> > > > > > > > > > > > > Best,
> > > > > > > > > > > > > Vino
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to