Hi all,

Cascading implemented this “map-side reduce” functionality with an LRU cache.

That worked well, since the skewed keys would always stay in the cache.

The API let you decide the size of the cache, in terms of number of entries.

Having a memory limit would have been better for many of our use cases, though 
as far as I recall there’s no good way to estimate the in-memory size of objects.
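
For anyone curious, the idea is roughly the following (a plain-Java sketch, not
Cascading’s actual code; the class and method names are made up): an
entry-bounded LRU map that emits a partial aggregate downstream whenever an
entry gets evicted, and flushes whatever is left at the end of the task.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Entry-bounded LRU cache for map-side partial aggregation (illustrative only).
// Evicted entries and the final flush are handed to "emit", which would send
// the partial results downstream.
public class LocalAggregationCache<K> {

    private final int maxEntries;
    private final BiConsumer<K, Long> emit;
    private final LinkedHashMap<K, Long> cache;

    public LocalAggregationCache(int maxEntries, BiConsumer<K, Long> emit) {
        this.maxEntries = maxEntries;
        this.emit = emit;
        // accessOrder = true gives LRU behaviour; removeEldestEntry evicts and
        // emits the least recently used key once the entry limit is exceeded.
        this.cache = new LinkedHashMap<K, Long>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Long> eldest) {
                if (size() > LocalAggregationCache.this.maxEntries) {
                    LocalAggregationCache.this.emit.accept(eldest.getKey(), eldest.getValue());
                    return true;
                }
                return false;
            }
        };
    }

    // Aggregate (here: sum) the value into the cached partial result for the key.
    public void add(K key, long value) {
        cache.merge(key, value, Long::sum);
    }

    // Emit all remaining partial results, e.g. at the end of the input.
    public void flush() {
        cache.forEach(emit);
        cache.clear();
    }
}

Because the frequently occurring (skewed) keys are always in the cache, most of
their records get folded into a single partial result before anything is sent
over the network.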

— Ken

> On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com> wrote:
> 
> Hi Piotr,
> 
> The localKeyBy API returns an instance of KeyedStream (we just added an
> inner flag to identify the local mode), which Flink has provided before.
> Users can call all the APIs (especially *window* APIs) that KeyedStream
> provides.
> 
> So if users want to use local aggregation, they should call the window API
> to build a local window, which means users should (or rather, can) specify
> the window length and other parameters based on their needs.
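> 
> Roughly, the usage would look like the following (just a sketch; the exact
> signatures are described in the design doc and may differ, and the event
> type and merge function here are placeholders):
> 
>     input                                      // DataStream<MyEvent>
>         .localKeyBy(event -> event.getKey())   // proposed API: key locally, no shuffle
>         .timeWindow(Time.seconds(5))           // local window for pre-aggregation
>         .reduce((a, b) -> a.merge(b))          // partial aggregation at the sender side
>         .keyBy(event -> event.getKey())        // normal keyBy, shuffles the partial results
>         .timeWindow(Time.seconds(5))
>         .reduce((a, b) -> a.merge(b));         // final aggregation at the receiver side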
> 
> I think you described an idea different from ours. We did not try to react
> after some predefined threshold is triggered; we tend to give users the
> discretion to make these decisions.
> 
> Our design tends to reuse the concepts and functions Flink already provides,
> such as state and window (IMO, we do not need to worry about OOM or the
> other issues you mentioned).
> 
> Best,
> Vino
> 
> Piotr Nowojski <pi...@ververica.com> 于2019年6月3日周一 下午4:30写道:
> 
>> Hi,
>> 
>> +1 for the idea from my side. I even attempted to add a similar feature
>> quite some time ago, but it didn’t get enough traction [1].
>> 
>> I’ve read through your document and I couldn’t find it mentioned anywhere
>> when the pre-aggregated result should be emitted down the stream. I think
>> that’s one of the most crucial decisions, since a wrong decision here can
>> lead to a decrease in performance or to an explosion of memory/state
>> consumption (with both bounded and unbounded data streams). For streaming
>> it can also lead to increased latency.
>> 
>> Since this is also a decision that’s impossible to make automatically in a
>> perfectly reliable way, first and foremost I would expect this to be
>> configurable via the API, with maybe some predefined triggers: on watermark
>> (for windowed operations), on checkpoint barrier (to decrease state size?),
>> on element count, maybe on memory usage (much easier to estimate with
>> known/predefined types, like in SQL)… and with some option to implement a
>> custom trigger.
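>> 
>> Just to illustrate the element-count variant (a rough, made-up sketch in
>> plain Java, not tied to any existing Flink operator):
>> 
>> import java.util.HashMap;
>> import java.util.Map;
>> import java.util.function.BiConsumer;
>> 
>> // Buffers partial sums per key and flushes them downstream once the number
>> // of buffered elements reaches the configured threshold (count-based trigger).
>> public class CountTriggeredLocalAggregator<K> {
>> 
>>     private final long flushThreshold;
>>     private final BiConsumer<K, Long> downstream;
>>     private final Map<K, Long> partials = new HashMap<>();
>>     private long elementsSinceFlush = 0;
>> 
>>     public CountTriggeredLocalAggregator(long flushThreshold,
>>                                          BiConsumer<K, Long> downstream) {
>>         this.flushThreshold = flushThreshold;
>>         this.downstream = downstream;
>>     }
>> 
>>     public void add(K key, long value) {
>>         partials.merge(key, value, Long::sum);
>>         if (++elementsSinceFlush >= flushThreshold) {
>>             flush();
>>         }
>>     }
>> 
>>     // The same flush could also be called on watermark, on checkpoint
>>     // barrier, or at the end of the input.
>>     public void flush() {
>>         partials.forEach(downstream);
>>         partials.clear();
>>         elementsSinceFlush = 0;
>>     }
>> }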
>> 
>> Also, what would work best would be to have some form of memory consumption
>> priority. For example, if we are running out of memory for a HashJoin/final
>> aggregation, instead of spilling to disk or crashing the job with an OOM it
>> would probably be better to prune/dump the pre/local aggregation state. But
>> that’s another story.
>> 
>> [1] https://github.com/apache/flink/pull/4626
>> 
>> Piotrek
>> 
>>> On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
>>> 
>>> Excited and  Big +1 for this feature.
>>> 
>>> SHI Xiaogang <shixiaoga...@gmail.com> 于2019年6月3日周一 下午3:37写道:
>>> 
>>>> Nice feature.
>>>> Looking forward to having it in Flink.
>>>> 
>>>> Regards,
>>>> Xiaogang
>>>> 
>>>> vino yang <yanghua1...@gmail.com> 于2019年6月3日周一 下午3:31写道:
>>>> 
>>>>> Hi all,
>>>>> 
>>>>> As we mentioned at some conferences, such as Flink Forward SF 2019 and
>>>>> QCon Beijing 2019, our team has implemented "local aggregation" in our
>>>>> internal Flink fork. This feature can effectively alleviate data skew.
>>>>> 
>>>>> Currently, keyed streams are widely used to perform aggregating
>>>>> operations (e.g., reduce, sum and window) on the elements that have the
>>>>> same key. When executed at runtime, the elements with the same key will
>>>>> be sent to and aggregated by the same task.
>>>>> 
>>>>> The performance of these aggregating operations is very sensitive to the
>>>>> distribution of keys. In cases where the distribution of keys follows a
>>>>> power law, the performance will be significantly degraded. Worse,
>>>>> increasing the degree of parallelism does not help when a task is
>>>>> overloaded by a single key.
>>>>> 
>>>>> Local aggregation is a widely-adopted method to reduce the performance
>>>>> degradation caused by data skew. We can decompose the aggregating
>>>>> operations into two phases. In the first phase, we aggregate the elements
>>>>> of the same key at the sender side to obtain partial results. Then, in
>>>>> the second phase, these partial results are sent to receivers according
>>>>> to their keys and are combined to obtain the final result. Since the
>>>>> number of partial results received by each receiver is limited by the
>>>>> number of senders, the imbalance among receivers can be reduced. Besides,
>>>>> by reducing the amount of transferred data, the performance can be
>>>>> further improved.
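>>>>> 
>>>>> As a simple illustration (the numbers are made up): if a hot key has
>>>>> 1,000,000 records spread across 100 sender tasks, each sender emits a
>>>>> single partial sum per local window, so the receiver only combines 100
>>>>> partial results for that key instead of processing 1,000,000 individual
>>>>> records.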
>>>>> 
>>>>> The design documentation is here:
>>>>> 
>>>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
>>>>> 
>>>>> Any comment and feedback are welcome and appreciated.
>>>>> 
>>>>> Best,
>>>>> Vino
>>>>> 
>>>> 
>> 
>> 

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
