Hi,

I don’t think it is possible to enforce scheduling of two keys to different 
nodes, since all of that is based on hashes.

For some cases, doing the pre-aggregation step (initial aggregation done before 
keyBy, which is followed by final aggregation after the keyBy) can be the 
solution for handling a data skew. With pre aggregation, some (most?) of the 
work can be distributed and be done on the source node instead of doing all of 
the heavy lifting on the destination node. It has not been yet merged to the 
Flink code, but it’s entirely a user space code, which you could copy paste 
(and adjust) into your project. Pull request containing pre aggregation is here:
https://github.com/apache/flink/pull/4626 
<https://github.com/apache/flink/pull/4626>
Please pay attention at the limitations of this code (documented in the java 
doc).

If above code doesn’t work for you for whatever reason, you can also try to 
implement some custom tailored pre aggregation. Like having two keyBy steps, 
where in first you can artificially split A and B keys into couple of smaller 
ones and the second keyBy could merge/squash the results.

Piotrek

> On 9 Jan 2018, at 21:55, Martin, Nick <nick.mar...@orbitalatk.com> wrote:
> 
> Have a set of stateful operators that rely on keyed state. There is 
> substantial skew between keys (i.e. there will be 100 messages on keys A and 
> B, and 10 messages each on keys C-J), and key selection assignment is 
> dictated by the needs of my application such that I can’t choose keys in a 
> way that will eliminate the skew. The skew is somewhat predictable (i.e. I 
> know keys A and B will usually get roughly 10x as many messages as the rest) 
> and fairly consistent on different timescales (i.e. counting the messages on 
> each key for 30 seconds would provide a reasonably good guess as to the 
> distribution of messages that will be received over the next 10-20 minutes).
>  
> The problem I’m having is that often the high volume keys (A and B in the 
> example) end up on the same task slot and slow it down, while the low volume 
> ones are distributed across the other operators, leaving them underloaded. I 
> looked into the available physical partitioning functions, but it looks like 
> that functionality is generally incompatible with keyed streams, and I need 
> access to keyed state to do my actual processing. Is there any way I can get 
> better load balancing while using keyed state?
> 
> Notice: This e-mail is intended solely for use of the individual or entity to 
> which it is addressed and may contain information that is proprietary, 
> privileged and/or exempt from disclosure under applicable law. If the reader 
> is not the intended recipient or agent responsible for delivering the message 
> to the intended recipient, you are hereby notified that any dissemination, 
> distribution or copying of this communication is strictly prohibited. This 
> communication may also contain data subject to U.S. export laws. If so, data 
> subject to the International Traffic in Arms Regulation cannot be 
> disseminated, distributed, transferred, or copied, whether incorporated or in 
> its original form, to foreign nationals residing in the U.S. or abroad, 
> absent the express prior approval of the U.S. Department of State. Data 
> subject to the Export Administration Act may not be disseminated, 
> distributed, transferred or copied contrary to U. S. Department of Commerce 
> regulations. If you have received this communication in error, please notify 
> the sender by reply e-mail and destroy the e-mail message and any physical 
> copies made of the communication.
>  Thank you. 
> *********************

Reply via email to