Hi,
I just merged the new feature, so once this makes it into the 1.0-SNAPSHOT 
builds you should be able to use:

env.setParallelism(4);

env
    .addSource(kafkaSource)
    .rescale()
    .map(mapper).setParallelism(16)
    .rescale()
    .addSink(kafkaSink);

to get your desired behavior. For this to work, the mapper parallelism should 
be set to 16, with 4 nodes. Then each node will have 1 source, 4 mappers and 
1 sink. Each source will only be connected to its 4 local mappers, and those 
4 mappers will be the only ones connected to the local sink.
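
To make the wiring concrete, here is a small standalone sketch in plain Java 
(no Flink dependency) that computes which downstream subtasks each upstream 
subtask sends to. It is only a simplified model of the pattern described 
above, not the actual Flink implementation: the class RescaleWiring and the 
method targets are made up for illustration, and it assumes one parallelism 
is an exact multiple of the other.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, NOT part of the Flink API: a simplified model of the
// local connection pattern for parallelisms that are multiples of each other.
class RescaleWiring {

    /** Returns the downstream subtask indices that one upstream subtask sends to. */
    static List<Integer> targets(int upstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        if (upstreamParallelism < 1 || downstreamParallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        if (upstreamIndex < 0 || upstreamIndex >= upstreamParallelism) {
            throw new IllegalArgumentException("upstream index out of range");
        }
        List<Integer> result = new ArrayList<>();
        if (downstreamParallelism % upstreamParallelism == 0) {
            // Fan out: each upstream subtask feeds a contiguous block of downstream subtasks.
            int fanOut = downstreamParallelism / upstreamParallelism;
            for (int k = 0; k < fanOut; k++) {
                result.add(upstreamIndex * fanOut + k);
            }
        } else if (upstreamParallelism % downstreamParallelism == 0) {
            // Fan in: a contiguous block of upstream subtasks feeds one downstream subtask.
            int fanIn = upstreamParallelism / downstreamParallelism;
            result.add(upstreamIndex / fanIn);
        } else {
            // Assumption of this sketch: uneven ratios are out of scope.
            throw new IllegalArgumentException("parallelisms must be multiples of each other");
        }
        return result;
    }

    public static void main(String[] args) {
        // 4 sources -> 16 mappers: every source feeds 4 mappers.
        for (int s = 0; s < 4; s++) {
            System.out.println("source " + s + " -> mappers " + targets(s, 4, 16));
        }
        // 16 mappers -> 4 sinks: every block of 4 mappers feeds 1 sink.
        for (int m = 0; m < 16; m++) {
            System.out.println("mapper " + m + " -> sink " + targets(m, 16, 4));
        }
    }
}
```

With 4 sources, 16 mappers and 4 sinks, source 0 feeds mappers 0 to 3 and 
those same mappers feed sink 0, which is the "1 source, 4 mappers, 1 sink" 
group that should end up on one node.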

Cheers,
Aljoscha

> On 04 Feb 2016, at 18:29, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> I added a new Ticket: https://issues.apache.org/jira/browse/FLINK-3336
> 
> This will implement the data shipping pattern that you mentioned in your 
> initial mail. I also have the Pull request almost ready.
> 
>> On 04 Feb 2016, at 16:25, Gwenhael Pasquiers 
>> <gwenhael.pasqui...@ericsson.com> wrote:
>> 
>> Okay ;
>> 
>> Then I guess that the best we can do is to disable chaining (we really want 
>> one thread per operator since they are doing long operations) and have the 
>> same parallelism for sinks as mapping : that way each map will have its own 
>> sink and there will be no exchanges between flink instances.
>> 
>> From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of 
>> Stephan Ewen
>> Sent: Thursday, 4 February 2016 15:13
>> To: user@flink.apache.org
>> Subject: Re: Distribution of sinks among the nodes
>> 
>> To your other question, there are two things in Flink:
>> 
>> (1) Chaining. Tasks are folded together into one task, run by one thread.
>> 
>> (2) Resource groups: Tasks stay separate, have separate threads, but share a 
>> slot (which means share memory resources). See the link in my previous mail 
>> for an explanation concerning those.
>> 
>> Greetings,
>> Stephan
>> 
>> 
>> On Thu, Feb 4, 2016 at 3:10 PM, Stephan Ewen <se...@apache.org> wrote:
>> Hi Gwen!
>> 
>> You actually need not 24 slots, but only as many as the highest parallelism 
>> is (16). Slots do not hold individual tasks, but "pipelines". 
>> 
>> Here is an illustration of how that works.
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#configuring-taskmanager-processing-slots
>> 
>> You can control whether a task can share the slot with the previous task 
>> with the function "startNewResourceGroup()" in the streaming API. Sharing 
>> slots makes a few things easier to reason about; in particular, when adding 
>> operators to a program, you need not immediately add new machines.
>> 
>> 
>> How to solve your program case
>> --------------------------------------------
>> 
>> We can actually make a pretty simple addition to Flink that will cause the 
>> tasks to be locally connected, which in turn will cause the scheduler to 
>> distribute them like you intend.
>> Rather than let the 4 sources rebalance across all 16 mappers, each one 
>> should redistribute to 4 local mappers, and these 4 mappers should send data 
>> to one local sink each.
>> 
>> We'll try and add that today and ping you once it is in.
>> 
>> The following would be sample code to use this:
>> 
>> env.setParallelism(4);
>> 
>> env
>>    .addSource(kafkaSource)
>>    .partitionFan()
>>    .map(mapper).setParallelism(16)
>>    .partitionFan()
>>    .addSink(kafkaSink);
>> 
>> 
>> 
>> A bit of background why the mechanism is the way that it is right now
>> ----------------------------------------------------------------------------------------------
>> 
>> You can think of a slot as a slice of resources. In particular, an amount of 
>> memory from the memory manager, but also memory in the network stack.
>> 
>> What we want to do quite soon is to make streaming programs more elastic. 
>> Consider for example the case that you have 16 slots on 4 machines, a 
>> machine fails, and you have no spare resources. In that case Flink should 
>> recognize that no spare resource can be acquired, and scale the job in. 
>> Since you have only 12 slots left, the parallelism of the mappers is reduced 
>> to 12, and the source task that was on the failed machine is moved to a slot 
>> on another machine.
>> 
>> It is important that the guaranteed resources for each task do not change 
>> when scaling in, to keep behavior predictable. In this case, each slot will 
>> still at most host 1 source, 1 mapper, and 1 sink, as did some of the slots 
>> before. That is also the reason why the slots are per TaskManager, and not 
>> global, to associate them with a constant set of resources (mainly memory).
>> 
>> 
>> Greetings,
>> Stephan
>> 
>> 
>> 
>> On Thu, Feb 4, 2016 at 9:54 AM, Gwenhael Pasquiers 
>> <gwenhael.pasqui...@ericsson.com> wrote:
>> Don’t we need to set the number of slots to 24 (4 sources + 16 mappers + 4 
>> sinks) ?
>> 
>> Or is there a way to set the number of slots per TaskManager instead of 
>> globally so that they are at least equally dispatched among the nodes ?
>> 
>> As for the sink deployment : that’s not good news ; I mean we will have a 
>> non-negligible overhead : all the data generated by 3 of the 4 nodes will be 
>> sent to a third node instead of being sent to the “local” sink. Network I/O 
>> has a price.
>> 
>> Do you have some sort of “topology” feature coming in the roadmap ? Maybe a 
>> listener on the JobManager / env that would be triggered, asking us on 
>> which node we would prefer each task to be deployed. That way you keep the 
>> standard behavior, don’t have to make a complicated generic-optimized 
>> algorithm, and let the user make their own choices. Should I create a JIRA ?
>> 
>> For the time being we could start the application 4 times : once per 
>> node, but that’s not pretty at all :)
>> 
>> B.R.
>> 
>> From: Till Rohrmann [mailto:trohrm...@apache.org] 
>> Sent: Wednesday, 3 February 2016 17:58
>> 
>> To: user@flink.apache.org
>> Subject: Re: Distribution of sinks among the nodes
>> 
>> Hi Gwenhäel,
>> 
>> if you set the number of slots for each TaskManager to 4, then all of your 
>> mappers will be evenly spread out. The sources should also be evenly spread 
>> out. However, since the sinks depend on all mappers, where they are deployed 
>> will most likely be random. So you might end up with 4 sink tasks on one 
>> machine.
>> 
>> Cheers,
>> Till
>> 
>> 
>> On Wed, Feb 3, 2016 at 4:31 PM, Gwenhael Pasquiers 
>> <gwenhael.pasqui...@ericsson.com> wrote:
>> It is one type of mapper with a parallelism of 16
>> It's the same for the sinks and sources (parallelism of 4)
>> 
>> The settings are
>> Env.setParallelism(4)
>> Mapper.setParallelism(env.getParallelism() * 4)
>> 
>> We mean to have X mapper tasks per source / sink
>> 
>> The mapper is doing some heavy computation and we have only 4 kafka 
>> partitions. That's why we need more mappers than sources / sinks
>> 
>> 
>> -----Original Message-----
>> From: Aljoscha Krettek [mailto:aljos...@apache.org]
>> Sent: Wednesday, 3 February 2016 16:26
>> To: user@flink.apache.org
>> Subject: Re: Distribution of sinks among the nodes
>> 
>> Hi Gwenhäel,
>> when you say 16 maps, are we talking about one mapper with parallelism 16 or 
>> 16 unique map operators?
>> 
>> Regards,
>> Aljoscha
>>> On 03 Feb 2016, at 15:48, Gwenhael Pasquiers 
>>> <gwenhael.pasqui...@ericsson.com> wrote:
>>> 
>>> Hi,
>>> 
>>> We try to deploy an application with the following “architecture” :
>>> 
>>> 4 kafka sources => 16 maps => 4 kafka sinks, on 4 nodes, with 24 slots (we 
>>> disabled operator chaining).
>>> 
>>> So we’d like on each node :
>>> 1x source => 4x map => 1x sink
>>> 
>>> That way there are no exchanges between different instances of flink and 
>>> performances would be optimal.
>>> 
>>> But we get (according to the flink GUI and the Host column when looking at 
>>> the details of each task) :
>>> 
>>> Node 1 : 1 source =>  2 map
>>> Node 2 : 1 source =>  1 map
>>> Node 3 : 1 source =>  1 map
>>> Node 4 : 1 source =>  12 maps => 4 sinks
>>> 
>>> (I think no comments are needed :)
>>> 
>>> The Web UI says that there are 24 slots and they are all used, but they 
>>> don’t seem evenly dispatched …
>>> 
>>> How could we make Flink deploy the tasks the way we want ?
>>> 
>>> B.R.
>>> 
>>> Gwen’
>> 
> 
