Hi Vino,

Thanks for the proposal. I like the general idea, and IMO it's a very useful feature. But after reading through the document, I feel that we may be over-designing the operator required for proper local aggregation. The main reason is that we want a clear definition and behavior for the "local keyed state", which in my opinion is not necessary for local aggregation, at least for a start.
Another issue I noticed is that the local keyBy operator cannot change the element type, which will also restrict a lot of use cases that could benefit from local aggregation, like "average". We also implemented similar logic in SQL, and the only thing that needed to be done was to introduce a stateless, lightweight operator which is *chained* before `keyBy()`. The operator flushes all buffered elements during `StreamOperator::prepareSnapshotPreBarrier()`, which keeps it stateless. By the way, in an earlier version we also took a similar approach by introducing a stateful local aggregation operator, but it did not perform as well as the later one, and it also affected the barrier alignment time. The later one is fairly simple and more efficient.

I would highly suggest that you consider a stateless approach as the first step.

Best,
Kurt

On Mon, Jun 17, 2019 at 7:32 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Vino,
>
> Thanks for the proposal.
>
> Regarding "input.keyBy(0).sum(1)" vs.
> "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)", have you done
> any benchmarks? I'm curious how much performance improvement we can get by
> using a count window as the local operator.
>
> Best,
> Jark
>
> On Mon, 17 Jun 2019 at 17:48, vino yang <yanghua1...@gmail.com> wrote:
>
> > Hi Hequn,
> >
> > Thanks for your reply.
> >
> > The purpose of the localKeyBy API is to provide a tool that lets users
> > pre-aggregate locally. The behavior of the pre-aggregation is similar to
> > the keyBy API.
> >
> > So the three cases are different; I will describe them one by one:
> >
> > 1. input.keyBy(0).sum(1)
> >
> > *In this case, the result is event-driven: each event produces one sum
> > aggregation result, which is the latest one since the source started.*
> >
> > 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> >
> > *In this case, the semantics may be problematic: the local sum
> > aggregation emits, for every event, the latest partial result since the
> > source started.*
> > *These latest partial results for the same key are hashed to one node to
> > do the global sum aggregation.*
> > *When the global aggregation receives multiple partial results (all
> > calculated from the source start) and sums them, it gets the wrong
> > result.*
> >
> > 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> >
> > *In this case, the local step only produces a partial aggregation result
> > for every 5 records in the count window. The partial aggregation results
> > for the same key are then aggregated globally.*
> >
> > So the first case and the third case can get the *same* result; the
> > difference is the output style and the latency.
> >
> > Generally speaking, the local keyBy API is just an optimization API. We do
> > not limit the user's usage, but the user has to understand its semantics
> > and use it correctly.
> >
> > Best,
> > Vino
> >
> > Hequn Cheng <chenghe...@gmail.com> wrote on Mon, Jun 17, 2019 at 4:18 PM:
> >
> > > Hi Vino,
> > >
> > > Thanks for the proposal, I think it is a very good feature!
> > >
> > > One thing I want to make sure of is the semantics of `localKeyBy`.
> > > According to the document, the `localKeyBy` API returns an instance of
> > > `KeyedStream`, which can also perform sum(). In that case, what are the
> > > semantics of `localKeyBy()`? For example, will the following code
> > > produce the same result, and what are the differences between them?
> > >
> > > 1. input.keyBy(0).sum(1)
> > > 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> > > 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> > >
> > > It would also be great if we could add this to the document. Thank you
> > > very much.
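The three pipelines Hequn lists, and Vino's explanation of why case 2 double-counts, can be checked with a small single-threaded simulation. This is a plain-Java sketch, not Flink code: `LocalAggSemantics`, `Rec`, `rollingSum`, `countWindowSum` and `latest` are hypothetical helpers that mimic a per-key rolling `sum(1)` and a tumbling `countWindow(5)`, assuming a single local instance and one key `a` with values 1 to 10.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Single-threaded simulation of the three pipelines (plain Java, not the Flink API). */
class LocalAggSemantics {

    /** A (key, value) record, standing in for a Tuple2<String, Long>. */
    static final class Rec {
        final String key;
        final long value;

        Rec(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Per-key rolling sum: emits the updated total for every input, like keyBy(0).sum(1). */
    static List<Rec> rollingSum(List<Rec> in) {
        Map<String, Long> totals = new HashMap<>();
        List<Rec> out = new ArrayList<>();
        for (Rec r : in) {
            long total = totals.merge(r.key, r.value, Long::sum);
            out.add(new Rec(r.key, total));
        }
        return out;
    }

    /** Per-key tumbling count window: one sum per `size` records; a partial window is never emitted. */
    static List<Rec> countWindowSum(List<Rec> in, int size) {
        Map<String, long[]> acc = new HashMap<>(); // key -> {count, sum}
        List<Rec> out = new ArrayList<>();
        for (Rec r : in) {
            long[] a = acc.computeIfAbsent(r.key, k -> new long[2]);
            a[0]++;
            a[1] += r.value;
            if (a[0] == size) {
                out.add(new Rec(r.key, a[1]));
                a[0] = 0; // start the next window
                a[1] = 0;
            }
        }
        return out;
    }

    /** Last value emitted per key, i.e. the final result of a rolling aggregation. */
    static Map<String, Long> latest(List<Rec> emitted) {
        Map<String, Long> result = new HashMap<>();
        for (Rec r : emitted) {
            result.put(r.key, r.value);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Rec> input = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            input.add(new Rec("a", i)); // values 1..10, true total 55
        }
        // 1. input.keyBy(0).sum(1)
        System.out.println("case 1: " + latest(rollingSum(input)));
        // 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1): running totals are summed again
        System.out.println("case 2: " + latest(rollingSum(rollingSum(input))));
        // 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
        System.out.println("case 3: " + latest(rollingSum(countWindowSum(input, 5))));
    }
}
```

Running `main` prints `case 1: {a=55}`, `case 2: {a=220}` and `case 3: {a=55}`. Re-summing the running totals (1, 3, 6, ..., 55) inflates case 2, while the two window results (15 and 40) add up to the true total. Because this sketch never emits a partially filled window, case 3 matches case 1 only when each key's record count is a multiple of the window size.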
> > >
> > > Best, Hequn
> > >
> > > On Fri, Jun 14, 2019 at 11:34 AM vino yang <yanghua1...@gmail.com> wrote:
> > >
> > > > Hi Aljoscha,
> > > >
> > > > I have looked at the "*Process*" section of the FLIP wiki page [1]. This
> > > > mail thread indicates that the proposal has proceeded to the third step.
> > > >
> > > > When I looked at the fourth step (the vote step), I didn't find the
> > > > prerequisites for starting the voting process.
> > > >
> > > > The discussion of this feature has already taken place in the old
> > > > thread [2]. So can you tell me when I should start voting? Can I start
> > > > now?
> > > >
> > > > Best,
> > > > Vino
> > > >
> > > > [1]:
> > > > https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
> > > > [2]:
> > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > > >
> > > > leesf <leesf0...@gmail.com> wrote on Thu, Jun 13, 2019 at 9:19 AM:
> > > >
> > > > > +1 for the FLIP, thanks vino for your efforts.
> > > > >
> > > > > Best,
> > > > > Leesf
> > > > >
> > > > > vino yang <yanghua1...@gmail.com> wrote on Wed, Jun 12, 2019 at 5:46 PM:
> > > > >
> > > > > > Hi folks,
> > > > > >
> > > > > > I would like to start the FLIP discussion thread about supporting
> > > > > > local aggregation in Flink.
> > > > > >
> > > > > > In short, this feature can effectively alleviate data skew. This is
> > > > > > the FLIP:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink
> > > > > >
> > > > > > *Motivation* (copied from FLIP)
> > > > > >
> > > > > > Currently, keyed streams are widely used to perform aggregating
> > > > > > operations (e.g., reduce, sum and window) on the elements that have
> > > > > > the same key.
> > > > > > When executed at runtime, the elements with the same key are sent to
> > > > > > and aggregated by the same task.
> > > > > >
> > > > > > The performance of these aggregating operations is very sensitive to
> > > > > > the distribution of keys. In cases where the distribution of keys
> > > > > > follows a power law, the performance is significantly degraded. Worse
> > > > > > still, increasing the degree of parallelism does not help when a task
> > > > > > is overloaded by a single key.
> > > > > >
> > > > > > Local aggregation is a widely adopted method to reduce the performance
> > > > > > degradation caused by data skew. We can decompose the aggregating
> > > > > > operations into two phases. In the first phase, we aggregate the
> > > > > > elements of the same key at the sender side to obtain partial results.
> > > > > > Then, in the second phase, these partial results are sent to receivers
> > > > > > according to their keys and are combined to obtain the final result.
> > > > > > Since the number of partial results received by each receiver is
> > > > > > limited by the number of senders, the imbalance among receivers can be
> > > > > > reduced. Besides, by reducing the amount of transferred data, the
> > > > > > performance can be further improved.
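The two-phase decomposition described in the Motivation, together with Kurt's points that the local operator should be able to change the element type (for example, for "average") and can stay stateless by flushing before the checkpoint barrier, can be sketched in plain Java. This is an illustration under assumptions, not Flink's or the FLIP's implementation: `TwoPhaseAverage`, `Partial`, `LocalPreAggregator`, its `maxKeys` bound and `flush()` are hypothetical names, and the caller invokes `flush()` directly where a Flink operator would do so in `prepareSnapshotPreBarrier()`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Two-phase (local + global) average over a skewed key; plain Java, not the Flink API. */
class TwoPhaseAverage {

    /** Partial accumulator: the local phase emits this type, not the input value type. */
    static final class Partial {
        final String key;
        final long sum;
        final long count;

        Partial(String key, long sum, long count) {
            this.key = key;
            this.sum = sum;
            this.count = count;
        }
    }

    /** Hypothetical local pre-aggregator: buffers per-key partials and holds nothing after flush(). */
    static final class LocalPreAggregator {
        private final Map<String, long[]> buffer = new HashMap<>(); // key -> {sum, count}
        private final int maxKeys;              // hypothetical bound on buffered keys
        private final List<Partial> downstream; // stands in for the keyBy() exchange

        LocalPreAggregator(int maxKeys, List<Partial> downstream) {
            this.maxKeys = maxKeys;
            this.downstream = downstream;
        }

        void processElement(String key, long value) {
            long[] acc = buffer.computeIfAbsent(key, k -> new long[2]);
            acc[0] += value;
            acc[1] += 1;
            if (buffer.size() >= maxKeys) {
                flush(); // bound memory by flushing early
            }
        }

        /** In Kurt's proposal this happens in prepareSnapshotPreBarrier(); here the caller invokes it. */
        void flush() {
            for (Map.Entry<String, long[]> e : buffer.entrySet()) {
                downstream.add(new Partial(e.getKey(), e.getValue()[0], e.getValue()[1]));
            }
            buffer.clear(); // nothing left to snapshot
        }
    }

    /** Global phase: merge partials per key, then divide. */
    static Map<String, Double> globalAverage(List<Partial> partials) {
        Map<String, long[]> merged = new HashMap<>(); // key -> {sum, count}
        for (Partial p : partials) {
            long[] acc = merged.computeIfAbsent(p.key, k -> new long[2]);
            acc[0] += p.sum;
            acc[1] += p.count;
        }
        Map<String, Double> averages = new HashMap<>();
        merged.forEach((k, acc) -> averages.put(k, (double) acc[0] / acc[1]));
        return averages;
    }

    public static void main(String[] args) {
        List<Partial> shuffled = new ArrayList<>();
        int senders = 3;
        for (int s = 0; s < senders; s++) {
            LocalPreAggregator local = new LocalPreAggregator(100, shuffled);
            for (int i = 1; i <= 1000; i++) {
                local.processElement("hot", i); // every sender sees the skewed key
            }
            local.flush(); // e.g. before the checkpoint barrier is forwarded
        }
        System.out.println(shuffled.size() + " partials, avg = " + globalAverage(shuffled));
    }
}
```

With three senders each seeing 1,000 records of the skewed key `hot`, `main` prints `3 partials, avg = {hot=500.5}`. The receiver gets one `(sum, count)` partial per key per sender flush instead of 3,000 raw records, and because the buffer is empty after every flush, no local state would need to be snapshotted.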
> > > > > >
> > > > > > *More details*:
> > > > > >
> > > > > > Design documentation:
> > > > > > https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> > > > > >
> > > > > > Old discussion thread:
> > > > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > > > > >
> > > > > > JIRA: FLINK-12786 <https://issues.apache.org/jira/browse/FLINK-12786>
> > > > > >
> > > > > > We are looking forward to your feedback!
> > > > > >
> > > > > > Best,
> > > > > > Vino