Sihua,

On Thu, Apr 12, 2018 at 10:04 AM, 周思华 <summerle...@163.com> wrote:

> Hi Christophe,
> I think what you want to do is a "stream join", and I'm a bit confused: if
> you already know there are only 8 keys, why would you still want to use a
> parallelism of 16? 8 of the subtasks will be idle (a waste of CPU). In a
> KeyedStream, the tuples with the same key are sent to the same parallel
> instance.
>


First, my "8 keys, 16 parallelisms" is just an example. Real life is a bit
more complicated. But basically the idea is that I have a certain number of
task slots, and I want to keep them busy so that my processing is as fast as
possible. Even if I have fewer keys than slots, I want each slot to take
its share of the work.
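
To make the "fewer keys than slots" case concrete, here is a rough plain-Java sketch of how a keyed stream routes keys to subtasks. It follows the two-step idea (key to key group, key group to subtask), but the spread() hash is only a stand-in and the max parallelism of 128 is an assumed value, so the exact subtask numbers will not match a real Flink job. The class and method names are made up for illustration.

```java
import java.util.Set;
import java.util.TreeSet;

// Simplified model of how a keyed stream routes records to parallel subtasks.
// Assumption: the real assignment uses key groups and a murmur-style hash;
// the spreading hash below is a stand-in, not Flink's implementation.
class KeyRoutingSketch {

    // Hypothetical stand-in for the hash applied on top of hashCode().
    static int spread(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h & Integer.MAX_VALUE; // keep the result non-negative
    }

    // key -> key group -> subtask index.
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        int keyGroup = spread(key.hashCode()) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    // The set of subtasks that receive at least one of the given keys.
    static Set<Integer> busySubtasks(String[] keys, int maxParallelism, int parallelism) {
        Set<Integer> busy = new TreeSet<>();
        for (String key : keys) {
            busy.add(subtaskFor(key, maxParallelism, parallelism));
        }
        return busy;
    }

    public static void main(String[] args) {
        String[] keys = {"k1", "k2", "k3", "k4", "k5", "k6", "k7", "k8"};
        Set<Integer> busy = busySubtasks(keys, 128, 16);
        System.out.println("busy subtasks: " + busy + " (" + busy.size() + " of 16)");
    }
}
```

Whatever the hash, 8 distinct keys can land on at most 8 of the 16 subtasks (and on fewer if two keys collide), which is exactly the idle capacity I would like to avoid.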


>
> And I'm also a bit confused about the pseudo code: it looks like you assume
> that the tuple with a given key in stream A will always arrive before the
> tuple with that key in stream B? I think that can't be guaranteed... you may
> need to store the tuple from stream B in case it arrives before the one from
> A, and do the "analysis logic" in both flatMap1() and flatMap2().
>


You are right. I just wanted to focus on my issue, which is:

1/ having a co-processing function that only considers elements of the same
key and that can store the "rules" in keyed state (and, as you said, I might
have to store other things for ordering reasons)
2/ but being able to spread the processing of a given key over as much
parallelism as my cluster allows.
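
One way I could imagine getting both properties, purely as a sketch and not something suggested in this thread, is to key by (key, salt) instead of key: each rule from A is copied to every salted sub-key, and each event from B is sent to one of them. The plain-Java model below has no Flink dependency; the maps stand in for keyed state, and every name in it (SaltedKeySketch, onRule, onEvent, the "key#salt" format) is hypothetical. It also buffers events that arrive before their rule, as discussed above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Flink dependency) of keying by (key, salt) instead of key.
// All names are hypothetical; the maps stand in for per-key state.
class SaltedKeySketch {
    final int salts;                                            // sub-keys per logical key
    final Map<String, String> ruleState = new HashMap<>();      // "key#salt" -> rule
    final Map<String, List<String>> pending = new HashMap<>();  // events seen before their rule
    final Map<String, Integer> nextSalt = new HashMap<>();      // round-robin counter per key
    final List<String> output = new ArrayList<>();

    SaltedKeySketch(int salts) {
        this.salts = salts;
    }

    // Stream A side: copy the rule to every salted sub-key, then flush buffered events.
    void onRule(String key, String rule) {
        for (int s = 0; s < salts; s++) {
            String stateKey = key + "#" + s;
            ruleState.put(stateKey, rule);
            List<String> buffered = pending.remove(stateKey);
            if (buffered != null) {
                for (String event : buffered) {
                    output.add(stateKey + ":" + rule + "(" + event + ")");
                }
            }
        }
    }

    // Stream B side: pick a salt round-robin, then apply the rule or buffer the event.
    void onEvent(String key, String event) {
        int salt = nextSalt.merge(key, 1, Integer::sum) % salts;
        String stateKey = key + "#" + salt;
        String rule = ruleState.get(stateKey);
        if (rule == null) {
            pending.computeIfAbsent(stateKey, k -> new ArrayList<>()).add(event);
        } else {
            output.add(stateKey + ":" + rule + "(" + event + ")");
        }
    }

    public static void main(String[] args) {
        SaltedKeySketch job = new SaltedKeySketch(2);
        job.onEvent("key1", "e0");   // arrives before its rule, so it is buffered
        job.onRule("key1", "upper");
        job.onRule("key2", "lower");
        job.onEvent("key1", "e1");
        job.onEvent("key2", "e2");
        job.output.forEach(System.out::println);
    }
}
```

The rule for key1 is never visible under a key2 sub-key, while the events of key1 are split over several sub-keys. Note that the sub-keys are still hashed to subtasks, so nothing here guarantees that two sub-keys of the same key end up on different slots.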


> Regards,
> Sihua Zhou
>
> On 04/12/2018 15:44, Christophe Jolif <cjo...@gmail.com> wrote:
>
> Thanks Chesnay (and others).
>
> That's what I was figuring out. Now let's move on to the follow-up with my
> exact use case.
>
> I have two streams A and B. A basically receives "rules" that the
> processing of B should follow.
>
> There is a "key" that tells me that a rule x coming in on A applies to
> events with the same key coming in on B.
>
> I was planning to do (pseudo code):
>
> A.connect(B).keyBy("thekey").flatMap(
>    flatMap1()
>       -> store in a ValueState the rule
>    flatMap2()
>       -> use the state to get the rule, transform the element according to
> the rule, collect it
> )
>
>
> I think it should work, right, because the ValueState will be "per key"
> and contain the rule for this key and so on?
>
> Now, what I really care about is not having all the elements of key1 in
> the same parallel instance; I just want to make sure key1 and key2 are
> isolated, so I can use the keyed state to store the corresponding rule and
> key2 rules are not used for key1 and vice versa.
>
> So ideally, instead of using a parallelism of 8, in order to use the full
> power of my system, even with 8 keys I would like to use a parallelism of
> 16, as I don't care about all elements of key1 being in the same parallel
> instance. All I care about is that the state contains the rule
> corresponding to this key.
>
> What would be the recommended approach here?
>
> Thanks again for your help,
> --
> Christophe
>
>
> On Thu, Apr 12, 2018 at 9:31 AM, Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> You will get 16 parallel executions since you specify a parallelism of
>> 16; however, 8 of these will not get any data.
>>
>>
>> On 11.04.2018 23:29, Hao Sun wrote:
>>
>> From what I learned, you have to control parallelism yourself. You can
>> set the parallelism on an operator or set the default one through
>> flink-conf.yaml. I might be wrong.
>>
>> On Wed, Apr 11, 2018 at 2:16 PM Christophe Jolif <cjo...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> Imagine I have a default parallelism of 16 and I do something like
>>>
>>> stream.keyBy("something").flatMap()
>>>
>>> Now let's imagine I have fewer than 16 keys, maybe 8.
>>>
>>> How many parallel executions of the flatMap function will I get? 8
>>> because I have 8 keys, or 16 because I have default parallelism at 16?
>>>
>>> (and I will have follow up questions depending on the answer I suspect
>>> ;))
>>>
>>> Thanks,
>>> --
>>> Christophe
>>>
>>
>>
