Re: Task Assignment

2020-04-27 Thread Piotr Nowojski
Hi Navneeth,

But what’s the problem with using `keyBy(…)`? If you have a set of keys that
you want to process together, in other words keys that are basically equal from
the `keyBy(…)` perspective, why can’t you encode that in your `KeySelector`?

To make it clearer, you can think about this in two steps. You have the
sets of keys that you want to be processed together, S_1, S_2, …, S_n. Each S_i
can contain multiple keys. The steps would be:
1. Create an artificial field, the index of the set, and add it to your
record using a mapping function.
2. keyBy the records using this index.
After this, all records whose keys belong to the same set S_i will be
processed by the same parallel instance of the operator after keyBy.

(These two operations could also be done as a single step inside the `KeySelector`.)
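A minimal sketch of the single-step variant, assuming the sets S_1, …, S_n are known up front and disjoint (indexed from 0 here). `Event`, `SetIndexSelector` and the `set-`/`key-` prefixes are hypothetical names for illustration. In a real job the class would implement Flink's `KeySelector<Event, String>` and be passed to `stream.keyBy(…)`; it is kept dependency-free here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical record type standing in for whatever the job processes. */
final class Event {
    final String key;
    final String payload;

    Event(String key, String payload) {
        this.key = key;
        this.payload = payload;
    }
}

/**
 * Derives the artificial grouping key inside the selector (steps 1 and 2 in one).
 * In a Flink job this would implement
 * org.apache.flink.api.java.functions.KeySelector<Event, String>.
 */
final class SetIndexSelector {
    private final Map<String, Integer> keyToSet = new HashMap<>();

    /** sets.get(i) holds the keys of one set; sets are assumed to be disjoint. */
    SetIndexSelector(List<List<String>> sets) {
        for (int i = 0; i < sets.size(); i++) {
            for (String k : sets.get(i)) {
                keyToSet.put(k, i);
            }
        }
    }

    /** Returns the set index for grouped keys, or the original key otherwise. */
    String getKey(Event e) {
        Integer idx = keyToSet.get(e.key);
        // Prefixes keep set indices and raw keys from ever colliding.
        return idx != null ? "set-" + idx : "key-" + e.key;
    }
}
```

With sets `[a, b]` and `[c]`, records for `a` and `b` both get the key `set-0` and therefore end up in the same parallel instance after `keyBy`, while keys outside every set keep their own key and are still spread by hashing. Flink still hashes the artificial key, so which TM ends up handling a given set is not under your control.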

Piotrek  




Re: Task Assignment

2020-04-27 Thread Marta Paes Moreira
Sorry, I didn't realize you were dealing with multiple keys.

In that case, I'd recommend you read about key-group assignment [1] and
check the KeyGroupRangeAssignment class [2].

Key groups are assigned to parallel tasks as ranges before the job is
started. This is well-defined behaviour in Flink, with implications for
state reassignment on rescaling. I'm afraid that if you try to hardwire
this behaviour into your code, the job might no longer be transparently
rescalable.
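To make the key-group mechanism concrete, here is a dependency-free sketch of the arithmetic, written after the KeyGroupRangeAssignment class [2]. The class name `KeyGroupSketch` is hypothetical, the hash mix is modeled on Flink's `MathUtils.murmurHash` and should be checked against the source before relying on exact values, and a max parallelism of 128 is only an example.

```java
/** Sketch of key -> key group -> operator index, following KeyGroupRangeAssignment. */
final class KeyGroupSketch {

    /** Murmur-style mix of key.hashCode(); always returns a non-negative int. */
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // Final bit mix.
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    /** Key -> key group in [0, maxParallelism); independent of the current parallelism. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    /** Key group -> index of the parallel operator instance that owns it. */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** First key group (inclusive) of the contiguous range owned by an operator index. */
    static int rangeStart(int operatorIndex, int maxParallelism, int parallelism) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) of the contiguous range owned by an operator index. */
    static int rangeEnd(int operatorIndex, int maxParallelism, int parallelism) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }
}
```

With a max parallelism of 128 and a parallelism of 3, instance 0 owns key groups 0 to 42, instance 1 owns 43 to 85 and instance 2 owns 86 to 127. Rescaling to 4 instances moves the boundaries (instance 0 then owns 0 to 31), while each key stays in the same key group. That is why state can be redistributed by whole key groups, and why a hardwired key-to-task mapping would break on rescaling.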

[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html

[2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java




Re: Task Assignment

2020-04-23 Thread Navneeth Krishnan
Hi Marta,

Thanks for your response. What I'm looking for is something like data
locality. If one TaskManager (TM) is processing a set of keys, I want to
ensure all keys of the same type go to the same TM rather than using
hashing to find the downstream slot. I could use a common key to do this,
but I need to parallelize as much as possible, since the number of
incoming messages is too large to funnel through a single key and
process there.

Thanks



Re: Task Assignment

2020-04-23 Thread Marta Paes Moreira
Hi, Navneeth.

If you *key* your stream using stream.keyBy(…), this will logically split
your input, and all the records with the same key will be processed in the
same operator instance. This is the default behavior in Flink for keyed
streams, and it is handled transparently.

You can read more about it in the documentation [1].

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#keyed-state-and-operator-state

On Thu, Apr 23, 2020 at 7:44 AM Navneeth Krishnan wrote:

> Hi All,
>
> Is there a way for an upstream operator to know how the downstream
> operator tasks are assigned? Basically I want to group my messages to be
> processed on slots in the same node based on some key.
>
> Thanks
>