Hi Vino,

It may seem similar to the window operator, but there are a few key
differences. For example, the local aggregate operator can send out its
results at any time, while the window operator can only send out results
at the end of the window (without early firing). This means the local
aggregate operator can emit results not only when the trigger time is
reached, but also when memory is exhausted. This difference enables an
optimization: the local aggregate operator rarely needs to access state.
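
To make this concrete, here is a minimal sketch of the kind of flush policy
I have in mind (plain Java, purely illustrative; the class and method names
are my own and this is not an existing or proposed Flink API): partial
aggregates live in an in-memory map and are emitted either by a trigger or
when the buffer grows too large, so the hot path rarely touches state.

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

/**
 * Illustrative sketch only: an in-memory local aggregator that emits its
 * partial results when the buffer exceeds a configured number of entries.
 * A real operator would additionally flush on a (processing-time) trigger
 * and on checkpoints.
 */
public class LocalAggregateBuffer<K, V> {

    private final Map<K, V> buffer = new HashMap<>();
    private final int maxEntries;                  // flush when "memory" is exhausted
    private final BiFunction<V, V, V> combine;     // merges two partial aggregates
    private final BiConsumer<K, V> downstream;     // receives flushed partial results

    public LocalAggregateBuffer(int maxEntries,
                                BiFunction<V, V, V> combine,
                                BiConsumer<K, V> downstream) {
        this.maxEntries = maxEntries;
        this.combine = combine;
        this.downstream = downstream;
    }

    /** Folds one element into the buffer; no state backend involved. */
    public void add(K key, V value) {
        buffer.merge(key, value, combine);
        if (buffer.size() >= maxEntries) {
            flush();                               // the "memory exhausted" case
        }
    }

    /** Also called by a periodic trigger in a real implementation. */
    public void flush() {
        buffer.forEach(downstream);
        buffer.clear();
    }
}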

I admit that the window operator can solve part of the problem (the data
skew) and just wonder if we can do more. Using the window operator for now
seems OK to me, as it does indeed solve part of the problem. We just need to
think a little further ahead in the design and make sure that the current
solution is consistent with future optimizations.

Thanks,

Dian

On Jun 4, 2019, at 5:22 PM, vino yang <yanghua1...@gmail.com> wrote:

Hi Dian,

Thanks for your reply.

I know what you mean. However, if you think about it more deeply, you will
find that your implementation needs to provide an operator that looks like
a window operator: it needs to use state, receive an aggregation function,
and specify the trigger time. It looks like a lightweight window operator,
right?

We try to reuse the functions Flink already provides and reduce complexity.
IMO, it is also more user-friendly because users are familiar with the
window API.
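
For reference, this is roughly how we imagine it being used (just a sketch
based on the design document: localKeyBy is the proposed API and does not
exist in Flink today; window, keyBy and sum are the existing DataStream
APIs users already know):

// Sketch of the proposed usage. localKeyBy is the API proposed in the
// design doc (not an existing Flink API); everything else is the familiar
// DataStream/window API.
DataStream<Tuple2<String, Long>> source = ...;

DataStream<Tuple2<String, Long>> result = source
    // local, non-shuffling "keying" on the potentially skewed key
    .localKeyBy(value -> value.f0)
    // users pick the pre-aggregation interval with the window API they know
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .sum(1)                      // partial sums, one per sender and window
    // normal keyed aggregation over the (much smaller) partial results
    .keyBy(value -> value.f0)
    .sum(1);

The local window absorbs the hot key on each sender, so the downstream
keyBy only sees one partial result per sender and window.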

Best,
Vino


On Tue, Jun 4, 2019 at 4:19 PM, Dian Fu <dian0511...@gmail.com> wrote:

Hi Vino,

Thanks a lot for starting this discussion. +1 to this feature as I think
it will be very useful.

Regarding using a window to buffer the input elements, personally I don't
think it's a good solution, for the following reasons:
1) As we know, WindowOperator stores the accumulated results in state; this
is not necessary for the local aggregate operator.
2) In WindowOperator, each input element is accumulated into state. This is
also not necessary for the local aggregate operator; keeping the input
elements in memory is enough.

Thanks,
Dian

On Jun 4, 2019, at 10:03 AM, vino yang <yanghua1...@gmail.com> wrote:

Hi Ken,

Thanks for your reply.

As I said before, we try to reuse Flink's state concept (for fault
tolerance and "exactly-once" guarantees), so we did not consider a cache.

In addition, if we use Flink's state, OOM-related issues are not a key
problem we need to consider.

Best,
Vino

On Tue, Jun 4, 2019 at 1:37 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:

Hi all,

Cascading implemented this “map-side reduce” functionality with an LRU
cache.

That worked well, as then the skewed keys would always be in the cache.

The API let you decide the size of the cache, in terms of number of
entries.

Having a memory limit would have been better for many of our use cases,
though as far as I recall there’s no good way to estimate the in-memory
size of objects.
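
For anyone curious, the core of that approach is just a bounded LRU map
that combines values on insert and emits the evicted entry downstream as a
partial result. Roughly (illustrative Java sketch, not Cascading’s actual
code; the counting use case and names are assumptions):

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Illustrative only: a fixed-size LRU map that keeps the hottest keys
 * resident and emits the evicted partial aggregate downstream.
 */
public class MapSideCombiner<K> extends LinkedHashMap<K, Long> {

    private final int maxEntries;
    private final BiConsumer<K, Long> downstream;

    public MapSideCombiner(int maxEntries, BiConsumer<K, Long> downstream) {
        super(16, 0.75f, true);          // access-order = LRU behaviour
        this.maxEntries = maxEntries;
        this.downstream = downstream;
    }

    /** Combine a count into the cache; skewed keys stay cached and keep combining. */
    public void add(K key, long count) {
        merge(key, count, Long::sum);
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, Long> eldest) {
        if (size() > maxEntries) {
            downstream.accept(eldest.getKey(), eldest.getValue()); // emit partial result
            return true;                                           // evict it
        }
        return false;
    }
}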

— Ken

On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com> wrote:

Hi Piotr,

The localKeyBy API returns an instance of KeyedStream (we just added an
inner flag to identify the local mode), which Flink already provides. Users
can call all the APIs (especially the *window* APIs) that KeyedStream
provides.

So if users want to use local aggregation, they should call the window API
to build a local window. That means users should (or rather, can) specify
the window length and other information based on their needs.

I think you described an idea different from ours. We did not try to react
after some predefined threshold is triggered; we tend to give users the
discretion to make that decision.

Our design tends to reuse the concepts and functions Flink already
provides, like state and window (IMO, we do not need to worry about OOM and
the other issues you mentioned).

Best,
Vino

On Mon, Jun 3, 2019 at 4:30 PM, Piotr Nowojski <pi...@ververica.com> wrote:

Hi,

+1 for the idea from my side. I’ve even attempted to add a similar feature
quite some time ago, but didn’t get enough traction [1].

I’ve read through your document and I couldn’t find it mentioning anywhere
when the pre-aggregated result should be emitted down the stream. I think
that’s one of the most crucial decisions, since a wrong decision here can
lead to a decrease in performance or to an explosion of memory/state
consumption (both with bounded and unbounded data streams). For streaming
it can also lead to increased latency.

Since this is also a decision that’s impossible to make automatically in a
perfectly reliable way, first and foremost I would expect this to be
configurable via the API, maybe with some predefined triggers: on watermark
(for windowed operations), on checkpoint barrier (to decrease state size?),
on element count, maybe on memory usage (much easier to estimate with
known/predefined types, like in SQL)… and with some option to implement a
custom trigger.

Also, what would work best would be to have some form of memory consumption
priority. For example, if we are running out of memory for the
HashJoin/final aggregation, instead of spilling to disk or crashing the job
with an OOM it would probably be better to prune/dump the pre/local
aggregation state. But that’s another story.

[1] https://github.com/apache/flink/pull/4626
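
To make the trigger idea a bit more concrete, something along these lines
could work (a purely hypothetical sketch, nothing like this exists in
Flink; all names are made up):

import java.io.Serializable;

/**
 * Hypothetical sketch only: a pluggable decision for when the locally
 * pre-aggregated results should be emitted downstream.
 */
public interface LocalAggregationTrigger<T> extends Serializable {

    /** Called per buffered element; return true to flush the local buffer. */
    boolean onElement(T element, long bufferedElements, long estimatedBytes);

    /** Called on watermark / checkpoint barrier; return true to flush. */
    boolean onBarrier();
}

/** Example: flush every 10 000 elements, at roughly 16 MB buffered, and on barriers. */
class CountOrMemoryTrigger<T> implements LocalAggregationTrigger<T> {

    @Override
    public boolean onElement(T element, long bufferedElements, long estimatedBytes) {
        return bufferedElements >= 10_000 || estimatedBytes >= 16L * 1024 * 1024;
    }

    @Override
    public boolean onBarrier() {
        return true; // keep checkpointed/local state small
    }
}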

Piotrek

On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:

Excited, and a big +1 for this feature.

On Mon, Jun 3, 2019 at 3:37 PM, SHI Xiaogang <shixiaoga...@gmail.com> wrote:

Nice feature.
Looking forward to having it in Flink.

Regards,
Xiaogang

On Mon, Jun 3, 2019 at 3:31 PM, vino yang <yanghua1...@gmail.com> wrote:

Hi all,

As we mentioned at some conferences, such as Flink Forward SF 2019 and QCon
Beijing 2019, our team has implemented "local aggregation" in our internal
Flink fork. This feature can effectively alleviate data skew.

Currently, keyed streams are widely used to perform aggregating operations
(e.g., reduce, sum and window) on the elements that have the same key. When
executed at runtime, the elements with the same key will be sent to and
aggregated by the same task.

The performance of these aggregating operations is very sensitive to the
distribution of keys. In cases where the distribution of keys follows a
power law, the performance will be significantly degraded. Even more
unluckily, increasing the degree of parallelism does not help when a task
is overloaded by a single key.

Local aggregation is a widely-adopted method to reduce the performance
degradation caused by data skew. We can decompose the aggregating
operations into two phases. In the first phase, we aggregate the elements
of the same key at the sender side to obtain partial results. Then, in the
second phase, these partial results are sent to the receivers according to
their keys and are combined to obtain the final result. Since the number of
partial results received by each receiver is limited by the number of
senders, the imbalance among receivers can be reduced. Besides, by reducing
the amount of transferred data, the performance can be further improved.
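
As a toy illustration of the two phases (plain Java, independent of any
Flink API; the data and names are made up), each sender pre-sums its own
elements per key, so the receiver of a hot key combines at most one partial
result per sender instead of one record per element:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoPhaseAggregationDemo {

    public static void main(String[] args) {
        // Three "senders", with key "a" heavily skewed.
        List<List<String>> senders = List.of(
                List.of("a", "a", "a", "a", "b"),
                List.of("a", "a", "c"),
                List.of("a", "b", "b"));

        Map<String, Long> finalCounts = new HashMap<>();
        for (List<String> elements : senders) {
            // Phase 1: each sender aggregates locally and ships only partial counts.
            Map<String, Long> partial = new HashMap<>();
            for (String key : elements) {
                partial.merge(key, 1L, Long::sum);      // local (sender-side) aggregation
            }
            // Phase 2: partial results are combined at the receiver side by key.
            partial.forEach((key, count) -> finalCounts.merge(key, count, Long::sum));
        }

        // The receiver of "a" combined only 3 partial results instead of 7 raw elements.
        System.out.println(finalCounts);                // e.g. {a=7, b=3, c=1}
    }
}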

The design documentation is here:
https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing


Any comments and feedback are welcome and appreciated.

Best,
Vino





--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
