Hi Litree,

At the implementation level, the localKeyBy API returns a general
KeyedStream, so you can call every API that KeyedStream provides. We did
not restrict its usage, although we could (for example, by returning a new
stream type named LocalKeyedStream).

However, to achieve the goal of local aggregation, it only makes sense to
call the window API.
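
To make the two-step pattern concrete, here is a minimal plain-Java sketch
of the idea. It is only an illustration under stated assumptions: it has no
Flink dependency, the class and method names are hypothetical, a simple
element-count flush stands in for the window trigger, and an in-memory map
stands in for Flink state. It does not show the proposed localKeyBy API
itself.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of two-phase aggregation; this is not Flink code.
final class LocalAggregationSketch {

    // Phase 1 (the role of localKeyBy + window on one sender): count elements
    // per key and emit the partial counts every windowSize input elements.
    static List<Map.Entry<String, Long>> localAggregate(List<String> keys, int windowSize) {
        List<Map.Entry<String, Long>> partials = new ArrayList<>();
        Map<String, Long> buffer = new HashMap<>();
        int seen = 0;
        for (String key : keys) {
            buffer.merge(key, 1L, Long::sum);
            seen++;
            if (seen == windowSize) {
                flush(buffer, partials);
                seen = 0;
            }
        }
        flush(buffer, partials); // emit whatever is left in the last, incomplete window
        return partials;
    }

    // Moves every buffered (key, partial count) pair to the output and clears the buffer.
    private static void flush(Map<String, Long> buffer, List<Map.Entry<String, Long>> out) {
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            out.add(new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue()));
        }
        buffer.clear();
    }

    // Phase 2 (the role of keyBy + a second aggregation on the receiver):
    // combine the partial counts from all senders into the final count per key.
    static Map<String, Long> globalAggregate(List<List<Map.Entry<String, Long>>> partialsPerSender) {
        Map<String, Long> result = new TreeMap<>();
        for (List<Map.Entry<String, Long>> partials : partialsPerSender) {
            for (Map.Entry<String, Long> p : partials) {
                result.merge(p.getKey(), p.getValue(), Long::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> sender1 = localAggregate(Arrays.asList("a", "a", "a", "b"), 2);
        List<Map.Entry<String, Long>> sender2 = localAggregate(Arrays.asList("a", "a", "a"), 2);
        System.out.println("partials from sender 1: " + sender1);
        System.out.println("partials from sender 2: " + sender2);
        System.out.println("final counts: " + globalAggregate(Arrays.asList(sender1, sender2)));
    }
}
```

In this sketch the hot key "a" reaches the receiver as a few partial counts
rather than one record per element, which is the effect the local window is
meant to have.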

Best,
Vino

On Tue, Jun 4, 2019 at 10:41 PM, litree <lyuan...@126.com> wrote:

> Hi Vino,
>
>
> I have read your design. What I would like to understand is how these new
> APIs are used. It looks like after calling localKeyBy I must apply a window
> operator to get back a DataStream, and then call keyBy and another window
> operator to get the final result. Is that right?
>
>
> thanks,
> Litree
>
>
> On 06/04/2019 17:22, vino yang wrote:
> Hi Dian,
>
> Thanks for your reply.
>
> I know what you mean. However, if you think it through, you will find that
> your implementation needs to provide an operator that looks like a window
> operator: it has to use state, accept an aggregation function, and
> specify the trigger time. It ends up being a lightweight window operator.
> Right?
>
> We try to reuse the functions Flink already provides and to reduce
> complexity. IMO, it is more user-friendly because users are already
> familiar with the window API.
>
> Best,
> Vino
>
>
> On Tue, Jun 4, 2019 at 4:19 PM, Dian Fu <dian0511...@gmail.com> wrote:
>
> > Hi Vino,
> >
> > Thanks a lot for starting this discussion. +1 to this feature as I think
> > it will be very useful.
> >
> > Regarding using a window to buffer the input elements, personally I
> > don't think it's a good solution, for the following reasons:
> > 1) WindowOperator stores the accumulated results in state, which is
> > not necessary for the Local Aggregate operator.
> > 2) With WindowOperator, each input element is accumulated into state.
> > This is also unnecessary for the Local Aggregate operator; keeping the
> > input elements in memory is enough.
> >
> > Thanks,
> > Dian
> >
> > > On Jun 4, 2019, at 10:03 AM, vino yang <yanghua1...@gmail.com> wrote:
> > >
> > > Hi Ken,
> > >
> > > Thanks for your reply.
> > >
> > > As I said before, we try to reuse Flink's state concept (for fault
> > > tolerance and to guarantee "exactly-once" semantics), so we did not
> > > consider a cache.
> > >
> > > In addition, if we use Flink's state, OOM-related issues are not a
> > > key problem we need to consider.
> > >
> > > Best,
> > > Vino
> > >
> > > On Tue, Jun 4, 2019 at 1:37 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
> > >
> > >> Hi all,
> > >>
> > >> Cascading implemented this “map-side reduce” functionality with an LRU
> > >> cache.
> > >>
> > >> That worked well, as then the skewed keys would always be in the
> > >> cache.
> > >>
> > >> The API let you decide the size of the cache, in terms of number of
> > >> entries.
> > >>
> > >> Having a memory limit would have been better for many of our use
> > >> cases, though FWIR there’s no good way to estimate in-memory size for
> > >> objects.
> > >>
> > >> — Ken
> > >>
> > >>> On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com> wrote:
> > >>>
> > >>> Hi Piotr,
> > >>>
> > >>> The localKeyBy API returns an instance of KeyedStream (we just added
> > >>> an inner flag to identify the local mode), which Flink already
> > >>> provides. Users can call all the APIs (especially the *window* APIs)
> > >>> that KeyedStream provides.
> > >>>
> > >>> So if users want to use local aggregation, they should call the
> > >>> window API to build a local window. That means users should (or
> > >>> rather "can") specify the window length and other settings based on
> > >>> their needs.
> > >>>
> > >>> I think you described an idea different from ours. We did not try to
> > >>> react after some predefined threshold is triggered. We prefer to give
> > >>> users the discretion to make these decisions.
> > >>>
> > >>> Our design tends to reuse concepts and functions that Flink already
> > >>> provides, such as state and windows (IMO, we do not need to worry
> > >>> about OOM and the issues you mentioned).
> > >>>
> > >>> Best,
> > >>> Vino
> > >>>
> > >>> On Mon, Jun 3, 2019 at 4:30 PM, Piotr Nowojski <pi...@ververica.com> wrote:
> > >>>
> > >>>> Hi,
> > >>>>
> > >>>> +1 for the idea from my side. I’ve even attempted to add a similar
> > >>>> feature quite some time ago, but it didn’t get enough traction [1].
> > >>>>
> > >>>> I’ve read through your document and I couldn’t find any mention of
> > >>>> when the pre-aggregated result should be emitted downstream. I
> > >>>> think that’s one of the most crucial decisions, since a wrong
> > >>>> decision here can lead to decreased performance or to an explosion
> > >>>> of memory/state consumption (with both bounded and unbounded data
> > >>>> streams). For streaming it can also lead to increased latency.
> > >>>>
> > >>>> Since this is also a decision that’s impossible to make
> > >>>> automatically in a perfectly reliable way, first and foremost I
> > >>>> would expect this to be configurable via the API, with maybe some
> > >>>> predefined triggers, such as on watermark (for windowed
> > >>>> operations), on checkpoint barrier (to decrease state size?), on
> > >>>> element count, maybe on memory usage (much easier to estimate with
> > >>>> known/predefined types, like in SQL)… and with some option to
> > >>>> implement a custom trigger.
> > >>>>
> > >>>> Also, what would work best would be to have some form of memory
> > >>>> consumption priority. For example, if we are running out of memory
> > >>>> for HashJoin/final aggregation, instead of spilling to disk or
> > >>>> crashing the job with OOM, it would probably be better to prune/dump
> > >>>> the pre/local aggregation state. But that’s another story.
> > >>>>
> > >>>> [1] https://github.com/apache/flink/pull/4626
> > >>>>
> > >>>> Piotrek
> > >>>>
> > >>>>> On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
> > >>>>>
> > >>>>> Excited, and a big +1 for this feature.
> > >>>>>
> > >>>>> On Mon, Jun 3, 2019 at 3:37 PM, SHI Xiaogang <shixiaoga...@gmail.com> wrote:
> > >>>>>
> > >>>>>> Nice feature.
> > >>>>>> Looking forward to having it in Flink.
> > >>>>>>
> > >>>>>> Regards,
> > >>>>>> Xiaogang
> > >>>>>>
> > >>>>>> On Mon, Jun 3, 2019 at 3:31 PM, vino yang <yanghua1...@gmail.com> wrote:
> > >>>>>>
> > >>>>>>> Hi all,
> > >>>>>>>
> > >>>>>>> As we mentioned at some conferences, such as Flink Forward SF
> > >>>>>>> 2019 and QCon Beijing 2019, our team has implemented "Local
> > >>>>>>> aggregation" in our internal Flink fork. This feature can
> > >>>>>>> effectively alleviate data skew.
> > >>>>>>>
> > >>>>>>> Currently, keyed streams are widely used to perform aggregating
> > >>>>>>> operations (e.g., reduce, sum and window) on the elements that
> > >>>>>>> have the same key. When executed at runtime, the elements with
> > >>>>>>> the same key will be sent to and aggregated by the same task.
> > >>>>>>>
> > >>>>>>> The performance of these aggregating operations is very
> > >>>>>>> sensitive to the distribution of keys. In cases where the
> > >>>>>>> distribution of keys follows a power law, performance degrades
> > >>>>>>> significantly. Worse, increasing the degree of parallelism does
> > >>>>>>> not help when a task is overloaded by a single key.
> > >>>>>>>
> > >>>>>>> Local aggregation is a widely adopted method to reduce the
> > >>>>>>> performance degradation caused by data skew. We can decompose
> > >>>>>>> the aggregating operations into two phases. In the first phase,
> > >>>>>>> we aggregate the elements of the same key at the sender side to
> > >>>>>>> obtain partial results. Then, in the second phase, these partial
> > >>>>>>> results are sent to receivers according to their keys and are
> > >>>>>>> combined to obtain the final result. Since the number of partial
> > >>>>>>> results received by each receiver is limited by the number of
> > >>>>>>> senders, the imbalance among receivers can be reduced. Besides,
> > >>>>>>> by reducing the amount of transferred data, the performance can
> > >>>>>>> be further improved.
> > >>>>>>>
> > >>>>>>> The design documentation is here:
> > >>>>>>>
> > >>>>>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> > >>>>>>>
> > >>>>>>> Any comments and feedback are welcome and appreciated.
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Vino
> > >>>>>>>
> > >>>>>>
> > >>>>
> > >>>>
> > >>
> > >> --------------------------
> > >> Ken Krugler
> > >> +1 530-210-6378
> > >> http://www.scaleunlimited.com
> > >> Custom big data solutions & training
> > >> Flink, Solr, Hadoop, Cascading & Cassandra
> > >>
> > >>
> >
> >
>
