Hi Vino,
I have read your design. Something I want to know is the usage of these new APIs. It looks like when I use localKeyBy, I must then use a window operator to return a DataStream, and then use keyBy and another window operator to get the final result?

thanks,
Litree
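For reference, such a pipeline would look roughly like the minimal sketch below, under the design discussed later in this thread. localKeyBy is the proposed API and does not exist in Flink today; keyBy, timeWindow and sum are existing DataStream APIs, and the record type, window size and count aggregation are only illustrative.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class LocalAggregationSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // (word, 1) records; a stand-in for any skewed keyed input.
            DataStream<Tuple2<String, Long>> source = env.fromElements(
                    Tuple2.of("a", 1L), Tuple2.of("a", 1L), Tuple2.of("b", 1L));

            source
                .localKeyBy(0)                // proposed API: keys within each subtask, no shuffle
                .timeWindow(Time.seconds(5))  // local window buffering the partial results
                .sum(1)                       // phase 1: partial sums per key, per subtask
                .keyBy(0)                     // existing API: shuffles only the partial results
                .timeWindow(Time.seconds(5))  // final window on the receiver side
                .sum(1)                       // phase 2: merge partial sums into the final result
                .print();

            env.execute("local aggregation sketch");
        }
    }

So the local aggregation itself is expressed as an ordinary window on the locally keyed stream, followed by a regular keyBy plus window for the final result.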
On 06/04/2019 17:22, vino yang wrote:

Hi Dian,

Thanks for your reply. I know what you mean. However, if you think about
it more deeply, you will find that your implementation needs to provide
an operator which looks like a window operator: it needs to keep state,
accept an aggregation function, and specify the trigger time. It looks
like a lightweight window operator, right?

We try to reuse the functions Flink already provides and reduce
complexity. IMO, it is more user-friendly, because users are familiar
with the window API.

Best,
Vino

On Tue, Jun 4, 2019 at 4:19 PM, Dian Fu <dian0511...@gmail.com> wrote:

> Hi Vino,
>
> Thanks a lot for starting this discussion. +1 to this feature, as I
> think it will be very useful.
>
> Regarding using a window to buffer the input elements, personally I
> don't think it's a good solution, for the following reasons:
> 1) As we know, WindowOperator stores the accumulated results in state;
> this is not necessary for a Local Aggregate operator.
> 2) For WindowOperator, each input element is accumulated to state. This
> is also not necessary for a Local Aggregate operator: storing the input
> elements in memory is enough.
>
> Thanks,
> Dian
>
> > On Jun 4, 2019, at 10:03 AM, vino yang <yanghua1...@gmail.com> wrote:
> >
> > Hi Ken,
> >
> > Thanks for your reply.
> >
> > As I said before, we try to reuse Flink's state concept (for fault
> > tolerance and exactly-once guarantees), so we did not consider a
> > cache.
> >
> > In addition, if we use Flink's state, OOM-related issues are not a
> > key problem we need to consider.
> >
> > Best,
> > Vino
> >
> > On Tue, Jun 4, 2019 at 1:37 AM, Ken Krugler
> > <kkrugler_li...@transpac.com> wrote:
> >
> >> Hi all,
> >>
> >> Cascading implemented this "map-side reduce" functionality with an
> >> LRU cache.
> >>
> >> That worked well, as then the skewed keys would always be in the
> >> cache.
> >>
> >> The API let you decide the size of the cache, in terms of the number
> >> of entries.
> >>
> >> Having a memory limit would have been better for many of our use
> >> cases, though from what I recall there's no good way to estimate the
> >> in-memory size of objects.
> >>
> >> — Ken
> >>
> >>> On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com> wrote:
> >>>
> >>> Hi Piotr,
> >>>
> >>> The localKeyBy API returns an instance of KeyedStream (we just
> >>> added an inner flag to identify the local mode), which Flink has
> >>> provided before. Users can call all the APIs (especially the
> >>> *window* APIs) that KeyedStream provides.
> >>>
> >>> So if users want to use local aggregation, they call the window API
> >>> to build a local window; that means users can specify the window
> >>> length and other parameters based on their needs.
> >>>
> >>> I think you described an idea different from ours. We did not try
> >>> to react after some predefined threshold is triggered; we tend to
> >>> give users the discretion to make those decisions.
> >>>
> >>> Our design tends to reuse the concepts and functions Flink already
> >>> provides, like state and window (IMO, we then do not need to worry
> >>> about OOM and the other issues you mentioned).
> >>>
> >>> Best,
> >>> Vino
> >>>
> >>> On Mon, Jun 3, 2019 at 4:30 PM, Piotr Nowojski
> >>> <pi...@ververica.com> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> +1 for the idea from my side. I've even attempted to add a similar
> >>>> feature quite some time ago, but it didn't get enough traction [1].
> >>>>
> >>>> I've read through your document and I couldn't find it mentioning
> >>>> anywhere when the pre-aggregated result should be emitted down the
> >>>> stream. I think that's one of the most crucial decisions, since a
> >>>> wrong decision here can lead to decreased performance or to an
> >>>> explosion of memory/state consumption (with both bounded and
> >>>> unbounded data streams). For streaming it can also lead to
> >>>> increased latency.
> >>>>
> >>>> Since this is also a decision that's impossible to make
> >>>> automatically and perfectly reliably, first and foremost I would
> >>>> expect this to be configurable via the API, with maybe some
> >>>> predefined triggers: on watermark (for windowed operations), on
> >>>> checkpoint barrier (to decrease state size?), on element count,
> >>>> maybe on memory usage (much easier to estimate with
> >>>> known/predefined types, like in SQL)… and with some option to
> >>>> implement a custom trigger.
> >>>>
> >>>> Also, what would work best would be to have some form of memory
> >>>> consumption priority. For example, if we are running out of memory
> >>>> for a HashJoin/final aggregation, instead of spilling to disk or
> >>>> crashing the job with an OOM it would probably be better to
> >>>> prune/dump the pre/local aggregation state. But that's another
> >>>> story.
> >>>>
> >>>> [1] https://github.com/apache/flink/pull/4626
> >>>>
> >>>> Piotrek
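Piotr's "custom trigger" option maps naturally onto Flink's existing window triggers. Below is a minimal sketch of an element-count flush policy for the window-based design; localKeyBy is again the proposed, hypothetical API, while GlobalWindows, CountTrigger and PurgingTrigger exist today (imports and source as in the earlier sketch):

    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
    import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
    import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;

    // Emit and clear the partial aggregate for a key after every 1000
    // elements, instead of waiting for a time window to close.
    DataStream<Tuple2<String, Long>> partials = source
            .localKeyBy(0)                                      // proposed API (hypothetical)
            .window(GlobalWindows.create())                     // no time-based boundary
            .trigger(PurgingTrigger.of(CountTrigger.of(1000)))  // FIRE_AND_PURGE on element count
            .sum(1);                                            // flushed partial sums
    // The final phase (keyBy + window + sum) is unchanged from the earlier sketch.

An on-watermark flush corresponds to an event-time trigger; flushing on checkpoint barriers or on memory usage has no counterpart in the window Trigger API, so those policies would need operator-level support.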
> >>>>> On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
> >>>>>
> >>>>> Excited, and a big +1 for this feature.
> >>>>>
> >>>>> On Mon, Jun 3, 2019 at 3:37 PM, SHI Xiaogang
> >>>>> <shixiaoga...@gmail.com> wrote:
> >>>>>
> >>>>>> Nice feature. Looking forward to having it in Flink.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Xiaogang
> >>>>>>
> >>>>>> On Mon, Jun 3, 2019 at 3:31 PM, vino yang
> >>>>>> <yanghua1...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> As we mentioned at some conferences, such as Flink Forward SF
> >>>>>>> 2019 and QCon Beijing 2019, our team has implemented "local
> >>>>>>> aggregation" in our internal Flink fork. This feature can
> >>>>>>> effectively alleviate data skew.
> >>>>>>>
> >>>>>>> Currently, keyed streams are widely used to perform aggregating
> >>>>>>> operations (e.g., reduce, sum and window) on the elements that
> >>>>>>> have the same key. When executed at runtime, the elements with
> >>>>>>> the same key will be sent to and aggregated by the same task.
> >>>>>>>
> >>>>>>> The performance of these aggregating operations is very
> >>>>>>> sensitive to the distribution of keys. In cases where the
> >>>>>>> distribution of keys follows a power law, the performance will
> >>>>>>> be significantly degraded. Worse still, increasing the degree
> >>>>>>> of parallelism does not help when a task is overloaded by a
> >>>>>>> single key.
> >>>>>>>
> >>>>>>> Local aggregation is a widely adopted method to mitigate the
> >>>>>>> performance degradation caused by data skew. We can decompose
> >>>>>>> the aggregating operations into two phases. In the first phase,
> >>>>>>> we aggregate the elements with the same key at the sender side
> >>>>>>> to obtain partial results. Then, in the second phase, these
> >>>>>>> partial results are sent to receivers according to their keys
> >>>>>>> and are combined to obtain the final result. Since the number
> >>>>>>> of partial results received by each receiver is limited by the
> >>>>>>> number of senders, the imbalance among receivers can be
> >>>>>>> reduced. Besides, by reducing the amount of transferred data,
> >>>>>>> the performance can be further improved.
> >>>>>>>
> >>>>>>> The design documentation is here:
> >>>>>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> >>>>>>>
> >>>>>>> Any comments and feedback are welcome and appreciated.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Vino
> >>
> >> --------------------------
> >> Ken Krugler
> >> +1 530-210-6378
> >> http://www.scaleunlimited.com
> >> Custom big data solutions & training
> >> Flink, Solr, Hadoop, Cascading & Cassandra
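To make the two-phase decomposition in the proposal above concrete: with the window-based design, a single AggregateFunction whose input and output types match can serve both phases, because the second phase simply re-aggregates the first phase's partial results. A minimal sketch under that assumption (the class name is illustrative; AggregateFunction is an existing Flink interface):

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Counts per key. Input and output are both (key, count), so the same
    // function works for the sender-side (partial) and receiver-side
    // (final) phases.
    public class KeyedCount implements AggregateFunction<
            Tuple2<String, Long>, Tuple2<String, Long>, Tuple2<String, Long>> {

        @Override
        public Tuple2<String, Long> createAccumulator() {
            // "" marks an empty accumulator (Flink tuples cannot hold null).
            return Tuple2.of("", 0L);
        }

        @Override
        public Tuple2<String, Long> add(Tuple2<String, Long> value,
                                        Tuple2<String, Long> acc) {
            return Tuple2.of(value.f0, acc.f1 + value.f1);
        }

        @Override
        public Tuple2<String, Long> getResult(Tuple2<String, Long> acc) {
            return acc;
        }

        @Override
        public Tuple2<String, Long> merge(Tuple2<String, Long> a,
                                          Tuple2<String, Long> b) {
            return Tuple2.of(a.f0.isEmpty() ? b.f0 : a.f0, a.f1 + b.f1);
        }
    }

    // Phase 1: stream.localKeyBy(0).timeWindow(...).aggregate(new KeyedCount())
    // Phase 2:       .keyBy(0).timeWindow(...).aggregate(new KeyedCount())

Each sender then emits at most one partial result per key per window, so the receiver's input volume is bounded by the number of senders rather than by the raw element count, which is where the skew mitigation comes from.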