[jira] [Created] (FLINK-8569) Provide a hook to override the default KeyGroupRangeAssignment

2018-02-06 Thread Nagarjun Guraja (JIRA)
Nagarjun Guraja created FLINK-8569:
Summary: Provide a hook to override the default KeyGroupRangeAssignment
Key: FLINK-8569
URL: https://issues.apache.org/jira/browse/FLINK-8569
Project: Flink

Re: KeyGroupRangeAssignment ?

2017-02-21 Thread Ovidiu-Cristian MARCU
My case is the following: I have one stream source of elements, and each element contains some key. I create a KeyedStream and then window it (so I get a WindowedStream), on top of which I apply a window function. Some numbers for my problem: 1 million records, 1000 keys. I assume parallelism is

Re: KeyGroupRangeAssignment ?

2017-02-21 Thread Aljoscha Krettek
I'm afraid that won't work because we also internally use murmur hash on the result of hashCode(). @Ovidiu I still want to understand why you want to use keyBy() for that case. It sounds like you want to use it because you would like to do something else but that is not possible with the Flink

Re: KeyGroupRangeAssignment ?

2017-02-21 Thread Greg Hogan
Integer's hashCode is the identity function. Store your slot index in an Integer or IntValue and key off that field.

Re: KeyGroupRangeAssignment ?

2017-02-21 Thread Ovidiu-Cristian MARCU
Filed https://issues.apache.org/jira/browse/FLINK-5873 Best, Ovidiu

Re: KeyGroupRangeAssignment ?

2017-02-21 Thread Ovidiu-Cristian MARCU
Hi, As in my example, each key is a window, so I want to distribute processing evenly across all slots. If I have 100 keys and 100 slots, and each key has the same rate of events, I don’t want a skewed distribution. Best, Ovidiu

Re: KeyGroupRangeAssignment ?

2017-02-21 Thread Ovidiu-Cristian MARCU
Hi Till, I will look into filing a JIRA issue. Regarding the key group assignment, you're right, there was a mistake in my code; here are the code and the distribution: numServers is maxParallelism int numKeys = 1024; HashMap groups = new HashMap
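The kind of distribution check mentioned above can be sketched as follows. This is not the code from the message (which is cut off in this listing) but a minimal, self-contained illustration. The class and method names are hypothetical, and mix() is a stand-in 32-bit mixer (the murmur3 finalizer), assumed here in place of Flink's internal murmur hash, so the exact counts will differ from what Flink produces.

```java
import java.util.Arrays;

public class KeyDistributionSketch {

    // Stand-in 32-bit mixer (murmur3 finalizer); an assumption, not Flink's exact function.
    public static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & Integer.MAX_VALUE; // clear the sign bit so the modulo below is non-negative
    }

    // Counts how many of the integer keys 0..numKeys-1 land on each parallel operator.
    public static int[] keysPerOperator(int numKeys, int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (int key = 0; key < numKeys; key++) {
            int keyGroup = mix(Integer.hashCode(key)) % maxParallelism;
            int operatorIndex = keyGroup * parallelism / maxParallelism;
            counts[operatorIndex]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Example values only: 1024 keys, maxParallelism 1024, parallelism 8.
        System.out.println(Arrays.toString(keysPerOperator(1024, 1024, 8)));
    }
}
```

With few keys relative to the number of operators, some counts can be zero, which is the skew discussed in this thread.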

Re: KeyGroupRangeAssignment ?

2017-02-21 Thread Ovidiu-Cristian MARCU
Hi, Any thoughts on this issue, related to what Till proposed ('to figure a key out whose hashes are uniformly distributed over the key groups') and a way of exposing the key group assignment through the API? I wonder how other users are facing this issue. Having a small set of keys (related to
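One way to read the proposal quoted above is to search for surrogate keys, one per parallel operator, and key the stream by those instead of the original keys. The sketch below is an illustration under stated assumptions, not an API offered by Flink: the class and method names are hypothetical, and mix() is a stand-in 32-bit mixer (the murmur3 finalizer) assumed in place of Flink's internal murmur hash. A real search would have to use the same hash Flink applies.

```java
public class SurrogateKeySketch {

    // Stand-in 32-bit mixer (murmur3 finalizer); an assumption, not Flink's exact function.
    public static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & Integer.MAX_VALUE; // clear the sign bit so the modulo below is non-negative
    }

    // Maps an integer key to a parallel operator index via its key group.
    public static int operatorIndexFor(int key, int maxParallelism, int parallelism) {
        int keyGroup = mix(Integer.hashCode(key)) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    // Returns one integer key per operator: result[i] is a key that is routed to operator i.
    // Assumes parallelism <= maxParallelism, so every operator owns at least one key group.
    public static int[] surrogateKeys(int maxParallelism, int parallelism) {
        int[] keys = new int[parallelism];
        boolean[] found = new boolean[parallelism];
        int remaining = parallelism;
        for (int candidate = 0; remaining > 0; candidate++) {
            int index = operatorIndexFor(candidate, maxParallelism, parallelism);
            if (!found[index]) {
                found[index] = true;
                keys[index] = candidate;
                remaining--;
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        int[] keys = surrogateKeys(128, 8);
        for (int i = 0; i < keys.length; i++) {
            System.out.println("operator " + i + " <- surrogate key " + keys[i]);
        }
    }
}
```

The original keys would then be mapped to surrogate keys round-robin (or by any other even scheme) before keyBy(), at the cost of an extra lookup per record.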

Re: KeyGroupRangeAssignment ?

2017-02-21 Thread Aljoscha Krettek
Hi Ovidiu, what's the reason for wanting to make the parallelism equal to the number of keys? I think in general it's very hard to ensure that hashes even go to different key groups. It can always happen that all your keys (if you have so few of them) are assigned to the same parallel operator

Re: KeyGroupRangeAssignment ?

2017-02-21 Thread Till Rohrmann
Hi Ovidiu, at the moment it is not possible to plug in a user-defined hash function/key group assignment function. If you like, you can file a JIRA issue to add this functionality. The key group assignment in your example looks quite skewed. One question concerning how you calculated it:

Re: KeyGroupRangeAssignment ?

2017-02-20 Thread Ovidiu-Cristian MARCU
Hi, Thank you for the clarifications (I am working with KeyedStream, so a custom partitioner does not help). So I should set maxParallelism>=parallelism and change my keys (from input.keyBy(0)) such that key group assignment works as expected, but I can’t modify these keys in order to make it

Re: KeyGroupRangeAssignment ?

2017-02-20 Thread Till Rohrmann
Hi Ovidiu, the way Flink works is to assign key group ranges to operators. For each element you calculate a hash value and based on that you assign it to a key group. Thus, in your example, you have either a key group with more than 1 key or multiple key groups with 1 or more keys assigned to an
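The assignment described above (hash the key, map the hash to a key group, map the key group to an operator that owns a contiguous range of key groups) can be sketched roughly as follows. The class and method names are hypothetical, and mix() is a stand-in 32-bit mixer (the murmur3 finalizer) assumed in place of the murmur hash Flink applies to hashCode(), so concrete key group numbers will not match a real job.

```java
public class KeyGroupSketch {

    // Stand-in 32-bit mixer (murmur3 finalizer); an assumption, not Flink's exact function.
    public static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & Integer.MAX_VALUE; // clear the sign bit so the modulo below is non-negative
    }

    // Step 1: hash the key and map it to one of maxParallelism key groups.
    public static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    // Step 2: map the key group to the operator that owns its contiguous range.
    public static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // example values only
        int parallelism = 4;
        for (String key : new String[] {"a", "b", "c"}) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            int operator = operatorIndexFor(keyGroup, maxParallelism, parallelism);
            System.out.println(key + " -> key group " + keyGroup + " -> operator " + operator);
        }
    }
}
```

With maxParallelism 128 and parallelism 4, key groups 0 to 31 belong to operator 0, 32 to 63 to operator 1, and so on, so two keys that hash into the same range always share an operator.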