Hi Piotr,

The localKeyBy API returns an instance of KeyedStream (we just added an
inner flag to identify the local mode), which Flink has provided before.
Users can call all the APIs (especially the *window* APIs) that KeyedStream
provides.

So if users want to use local aggregation, they call the window API to
build a local window, which means they should (or rather, can) specify the
window length and other parameters based on their needs.
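
To make this concrete, here is a rough sketch (localKeyBy is the API from
our proposal and its exact signature may still change; everything else is
the standard DataStream API):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class LocalAggregationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Long>> input = env.fromElements(
                Tuple2.of("a", 1L), Tuple2.of("a", 1L), Tuple2.of("b", 1L));

        // Phase 1: local pre-aggregation. localKeyBy (the proposed API) returns
        // a KeyedStream flagged as "local", so no shuffle happens here; the user
        // chooses the local window length and thereby decides when partial
        // results are emitted downstream.
        DataStream<Tuple2<String, Long>> partial = input
                .localKeyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        // Phase 2: global aggregation. The (much smaller) partial results are
        // shuffled by key and combined into the final result.
        DataStream<Tuple2<String, Long>> result = partial
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        result.print();
        env.execute("local aggregation sketch");
    }
}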

I think you are describing an idea different from ours. We do not try to
react after some predefined threshold is triggered; we prefer to give users
the discretion to make that decision.

Our design tends to reuse the concepts and functions Flink already
provides, such as state and windows (IMO, we do not need to worry about OOM
and the other issues you mentioned).
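
For example, a user who prefers to flush partial results by element count
rather than by time (one possible answer to your "when to emit" question)
could simply pick a count window on the locally keyed stream. Again a rough
sketch, under the same assumption about localKeyBy and reusing the input
stream and imports from the sketch above:

        // Emit a partial result for a key after every 100 locally buffered
        // elements. The user, not the framework, decides the flush point, and
        // the window state bounds what the sender side holds back.
        DataStream<Tuple2<String, Long>> partial = input
                .localKeyBy(0)
                .countWindow(100)
                .sum(1);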

Best,
Vino

Piotr Nowojski <pi...@ververica.com> wrote on Mon, Jun 3, 2019 at 4:30 PM:

> Hi,
>
> +1 for the idea from my side. I’ve even attempted to add a similar
> feature quite some time ago, but didn’t get enough traction [1].
>
> I’ve read through your document and I couldn’t find any mention of when
> the pre-aggregated result should be emitted down the stream. I think
> that’s one of the most crucial decisions, since a wrong decision here can
> lead to decreased performance or to an explosion of memory/state
> consumption (with both bounded and unbounded data streams). For streaming
> it can also lead to increased latency.
>
> Since this is also a decision that’s impossible to make automatically in
> a perfectly reliable way, first and foremost I would expect this to be
> configurable via the API, maybe with some predefined triggers, like on
> watermark (for windowed operations), on checkpoint barrier (to decrease
> state size?), on element count, maybe on memory usage (much easier to
> estimate with known/predefined types, like in SQL)… and with some option
> to implement a custom trigger.
>
> Also, what would work best would be to have some form of memory
> consumption priority. For example, if we are running out of memory for
> the HashJoin/final aggregation, instead of spilling to disk or crashing
> the job with an OOM it would probably be better to prune/dump the
> pre/local aggregation state. But that’s another story.
>
> [1] https://github.com/apache/flink/pull/4626
>
> Piotrek
>
> > On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
> >
> > Excited, and a big +1 for this feature.
> >
> > SHI Xiaogang <shixiaoga...@gmail.com> wrote on Mon, Jun 3, 2019 at 3:37 PM:
> >
> >> Nice feature.
> >> Looking forward to having it in Flink.
> >>
> >> Regards,
> >> Xiaogang
> >>
> >> vino yang <yanghua1...@gmail.com> wrote on Mon, Jun 3, 2019 at 3:31 PM:
> >>
> >>> Hi all,
> >>>
> >>> As we mentioned at some conferences, such as Flink Forward SF 2019 and
> >>> QCon Beijing 2019, our team has implemented "Local aggregation" in our
> >>> internal Flink fork. This feature can effectively alleviate data skew.
> >>>
> >>> Currently, keyed streams are widely used to perform aggregating
> >>> operations (e.g., reduce, sum and window) on the elements that have the
> >>> same key. At runtime, the elements with the same key are sent to and
> >>> aggregated by the same task.
> >>>
> >>> The performance of these aggregating operations is very sensitive to
> >>> the distribution of keys. In cases where the distribution of keys
> >>> follows a power law, the performance is significantly degraded. Worse,
> >>> increasing the degree of parallelism does not help when a task is
> >>> overloaded by a single key.
> >>>
> >>> Local aggregation is a widely adopted method to reduce the performance
> >>> degradation caused by data skew. We can decompose the aggregating
> >>> operations into two phases. In the first phase, we aggregate the
> >>> elements of the same key at the sender side to obtain partial results.
> >>> In the second phase, these partial results are sent to receivers
> >>> according to their keys and are combined to obtain the final result.
> >>> Since the number of partial results received by each receiver is
> >>> limited by the number of senders, the imbalance among receivers can be
> >>> reduced. Besides, by reducing the amount of transferred data, the
> >>> performance can be further improved.
> >>>
> >>> The design documentation is here:
> >>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> >>>
> >>> Any comments and feedback are welcome and appreciated.
> >>>
> >>> Best,
> >>> Vino
> >>>
> >>
>
>
