Hi Litree,

From an implementation level, the localKeyBy API returns a general
KeyedStream; you can call all the APIs which KeyedStream provides. We did
not restrict its usage, although we could do so (for example, by returning
a new stream object named LocalKeyedStream).
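As an aside for readers following along: the two-phase dataflow being discussed (a local pre-aggregation per sender subtask, followed by the usual keyed aggregation) can be illustrated with a small self-contained simulation. This is a hypothetical Python sketch of the concept only, not the proposed Flink API; the function names and the use of plain dicts in place of windows and state are my own assumptions.

```python
from collections import defaultdict

def local_aggregate(partition):
    """Phase 1: pre-aggregate (key, value) pairs inside one sender subtask,
    as a local window would before emitting partial results."""
    partial = defaultdict(int)
    for key, value in partition:
        partial[key] += value
    return list(partial.items())

def final_aggregate(partials):
    """Phase 2: combine the partial results shipped to the receivers by key."""
    totals = defaultdict(int)
    for key, value in partials:
        totals[key] += value
    return dict(totals)

# Two parallel sender subtasks; key "a" is heavily skewed.
p1 = [("a", 1)] * 5 + [("b", 2)]
p2 = [("a", 1)] * 3 + [("c", 4)]

# Each receiver now sees at most one partial result per key per sender,
# so the skew on "a" no longer overloads a single downstream task.
partials = local_aggregate(p1) + local_aggregate(p2)
result = final_aggregate(partials)
# result == {"a": 8, "b": 2, "c": 4}
```

Note the final result is identical to aggregating the raw elements directly; only the amount of data crossing the network changes.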
However, to achieve the goal of local aggregation, it only makes sense to
call the window API.

Best,
Vino

litree <lyuan...@126.com> wrote on Tue, Jun 4, 2019 at 10:41 PM:

> Hi Vino,
>
> I have read your design; something I want to know is the usage of these
> new APIs. It looks like when I use localKeyBy, I must then use a window
> operator to return a DataStream, and then use keyBy and another window
> operator to get the final result?
>
> thanks,
> Litree
>
> On 06/04/2019 17:22, vino yang wrote:
> Hi Dian,
>
> Thanks for your reply.
>
> I know what you mean. However, if you think deeply, you will find that
> your implementation needs to provide an operator which looks like a
> window operator: you need to use state, receive an aggregation function,
> and specify the trigger time. It looks like a lightweight window
> operator, right?
>
> We try to reuse the functions Flink provides and to reduce complexity.
> IMO, it is more user-friendly because users are familiar with the window
> API.
>
> Best,
> Vino
>
> Dian Fu <dian0511...@gmail.com> wrote on Tue, Jun 4, 2019 at 4:19 PM:
>
> > Hi Vino,
> >
> > Thanks a lot for starting this discussion. +1 to this feature, as I
> > think it will be very useful.
> >
> > Regarding using a window to buffer the input elements, personally I
> > don't think it's a good solution, for the following reasons:
> > 1) As we know, WindowOperator stores the accumulated results in
> > state; this is not necessary for a Local Aggregate operator.
> > 2) In WindowOperator, each input element is accumulated into state.
> > This is also not necessary for a Local Aggregate operator; storing
> > the input elements in memory is enough.
> >
> > Thanks,
> > Dian
> >
> > On Jun 4, 2019, at 10:03 AM, vino yang <yanghua1...@gmail.com> wrote:
> >
> > > Hi Ken,
> > >
> > > Thanks for your reply.
> > >
> > > As I said before, we try to reuse Flink's state concept (fault
> > > tolerance and "exactly-once" guarantees), so we did not consider a
> > > cache.
> > > In addition, if we use Flink's state, the OOM-related issue is not
> > > a key problem we need to consider.
> > >
> > > Best,
> > > Vino
> > >
> > > Ken Krugler <kkrugler_li...@transpac.com> wrote on Tue, Jun 4, 2019 at 1:37 AM:
> > >
> > >> Hi all,
> > >>
> > >> Cascading implemented this “map-side reduce” functionality with an
> > >> LLR cache.
> > >>
> > >> That worked well, as then the skewed keys would always be in the
> > >> cache.
> > >>
> > >> The API let you decide the size of the cache, in terms of the
> > >> number of entries.
> > >>
> > >> Having a memory limit would have been better for many of our use
> > >> cases, though FWIR there’s no good way to estimate the in-memory
> > >> size of objects.
> > >>
> > >> — Ken
> > >>
> > >>> On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com> wrote:
> > >>>
> > >>> Hi Piotr,
> > >>>
> > >>> The localKeyBy API returns an instance of KeyedStream (we just
> > >>> added an inner flag to identify the local mode), which Flink has
> > >>> provided before. Users can call all the APIs (especially *window*
> > >>> APIs) which KeyedStream provides.
> > >>>
> > >>> So if users want to use local aggregation, they should call the
> > >>> window API to build a local window; that means users should (or
> > >>> say "can") specify the window length and other information based
> > >>> on their needs.
> > >>>
> > >>> I think you described an idea different from ours. We did not try
> > >>> to react after some predefined threshold is triggered; we tend to
> > >>> give users the discretion to make such decisions.
> > >>>
> > >>> Our design tends to reuse the concepts and functions Flink
> > >>> provides, like state and window (IMO, we do not need to worry
> > >>> about OOM and the issues you mentioned).
> > >>>
> > >>> Best,
> > >>> Vino
> > >>>
> > >>> Piotr Nowojski <pi...@ververica.com> wrote on Mon, Jun 3, 2019 at 4:30 PM:
> > >>>
> > >>>> Hi,
> > >>>>
> > >>>> +1 for the idea from my side.
> > >>>> I’ve even attempted to add a similar feature quite some time
> > >>>> ago, but it didn’t get enough traction [1].
> > >>>>
> > >>>> I’ve read through your document and I couldn’t find it mentioned
> > >>>> anywhere: when should the pre-aggregated result be emitted down
> > >>>> the stream? I think that’s one of the most crucial decisions,
> > >>>> since a wrong decision here can lead to decreased performance or
> > >>>> to an explosion of memory/state consumption (both with bounded
> > >>>> and unbounded data streams). For streaming it can also lead to
> > >>>> increased latency.
> > >>>>
> > >>>> Since this is also a decision that’s impossible to make
> > >>>> automatically in a perfectly reliable way, first and foremost I
> > >>>> would expect this to be configurable via the API, with maybe
> > >>>> some predefined triggers: on watermark (for windowed
> > >>>> operations), on checkpoint barrier (to decrease state size?), on
> > >>>> element count, maybe on memory usage (much easier to estimate
> > >>>> with known/predefined types, like in SQL)… and with some option
> > >>>> to implement a custom trigger.
> > >>>>
> > >>>> Also, what would work best would be to have some form of memory
> > >>>> consumption priority. For example, if we are running out of
> > >>>> memory for a HashJoin/final aggregation, instead of spilling to
> > >>>> disk or crashing the job with an OOM it would probably be better
> > >>>> to prune/dump the pre/local aggregation state. But that’s
> > >>>> another story.
> > >>>>
> > >>>> [1] https://github.com/apache/flink/pull/4626
> > >>>>
> > >>>> Piotrek
> > >>>>
> > >>>>> On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
> > >>>>>
> > >>>>> Excited, and a big +1 for this feature.
> > >>>>>
> > >>>>> SHI Xiaogang <shixiaoga...@gmail.com> wrote on Mon, Jun 3, 2019 at 3:37 PM:
> > >>>>>
> > >>>>>> Nice feature.
> > >>>>>> Looking forward to having it in Flink.
> > >>>>>>
> > >>>>>> Regards,
> > >>>>>> Xiaogang
> > >>>>>>
> > >>>>>> vino yang <yanghua1...@gmail.com> wrote on Mon, Jun 3, 2019 at 3:31 PM:
> > >>>>>>
> > >>>>>>> Hi all,
> > >>>>>>>
> > >>>>>>> As we mentioned at some conferences, such as Flink Forward SF
> > >>>>>>> 2019 and QCon Beijing 2019, our team has implemented "local
> > >>>>>>> aggregation" in our internal Flink fork. This feature can
> > >>>>>>> effectively alleviate data skew.
> > >>>>>>>
> > >>>>>>> Currently, keyed streams are widely used to perform
> > >>>>>>> aggregating operations (e.g., reduce, sum and window) on the
> > >>>>>>> elements that have the same key. When executed at runtime,
> > >>>>>>> the elements with the same key will be sent to and aggregated
> > >>>>>>> by the same task.
> > >>>>>>>
> > >>>>>>> The performance of these aggregating operations is very
> > >>>>>>> sensitive to the distribution of keys. In cases where the
> > >>>>>>> distribution of keys follows a power law, performance will be
> > >>>>>>> significantly degraded. More unluckily, increasing the degree
> > >>>>>>> of parallelism does not help when a task is overloaded by a
> > >>>>>>> single key.
> > >>>>>>>
> > >>>>>>> Local aggregation is a widely adopted method to reduce the
> > >>>>>>> performance degradation caused by data skew. We can decompose
> > >>>>>>> the aggregating operations into two phases. In the first
> > >>>>>>> phase, we aggregate the elements with the same key at the
> > >>>>>>> sender side to obtain partial results. Then, in the second
> > >>>>>>> phase, these partial results are sent to receivers according
> > >>>>>>> to their keys and are combined to obtain the final result.
> > >>>>>>> Since the number of partial results received by each receiver
> > >>>>>>> is limited by the number of senders, the imbalance among
> > >>>>>>> receivers can be reduced.
> > >>>>>>> Besides, by reducing the amount of transferred data, the
> > >>>>>>> performance can be further improved.
> > >>>>>>>
> > >>>>>>> The design documentation is here:
> > >>>>>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> > >>>>>>>
> > >>>>>>> Any comments and feedback are welcome and appreciated.
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Vino
> > >>
> > >> --------------------------
> > >> Ken Krugler
> > >> +1 530-210-6378
> > >> http://www.scaleunlimited.com
> > >> Custom big data solutions & training
> > >> Flink, Solr, Hadoop, Cascading & Cassandra
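[Editor's note: the bounded-cache "map-side reduce" that Ken describes upthread can be sketched in a few lines. This is a minimal, hypothetical Python illustration only (an OrderedDict standing in for the LRU-style cache, with an entry-count limit as in the Cascading API he mentions); it is not code from Cascading or Flink, and the class and callback names are invented for the sketch.]

```python
from collections import OrderedDict

class BoundedLocalAggregator:
    """Map-side pre-aggregation with an entry-count-bounded, LRU-evicting
    cache: hot (skewed) keys stay resident, while cold keys are evicted
    and their partial sums are emitted downstream."""

    def __init__(self, max_entries, emit):
        self.max_entries = max_entries
        self.emit = emit              # callback receiving (key, partial_sum)
        self.cache = OrderedDict()

    def add(self, key, value):
        # Re-inserting moves the key to the most-recently-used position.
        self.cache[key] = self.cache.pop(key, 0) + value
        if len(self.cache) > self.max_entries:
            # Evict the least-recently-used entry and ship its partial result.
            self.emit(self.cache.popitem(last=False))

    def flush(self):
        # At end of input, emit whatever partial results remain cached.
        while self.cache:
            self.emit(self.cache.popitem(last=False))

out = []
agg = BoundedLocalAggregator(2, out.append)
for k, v in [("hot", 1), ("x", 1), ("hot", 1), ("y", 1), ("hot", 1)]:
    agg.add(k, v)
agg.flush()
# "hot" stays resident the whole time; "x" is evicted when "y" arrives:
# out == [("x", 1), ("y", 1), ("hot", 3)]
```

This matches Ken's observation that the skewed keys "would always be in the cache": frequent keys keep refreshing their recency, so only infrequent keys pay the eviction cost.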