Sihua,

On Thu, Apr 12, 2018 at 10:04 AM, 周思华 <summerle...@163.com> wrote:
> Hi Christophe,
>
> I think what you want to do is a "stream join", and I'm a bit confused:
> if you know there are only 8 keys, then why would you still like to use a
> parallelism of 16? 8 of the subtasks will be idle (a waste of CPU). In a
> KeyedStream, the tuples with the same key will be sent to the same
> parallel subtask.

First, my 8 keys / parallelism of 16 is just an example. In real life it is
a bit more complicated. But basically the idea is that I have a certain
number of task slots, and I want to keep them busy so that my processing is
as fast as possible. Even if I have fewer keys than slots, I want each slot
to take its share of the work.

> And I'm also a bit confused about the pseudo code: it looks like you
> assume that the tuple with a given key in stream A will always arrive
> before the tuples in stream B? I think that can't be guaranteed... you may
> need to store the tuples from stream B in case they arrive before the
> corresponding tuple from stream A, and do the "analysis logic" in both
> flatMap1() and flatMap2().

You are right. I just wanted to focus on my issue, which is:

1/ having a co-processing that considers only elements of the same key and
that can store the "rules" in keyed state (and, as you said, I might have
to store other things for ordering reasons);

2/ but being able to parallelize a given key to use as much parallelism as
my cluster allows.

Regards,

> Sihua Zhou
>
> On 04/12/2018 15:44, Christophe Jolif <cjo...@gmail.com> wrote:
>
> Thanks Chesnay (and others).
>
> That's what I was figuring out. Now let's move on to the follow-up with
> my exact use case.
>
> I have two streams A and B. A basically receives "rules" that the
> processing of B should observe.
>
> There is a "key" that allows me to know that a rule x coming in on A is
> for events with the same key coming in on B.
> I was planning to do (pseudo code):
>
> A.connect(B).keyBy("thekey").flatMap(
>     flatMap1()
>         -> store the rule in a ValueState
>     flatMap2()
>         -> use the state to get the rule, transform the element according
>            to the rule, collect it
> )
>
> I think it should work, right? Because the ValueState will be "per key"
> and will contain the rule for this key, and so on.
>
> Now, what I really care about is not having all the elements of key1 in
> the same parallel subtask. I just want to make sure key1 and key2 are
> isolated, so I can use keyed state to store the corresponding rule, and
> key2's rules are not used for key1 and conversely.
>
> So ideally, instead of using a parallelism of 8, in order to use the full
> power of my system even with 8 keys, I would like to use a parallelism of
> 16, as I don't care about all elements of key1 being in the same subtask.
> All I care about is that the state contains the rule corresponding to the
> key.
>
> What would be the recommended approach here?
>
> Thanks again for your help,
> --
> Christophe
>
> On Thu, Apr 12, 2018 at 9:31 AM, Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> You will get 16 parallel executions, since you specify a parallelism of
>> 16; however, 8 of these will not get any data.
>>
>> On 11.04.2018 23:29, Hao Sun wrote:
>>
>> From what I learnt, you have to control parallelism yourself. You can
>> set parallelism on an operator or set the default one through
>> flink-conf.yaml. I might be wrong.
>>
>> On Wed, Apr 11, 2018 at 2:16 PM Christophe Jolif <cjo...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> Imagine I have a default parallelism of 16 and I do something like:
>>>
>>> stream.keyBy("something").flatMap(...)
>>>
>>> Now let's imagine I have fewer than 16 keys, maybe 8.
>>>
>>> How many parallel executions of the flatMap function will I get? 8,
>>> because I have 8 keys, or 16, because I have a default parallelism
>>> of 16?
>>>
>>> (and I will have follow-up questions depending on the answer, I suspect
>>> ;))
>>>
>>> Thanks,
>>> --
>>> Christophe
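To illustrate Chesnay's answer at the bottom of the thread, here is a toy, Flink-free simulation of key-to-subtask assignment. Note that Flink does not literally use `hashCode() % parallelism` (it murmur-hashes each key into a key group and then maps key groups to subtasks), so this is only a sketch of the principle: since each key is routed to exactly one subtask, 8 distinct keys can keep at most 8 of the 16 subtasks busy.

```java
import java.util.Set;
import java.util.TreeSet;

public class KeyDistribution {
    public static void main(String[] args) {
        int parallelism = 16;
        Set<Integer> busySubtasks = new TreeSet<>();
        for (int k = 1; k <= 8; k++) {
            String key = "key" + k;
            // Simplified model of key -> subtask routing. Flink really
            // murmur-hashes the key into a key group first, but the
            // consequence is the same: one key goes to exactly one subtask.
            busySubtasks.add(Math.abs(key.hashCode()) % parallelism);
        }
        // At most 8 of the 16 subtasks get data (fewer if keys collide).
        System.out.println("busy subtasks: " + busySubtasks.size()
                + " of " + parallelism);
    }
}
```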
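The pseudo code in the thread, combined with Sihua's buffering advice, can be sketched as follows. This is a plain-Java simulation of the per-key logic, not the actual Flink API: in a real job this would be a `RichCoFlatMapFunction` on the connected, keyed streams, the maps below would be Flink keyed state (e.g. `ValueState` for the rule and `ListState` for the buffered events), and the key would be implicit from the `keyBy`. The `apply` helper is a hypothetical stand-in for the real transformation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the keyed CoFlatMap logic.
public class RuleJoin {
    // Per-key "keyed state": the current rule, plus events buffered
    // because they arrived before their rule (Sihua's point).
    private final Map<String, String> ruleState = new HashMap<>();
    private final Map<String, List<String>> buffered = new HashMap<>();
    final List<String> out = new ArrayList<>();

    // flatMap1: a rule for `key` arrives on stream A.
    void flatMap1(String key, String rule) {
        ruleState.put(key, rule);
        // Flush any events that were waiting for this rule.
        for (String e : buffered.getOrDefault(key, Collections.emptyList())) {
            out.add(apply(rule, e));
        }
        buffered.remove(key);
    }

    // flatMap2: an event for `key` arrives on stream B.
    void flatMap2(String key, String event) {
        String rule = ruleState.get(key);
        if (rule == null) {
            // No rule yet: buffer the event instead of dropping it.
            buffered.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
        } else {
            out.add(apply(rule, event));
        }
    }

    // Hypothetical transformation of an event according to a rule.
    private String apply(String rule, String event) {
        return rule + "(" + event + ")";
    }
}
```

The essential point is that both callbacks handle the "other side missing" case, since nothing guarantees that the rule on A arrives before the matching events on B.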
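As for Christophe's open question (a parallelism of 16 with only 8 logical keys, while keeping per-key state isolation), one workaround that is sometimes used, though it is not suggested anywhere in the thread and should be treated as an assumption here, is "key salting": key by a composite of the real key and a small salt, replicate each rule to every salt bucket, and assign events a random salt. Each composite key still has its own isolated keyed state. A minimal sketch of the key construction:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class KeySalting {
    // Fan-out per logical key (hypothetical choice: 2 salts turn 8 logical
    // keys into up to 16 distinct physical keys).
    static final int SALTS = 2;

    // Composite key that would be passed to keyBy().
    static String saltedKey(String key, int salt) {
        return key + "#" + salt;
    }

    public static void main(String[] args) {
        // A rule for "key1" must be replicated to every salt bucket so that
        // any salted event of "key1" finds the rule in its keyed state.
        List<String> ruleTargets = new ArrayList<>();
        for (int s = 0; s < SALTS; s++) {
            ruleTargets.add(saltedKey("key1", s));
        }
        System.out.println(ruleTargets); // [key1#0, key1#1]

        // An event of "key1" picks one bucket at random.
        int salt = new Random().nextInt(SALTS);
        System.out.println(saltedKey("key1", salt));
    }
}
```

The cost is rule duplication (SALTS copies of each rule in state), and per-key isolation is preserved because state for `key1#0` and `key2#0` is still separate.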