Hi Vino,

It may seem similar to the window operator, but there are a few key differences. For example, the local aggregate operator can send out results at any time, while the window operator can only send out results at the end of the window (without early fire). This means that the local aggregate operator can send out results not only when the trigger time is reached, but also when memory is exhausted. This difference makes optimizations possible, as it means that the local aggregate operator rarely needs to operate on state.
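To make that concrete, here is a rough, simplified sketch of such a flush policy (this is not the actual operator; the class name, the count-based threshold, and the callback are all made up for illustration):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.BiConsumer;
    import java.util.function.BiFunction;

    // Keeps partial results in an in-memory map and emits them either when a
    // size threshold is hit ("memory is exhausted") or when flush() is called
    // from a trigger timer, so it rarely needs to touch keyed state.
    public class LocalAggBuffer<K, V> {

        private final Map<K, V> buffer = new HashMap<>();
        private final int maxEntries;                 // crude stand-in for a memory limit
        private final BiFunction<V, V, V> combiner;   // e.g. (a, b) -> a + b
        private final BiConsumer<K, V> downstream;    // where flushed partial results go

        public LocalAggBuffer(int maxEntries, BiFunction<V, V, V> combiner,
                              BiConsumer<K, V> downstream) {
            this.maxEntries = maxEntries;
            this.combiner = combiner;
            this.downstream = downstream;
        }

        public void add(K key, V value) {
            buffer.merge(key, value, combiner);
            if (buffer.size() >= maxEntries) {
                flush();  // running out of room: emit partial results early
            }
        }

        // Also called when the trigger time is reached (e.g. from a processing-time timer).
        public void flush() {
            buffer.forEach(downstream);
            buffer.clear();
        }
    }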
I admit that the window operator can solve part of the problem (the data skew) and just wonder if we can do more. Using the window operator at present seems OK to me, as it can indeed solve part of the problem. We just need to think a little more in the design and make sure that the current solution is consistent with future optimizations.

Thanks,
Dian

On Jun 4, 2019, at 5:22 PM, vino yang <yanghua1...@gmail.com> wrote:

Hi Dian,

Thanks for your reply. I know what you mean. However, if you think about it more deeply, you will find that your implementation needs to provide an operator which looks like a window operator: you need to use state, receive an aggregation function, and specify the trigger time. It looks like a lightweight window operator, right?

We try to reuse the functions Flink already provides and reduce complexity. IMO, it is more user-friendly because users are familiar with the window API.

Best,
Vino

On Tue, Jun 4, 2019 at 4:19 PM Dian Fu <dian0511...@gmail.com> wrote:

Hi Vino,

Thanks a lot for starting this discussion. +1 to this feature as I think it will be very useful.

Regarding using a window to buffer the input elements, personally I don't think it's a good solution for the following reasons:
1) As we know, WindowOperator will store the accumulated results in state; this is not necessary for a local aggregate operator.
2) For WindowOperator, each input element will be accumulated into state. This is also not necessary for a local aggregate operator; storing the input elements in memory is enough.

Thanks,
Dian

On Jun 4, 2019, at 10:03 AM, vino yang <yanghua1...@gmail.com> wrote:

Hi Ken,

Thanks for your reply. As I said before, we try to reuse Flink's state concept (for fault tolerance and to guarantee exactly-once semantics), so we did not consider a cache. In addition, if we use Flink's state, the OOM-related issue is not a key problem we need to consider.

Best,
Vino

On Tue, Jun 4, 2019 at 1:37 AM Ken Krugler <kkrugler_li...@transpac.com> wrote:

Hi all,

Cascading implemented this "map-side reduce" functionality with an LLR cache. That worked well, as then the skewed keys would always be in the cache.

The API let you decide the size of the cache, in terms of number of entries. Having a memory limit would have been better for many of our use cases, though FWIR there's no good way to estimate the in-memory size of objects.

— Ken

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra

On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com> wrote:

Hi Piotr,

The localKeyBy API returns an instance of KeyedStream (we just added an inner flag to identify the local mode), which Flink has provided before. Users can call all the APIs (especially the *window* APIs) that KeyedStream provides. So if users want to use local aggregation, they should call the window API to build a local window, which means users should (or say "can") specify the window length and other information based on their needs.
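For example, a usage sketch could look like the following (note: localKeyBy is only the API proposed in the design doc; it does not exist in Flink yet, so the exact signature may differ):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, Long>> source =
        env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 1L), Tuple2.of("b", 1L));

    source
        .localKeyBy(0)                // proposed API: a KeyedStream flagged as "local", no shuffle
        .timeWindow(Time.seconds(5))  // users decide how long partial results are buffered
        .sum(1)                       // phase 1: partial sums on the sender side
        .keyBy(0)                     // phase 2: shuffle only the (few) partial results
        .timeWindow(Time.seconds(5))
        .sum(1)
        .print();

    env.execute();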
I think you described another idea, different from ours. We did not try to react after some predefined threshold is triggered; we tend to give users the discretion to make that decision. Our design tends to reuse Flink-provided concepts and functions like state and window (IMO, we do not need to worry about OOM and the issues you mentioned).

Best,
Vino

On Mon, Jun 3, 2019 at 4:30 PM Piotr Nowojski <pi...@ververica.com> wrote:

Hi,

+1 for the idea from my side. I've even attempted to add a similar feature quite some time ago, but it didn't get enough traction [1].

I've read through your document and I couldn't find it mentioned anywhere: when should the pre-aggregated result be emitted down the stream? I think that's one of the most crucial decisions, since a wrong decision here can lead to decreased performance or to an explosion of memory/state consumption (with both bounded and unbounded data streams). For streaming it can also lead to increased latency.

Since this is also a decision that's impossible to make automatically in a perfectly reliable way, first and foremost I would expect this to be configurable via the API, with maybe some predefined triggers, like on watermark (for windowed operations), on checkpoint barrier (to decrease state size?), on element count, maybe on memory usage (much easier to estimate with known/predefined types, like in SQL)… and with some option to implement a custom trigger.

Also, what would work best would be some form of memory consumption priority. For example, if we are running out of memory for a HashJoin/final aggregation, instead of spilling to disk or crashing the job with an OOM it would probably be better to prune/dump the pre/local aggregation state. But that's another story.

[1] https://github.com/apache/flink/pull/4626

Piotrek

On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:

Excited and Big +1 for this feature.

On Mon, Jun 3, 2019 at 3:37 PM SHI Xiaogang <shixiaoga...@gmail.com> wrote:

Nice feature. Looking forward to having it in Flink.

Regards,
Xiaogang

On Mon, Jun 3, 2019 at 3:31 PM vino yang <yanghua1...@gmail.com> wrote:

Hi all,

As we mentioned at some conferences, such as Flink Forward SF 2019 and QCon Beijing 2019, our team has implemented "local aggregation" in our internal Flink fork. This feature can effectively alleviate data skew.

Currently, keyed streams are widely used to perform aggregating operations (e.g., reduce, sum and window) on elements that have the same key. When executed at runtime, the elements with the same key will be sent to and aggregated by the same task.

The performance of these aggregating operations is very sensitive to the distribution of keys. In cases where the distribution of keys follows a power law, the performance will be significantly degraded. Even worse, increasing the degree of parallelism does not help when a task is overloaded by a single key.

Local aggregation is a widely adopted method to reduce the performance degradation caused by data skew. We can decompose the aggregating operations into two phases. In the first phase, we aggregate the elements of the same key at the sender side to obtain partial results. Then in the second phase, these partial results are sent to receivers according to their keys and are combined to obtain the final result. Since the number of partial results received by each receiver is limited by the number of senders, the imbalance among receivers can be reduced. Besides, by reducing the amount of transferred data, performance can be further improved.

The design documentation is here:
https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing

Any comments and feedback are welcome and appreciated.

Best,
Vino
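To make the two-phase decomposition described above a bit more concrete, here is a tiny, framework-free sketch (the class name, keys and counts are invented purely for illustration): each sender pre-aggregates its own elements, so the receiver combines at most one partial result per key and per sender, no matter how skewed the raw key distribution is.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class TwoPhaseAggregationDemo {

        public static void main(String[] args) {
            // Raw elements seen by two senders; the key "hot" is heavily skewed.
            List<String> sender1 = Arrays.asList("hot", "hot", "hot", "hot", "cold");
            List<String> sender2 = Arrays.asList("hot", "hot", "warm");

            // Phase 1: each sender aggregates its own elements into partial counts.
            Map<String, Long> partial1 = preAggregate(sender1);  // {hot=4, cold=1}
            Map<String, Long> partial2 = preAggregate(sender2);  // {hot=2, warm=1}

            // Phase 2: the receiver combines the partial results per key,
            // instead of receiving every raw element.
            Map<String, Long> finalCounts = new HashMap<>(partial1);
            partial2.forEach((key, count) -> finalCounts.merge(key, count, Long::sum));

            System.out.println(finalCounts);  // hot=6, cold=1, warm=1 (order unspecified)
        }

        private static Map<String, Long> preAggregate(List<String> elements) {
            Map<String, Long> partial = new HashMap<>();
            for (String key : elements) {
                partial.merge(key, 1L, Long::sum);
            }
            return partial;
        }
    }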