Hi Vino,

Thanks for the proposal. I like the general idea, and IMO it's a very useful
feature.
But after reading through the document, I feel that we may be over-designing
the operator required for local aggregation. The main reason is that it tries
to give a clear definition and behavior for the "local keyed state", which in
my opinion is not necessary for local aggregation, at least as a first step.

Another issue I noticed is that the local keyBy operator cannot change the
element type. This will also rule out a lot of use cases that could benefit
from local aggregation, like "average".
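
To illustrate why the type change matters, here is a minimal sketch in plain
Java (the class and method names are hypothetical, not Flink API): an average
can only be pre-aggregated if the local phase is allowed to emit a different
type, e.g. a (sum, count) partial that the global phase then merges.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the local phase turns raw long values into
// (sum, count) partials; the global phase merges partials and divides.
class AveragePartial {
    long sum;
    long count;

    AveragePartial(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    // Local phase: fold one raw value into the partial for its key.
    // Note the type change: the input is a long, the output a partial.
    static void addLocal(Map<String, AveragePartial> partials, String key, long value) {
        partials.merge(key, new AveragePartial(value, 1),
                (a, b) -> new AveragePartial(a.sum + b.sum, a.count + b.count));
    }

    // Global phase: merge the partials shipped from all senders.
    static double merge(List<AveragePartial> shipped) {
        long sum = 0, count = 0;
        for (AveragePartial p : shipped) {
            sum += p.sum;
            count += p.count;
        }
        return (double) sum / count;
    }
}
```

Averaging the local averages directly would give the wrong answer (e.g.
locally averaging {1, 2} and {3} yields (1.5 + 3) / 2 = 2.25 instead of 2.0),
which is exactly why the local operator must be allowed to change the element
type.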

We also implemented similar logic in SQL, and the only thing that needs to be
done is to introduce a stateless, lightweight operator which is *chained*
before `keyBy()`. The operator flushes all buffered elements during
`StreamOperator::prepareSnapshotPreBarrier()`, keeping itself stateless.
By the way, in an earlier version we also tried a similar approach by
introducing a stateful local aggregation operator, but it did not perform as
well as the later one, and it also affected the barrier alignment time. The
later one is fairly simple and more efficient.
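
For illustration, here is a minimal sketch of that idea in plain Java (the
class and the downstream collector are hypothetical, not Flink's actual
StreamOperator API; only the two method names mirror Flink's real
processElement()/prepareSnapshotPreBarrier() hooks):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch of a stateless pre-aggregator: partial sums are
// buffered per key in plain memory, and everything is flushed downstream
// just before the checkpoint barrier is emitted, so there is nothing left
// to snapshot and the operator carries no checkpointed state.
class LocalSumBundle {
    private final Map<String, Long> buffer = new HashMap<>();
    private final BiConsumer<String, Long> downstream; // stand-in for an output collector

    LocalSumBundle(BiConsumer<String, Long> downstream) {
        this.downstream = downstream;
    }

    // Analogous to processElement(): fold the element into the in-memory bundle.
    void processElement(String key, long value) {
        buffer.merge(key, value, Long::sum);
    }

    // Analogous to prepareSnapshotPreBarrier(): emit all buffered partials
    // and clear the buffer, leaving the operator stateless for the snapshot.
    void prepareSnapshotPreBarrier() {
        buffer.forEach(downstream);
        buffer.clear();
    }
}
```

A production version would presumably also flush when the bundle grows past a
size threshold, so memory stays bounded between checkpoints.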

I would highly suggest you consider a stateless approach as the first step.

Best,
Kurt


On Mon, Jun 17, 2019 at 7:32 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Vino,
>
> Thanks for the proposal.
>
> Regarding "input.keyBy(0).sum(1)" vs
> "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)", have you done
> any benchmarks? I'm curious about how much performance improvement we can
> get by using a count window as the local operator.
>
> Best,
> Jark
>
>
>
> On Mon, 17 Jun 2019 at 17:48, vino yang <yanghua1...@gmail.com> wrote:
>
> > Hi Hequn,
> >
> > Thanks for your reply.
> >
> > The purpose of the localKeyBy API is to provide a tool that lets users do
> > pre-aggregation locally. The behavior of the pre-aggregation is similar
> > to that of the keyBy API.
> >
> > So the three cases are different; I will describe them one by one:
> >
> > 1. input.keyBy(0).sum(1)
> >
> > *In this case, the result is event-driven: each event produces one sum
> > aggregation result, which is the latest result since the source start.*
> >
> > 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> >
> > *In this case, the semantics may have a problem: it would do the local sum
> > aggregation and produce, for every event, the latest partial result since
> > the source start.*
> > *These latest partial results for the same key are hashed to one node to
> > do the global sum aggregation.*
> > *When the global aggregation receives multiple partial results (which are
> > all calculated from the source start) and sums them, it will get the
> > wrong result.*
> >
> > 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> >
> > *In this case, it just gets a partial aggregation result for the 5
> > records in the count window. The partial aggregation results for the
> > same key will then be aggregated globally.*
> >
> > So the first case and the third case get the *same* result; the
> > difference is the output style and the latency.
> >
> > Generally speaking, the localKeyBy API is just an optimization API. We do
> > not limit the user's usage, but users have to understand its semantics
> > and use it correctly.
> >
> > Best,
> > Vino
> >
> > Hequn Cheng <chenghe...@gmail.com> 于2019年6月17日周一 下午4:18写道:
> >
> > > Hi Vino,
> > >
> > > Thanks for the proposal, I think it is a very good feature!
> > >
> > > One thing I want to make sure of is the semantics of `localKeyBy`. From
> > > the document, the `localKeyBy` API returns an instance of `KeyedStream`
> > > which can also perform sum(), so in this case, what are the semantics of
> > > `localKeyBy()`? For example, will the following code produce the same
> > > result, and what are the differences between them?
> > >
> > > 1. input.keyBy(0).sum(1)
> > > 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> > > 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> > >
> > > It would also be great if we could add this to the document. Thank you
> > > very much.
> > >
> > > Best, Hequn
> > >
> > >
> > > On Fri, Jun 14, 2019 at 11:34 AM vino yang <yanghua1...@gmail.com>
> > wrote:
> > >
> > > > Hi Aljoscha,
> > > >
> > > > I have looked at the "*Process*" section of the FLIP wiki page. [1]
> > > > This mail thread indicates that it has proceeded to the third step.
> > > >
> > > > When I looked at the fourth step (the vote step), I didn't find the
> > > > prerequisites for starting the voting process.
> > > >
> > > > Considering that the discussion of this feature has been done in the
> > > > old thread [2], can you tell me when I should start voting? Can I
> > > > start now?
> > > >
> > > > Best,
> > > > Vino
> > > >
> > > > [1]:
> > > > https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
> > > > [2]:
> > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > > >
> > > > leesf <leesf0...@gmail.com> 于2019年6月13日周四 上午9:19写道:
> > > >
> > > > > +1 for the FLIP, thank vino for your efforts.
> > > > >
> > > > > Best,
> > > > > Leesf
> > > > >
> > > > > vino yang <yanghua1...@gmail.com> 于2019年6月12日周三 下午5:46写道:
> > > > >
> > > > > > Hi folks,
> > > > > >
> > > > > > I would like to start the FLIP discussion thread about supporting
> > > local
> > > > > > aggregation in Flink.
> > > > > >
> > > > > > In short, this feature can effectively alleviate data skew. This
> > > > > > is the FLIP:
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink
> > > > > >
> > > > > >
> > > > > > *Motivation* (copied from FLIP)
> > > > > >
> > > > > > Currently, keyed streams are widely used to perform aggregating
> > > > > > operations (e.g., reduce, sum and window) on the elements that
> > > > > > have the same key. When executed at runtime, the elements with
> > > > > > the same key will be sent to and aggregated by the same task.
> > > > > >
> > > > > > The performance of these aggregating operations is very sensitive
> > > > > > to the distribution of keys. In cases where the distribution of
> > > > > > keys follows a power law, performance will degrade significantly.
> > > > > > Worse still, increasing the degree of parallelism does not help
> > > > > > when a task is overloaded by a single key.
> > > > > >
> > > > > > Local aggregation is a widely adopted method to reduce the
> > > > > > performance degradation caused by data skew. We can decompose the
> > > > > > aggregating operations into two phases. In the first phase, we
> > > > > > aggregate the elements of the same key at the sender side to
> > > > > > obtain partial results. Then in the second phase, these partial
> > > > > > results are sent to receivers according to their keys and are
> > > > > > combined to obtain the final result. Since the number of partial
> > > > > > results received by each receiver is limited by the number of
> > > > > > senders, the imbalance among receivers can be reduced. Besides,
> > > > > > by reducing the amount of transferred data, the performance can
> > > > > > be further improved.
> > > > > >
> > > > > > *More details*:
> > > > > >
> > > > > > Design documentation:
> > > > > > https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> > > > > >
> > > > > > Old discussion thread:
> > > > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > > > > >
> > > > > > JIRA: FLINK-12786 <https://issues.apache.org/jira/browse/FLINK-12786>
> > > > > >
> > > > > > We are looking forward to your feedback!
> > > > > >
> > > > > > Best,
> > > > > > Vino
> > > > > >
> > > > >
> > > >
> > >
> >
>
