Hi Dian,

Thanks for your reply.

I see what you mean. However, if you think it through, you will find that your
implementation needs to provide an operator that looks like a window
operator: it has to use state, accept an aggregation function, and
specify the trigger time. That looks like a lightweight window operator,
right?
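
For illustration only, the three ingredients named above (a buffer, an
aggregation function, and a trigger) fit in a few lines of plain Java. This
is a minimal sketch, not Flink code and not either proposal's implementation:
the class LocalPreAggregator, the flushEvery count trigger, and the
downstream callback are all hypothetical names, and the buffer lives on the
heap, so the sketch says nothing about fault tolerance.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;

/** Hypothetical count-triggered local pre-aggregator; not a Flink API. */
final class LocalPreAggregator<K, V> {
    private final Map<K, V> buffer = new HashMap<>();   // plays the role of "state"
    private final BinaryOperator<V> combiner;           // the aggregation function
    private final int flushEvery;                       // the trigger: flush after N inputs
    private final BiConsumer<K, V> downstream;          // receives the partial results
    private int seen = 0;

    LocalPreAggregator(BinaryOperator<V> combiner, int flushEvery,
                       BiConsumer<K, V> downstream) {
        if (flushEvery <= 0) {
            throw new IllegalArgumentException("flushEvery must be positive");
        }
        this.combiner = combiner;
        this.flushEvery = flushEvery;
        this.downstream = downstream;
    }

    /** Folds one element into the partial result for its key. */
    void add(K key, V value) {
        buffer.merge(key, value, combiner);
        if (++seen >= flushEvery) {
            flush();
        }
    }

    /** Emits every buffered partial result and resets the buffer. */
    void flush() {
        buffer.forEach(downstream);
        buffer.clear();
        seen = 0;
    }
}
```

Whether this buffer is a heap map, as here, or window state is exactly the
point under discussion; the shape of the operator is the same in both cases.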

We are trying to reuse the functions Flink already provides and to reduce
complexity. IMO, it is more user-friendly because users are familiar with
the window API.

Best,
Vino


Dian Fu <dian0511...@gmail.com> wrote on Tue, Jun 4, 2019 at 4:19 PM:

> Hi Vino,
>
> Thanks a lot for starting this discussion. +1 to this feature as I think
> it will be very useful.
>
> Regarding using a window to buffer the input elements, personally I don't
> think it's a good solution, for the following reasons:
> 1) As we know, WindowOperator stores the accumulated results in
> state; this is not necessary for a local aggregation operator.
> 2) With WindowOperator, each input element is accumulated into state.
> This is also not necessary for a local aggregation operator; buffering the
> input elements in memory is enough.
>
> Thanks,
> Dian
>
> > On Jun 4, 2019, at 10:03 AM, vino yang <yanghua1...@gmail.com> wrote:
> >
> > Hi Ken,
> >
> > Thanks for your reply.
> >
> > As I said before, we are trying to reuse Flink's state concept (for fault
> > tolerance and to guarantee "exactly-once" semantics), so we did not
> > consider a cache.
> >
> > In addition, if we use Flink's state, OOM is not a key problem we need to
> > consider.
> >
> > Best,
> > Vino
> >
> > Ken Krugler <kkrugler_li...@transpac.com> wrote on Tue, Jun 4, 2019 at 1:37 AM:
> >
> >> Hi all,
> >>
> >> Cascading implemented this “map-side reduce” functionality with an LRU
> >> cache.
> >>
> >> That worked well, as then the skewed keys would always be in the cache.
> >>
> >> The API let you decide the size of the cache, in terms of number of
> >> entries.
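
A minimal sketch of that idea in plain Java, assuming an entry-count limit
and a least-recently-used eviction order: partial results for keys that fall
out of the cache are emitted downstream, while frequently seen (skewed) keys
stay cached. This is not Cascading's code; the class LruPartialAggregator and
its parameters are hypothetical names chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;

/** Hypothetical LRU-bounded partial aggregator; not Cascading or Flink code. */
final class LruPartialAggregator<K, V> {
    private final LinkedHashMap<K, V> cache;
    private final BinaryOperator<V> combiner;
    private final BiConsumer<K, V> downstream;

    LruPartialAggregator(int maxEntries, BinaryOperator<V> combiner,
                         BiConsumer<K, V> downstream) {
        this.combiner = combiner;
        this.downstream = downstream;
        // accessOrder = true: iteration order runs from least to most recently used.
        this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                if (size() > maxEntries) {
                    // Emit the evicted partial result instead of dropping it.
                    downstream.accept(eldest.getKey(), eldest.getValue());
                    return true;
                }
                return false;
            }
        };
    }

    /** Folds one element into the cached partial result for its key. */
    void add(K key, V value) {
        V old = cache.get(key);   // counts as an access, so hot keys stay recent
        cache.put(key, old == null ? value : combiner.apply(old, value));
    }

    /** Emits everything still cached, e.g. at end of input. */
    void flush() {
        cache.forEach(downstream);
        cache.clear();
    }
}
```

Sizing by entry count keeps the eviction check trivial; a byte-based limit
would need a per-object size estimate.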
> >>
> >> Having a memory limit would have been better for many of our use cases,
> >> though FWIR there’s no good way to estimate in-memory size for objects.
> >>
> >> — Ken
> >>
> >>> On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com> wrote:
> >>>
> >>> Hi Piotr,
> >>>
> >>> The localKeyBy API returns an instance of KeyedStream (we just added an
> >>> internal flag to identify the local mode), which Flink already provides.
> >>> Users can call all the APIs (especially the *window* APIs) that
> >>> KeyedStream provides.
> >>>
> >>> So if users want to use local aggregation, they should call the window
> >>> API to build a local window. That means users should (or rather "can")
> >>> specify the window length and other information based on their needs.
> >>>
> >>> I think you described an idea different from ours. We did not try to
> >>> react after some predefined threshold is triggered. We prefer to give
> >>> users the discretion to make those decisions.
> >>>
> >>> Our design tends to reuse concepts and functions that Flink already
> >>> provides, such as state and window (IMO, we do not need to worry about
> >>> OOM and the issues you mentioned).
> >>>
> >>> Best,
> >>> Vino
> >>>
> >>> Piotr Nowojski <pi...@ververica.com> wrote on Mon, Jun 3, 2019 at 4:30 PM:
> >>>
> >>>> Hi,
> >>>>
> >>>> +1 for the idea from my side. I’ve even attempted to add a similar
> >>>> feature quite some time ago, but didn’t get enough traction [1].
> >>>>
> >>>> I’ve read through your document and I couldn’t find it mentioning
> >>>> anywhere when the pre-aggregated result should be emitted downstream.
> >>>> I think that’s one of the most crucial decisions, since a wrong decision
> >>>> here can lead to a decrease in performance or to an explosion of
> >>>> memory/state consumption (both with bounded and unbounded data streams).
> >>>> For streaming it can also lead to increased latency.
> >>>>
> >>>> Since this is also a decision that’s impossible to make automatically
> >>>> and perfectly reliably, first and foremost I would expect this to be
> >>>> configurable via the API, with maybe some predefined triggers, like on
> >>>> watermark (for windowed operations), on checkpoint barrier (to decrease
> >>>> state size?), on element count, maybe on memory usage (much easier to
> >>>> estimate with known/predefined types, like in SQL)… and with some option
> >>>> to implement a custom trigger.
> >>>>
> >>>> Also, what would work best would be to have some form of memory
> >>>> consumption priority. For example, if we are running out of memory for
> >>>> HashJoin/final aggregation, instead of spilling to disk or crashing the
> >>>> job with OOM it would probably be better to prune/dump the pre/local
> >>>> aggregation state. But that’s another story.
> >>>>
> >>>> [1] https://github.com/apache/flink/pull/4626
> >>>>
> >>>> Piotrek
> >>>>
> >>>>> On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
> >>>>>
> >>>>> Excited, and a big +1 for this feature.
> >>>>>
> >>>>> SHI Xiaogang <shixiaoga...@gmail.com> wrote on Mon, Jun 3, 2019 at 3:37 PM:
> >>>>>
> >>>>>> Nice feature.
> >>>>>> Looking forward to having it in Flink.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Xiaogang
> >>>>>>
> >>>>>> vino yang <yanghua1...@gmail.com> wrote on Mon, Jun 3, 2019 at 3:31 PM:
> >>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> As we mentioned at some conferences, such as Flink Forward SF 2019 and
> >>>>>>> QCon Beijing 2019, our team has implemented "local aggregation" in our
> >>>>>>> internal Flink fork. This feature can effectively alleviate data skew.
> >>>>>>>
> >>>>>>> Currently, keyed streams are widely used to perform aggregating
> >>>>>>> operations (e.g., reduce, sum and window) on elements that have the
> >>>>>>> same key. At runtime, the elements with the same key are sent to
> >>>>>>> and aggregated by the same task.
> >>>>>>>
> >>>>>>> The performance of these aggregating operations is very sensitive to
> >>>>>>> the distribution of keys. In cases where the distribution of keys
> >>>>>>> follows a power law, performance degrades significantly. Worse,
> >>>>>>> increasing the degree of parallelism does not help when a task
> >>>>>>> is overloaded by a single key.
> >>>>>>>
> >>>>>>> Local aggregation is a widely adopted method to reduce the performance
> >>>>>>> degradation caused by data skew. We can decompose the aggregating
> >>>>>>> operations into two phases. In the first phase, we aggregate the
> >>>>>>> elements of the same key at the sender side to obtain partial results.
> >>>>>>> Then, in the second phase, these partial results are sent to receivers
> >>>>>>> according to their keys and are combined to obtain the final result.
> >>>>>>> Since the number of partial results received by each receiver is
> >>>>>>> limited by the number of senders, the imbalance among receivers can be
> >>>>>>> reduced. Besides, reducing the amount of transferred data further
> >>>>>>> improves performance.
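
The two phases described above can be sketched in plain Java for a simple
count aggregation. This is a hedged illustration only, with no Flink
dependency: the class TwoPhaseCount and its method names are hypothetical,
and each "sender" is modeled as a list of keys.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical two-phase count; illustrates the idea, not the Flink design. */
final class TwoPhaseCount {
    /** Phase 1: one sender collapses its own elements into one partial count per key. */
    static Map<String, Long> localCount(List<String> keys) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : keys) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    /** Phase 2: the receiver combines the partial counts from all senders. */
    static Map<String, Long> globalCount(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> result.merge(key, count, Long::sum));
        }
        return result;
    }
}
```

However many times a hot key occurs, each sender contributes at most one
partial result for it, so the receiver of that key combines at most as many
records as there are senders.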
> >>>>>>>
> >>>>>>> The design documentation is here:
> >>>>>>>
> >>>>>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> >>>>>>>
> >>>>>>> Any comments and feedback are welcome and appreciated.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Vino
> >>>>>>>
> >>>>>>
> >>>>
> >>>>
> >>
> >> --------------------------
> >> Ken Krugler
> >> +1 530-210-6378
> >> http://www.scaleunlimited.com
> >> Custom big data solutions & training
> >> Flink, Solr, Hadoop, Cascading & Cassandra
> >>
> >>
>
>
