To your other question, there are two things in Flink:

(1) Chaining. Tasks are folded together into one task, run by one thread.

(2) Resource groups: Tasks stay separate, have separate threads, but share
a slot (which means share memory resources). See the link in my previous
mail for an explanation concerning those.
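
As a rough sketch of the slot arithmetic that (2) implies, assuming the job
from this thread (4 sources, 16 mappers, 4 sinks): when tasks share slots,
one slot can host one parallel subtask of each operator, so the widest
operator determines how many slots are needed. If every operator sat in its
own resource group, each subtask would need its own slot. The class and
method names below are made up for illustration; this is plain Java that
models the counting, not Flink API.

```java
// Illustrative model only: plain Java, not part of the Flink API.
final class SlotMath {

    // With slot sharing, a slot can hold one parallel subtask of each
    // operator, so the highest parallelism determines the slot count.
    static int slotsWithSharing(int[] parallelisms) {
        int max = 0;
        for (int p : parallelisms) {
            max = Math.max(max, p);
        }
        return max;
    }

    // If every operator were isolated in its own resource group, each
    // parallel subtask would occupy a slot of its own.
    static int slotsWithoutSharing(int[] parallelisms) {
        int sum = 0;
        for (int p : parallelisms) {
            sum += p;
        }
        return sum;
    }

    public static void main(String[] args) {
        int[] job = {4, 16, 4}; // sources, mappers, sinks from this thread
        System.out.println(slotsWithSharing(job));    // prints 16
        System.out.println(slotsWithoutSharing(job)); // prints 24
    }
}
```

The two results correspond to the 16 and 24 slots discussed in the quoted
mails below.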

Greetings,
Stephan


On Thu, Feb 4, 2016 at 3:10 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Gwen!
>
> You actually do not need 24 slots, only as many as the highest
> parallelism (16). Slots do not hold individual tasks, but "pipelines".
>
> Here is an illustration of how that works:
>
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#configuring-taskmanager-processing-slots
>
> You can control whether a task can share the slot with the previous task
> with the function "startNewResourceGroup()" in the streaming API. Sharing
> slots makes a few things easier to reason about. In particular, when you
> add operators to a program, you need not immediately add new machines.
>
>
> How to solve your case
> --------------------------------------------
>
> We can actually make a pretty simple addition to Flink that will cause the
> tasks to be locally connected, which in turn will cause the scheduler to
> distribute them like you intend.
> Rather than let the 4 sources rebalance across all 16 mappers, each one
> should redistribute to 4 local mappers, and these 4 mappers should send
> data to one local sink each.
>
> We'll try and add that today and ping you once it is in.
>
> The following would be sample code to use this:
>
> env.setParallelism(4);
>
> env
>     .addSource(kafkaSource)
>     .partitionFan()
>     .map(mapper).setParallelism(16)
>     .partitionFan()
>     .addSink(kafkaSink);
>
>
>
> A bit of background on why the mechanism is the way it is right now
> -------------------------------------------------------------------
>
> You can think of a slot as a slice of resources: in particular, an amount
> of memory from the memory manager, but also memory in the network stack.
>
> What we want to do quite soon is to make streaming programs more elastic.
> Consider for example the case that you have 16 slots on 4 machines, a
> machine fails, and you have no spare resources. In that case Flink should
> recognize that no spare resource can be acquired, and scale the job in.
> Since you have only 12 slots left, the parallelism of the mappers is
> reduced to 12, and the source task that was on the failed machine is moved
> to a slot on another machine.
>
> It is important that the guaranteed resources for each task do not change
> when scaling in, to keep behavior predictable. In this case, each slot will
> still at most host 1 source, 1 mapper, and 1 sink, as did some of the slots
> before. That is also the reason why the slots are per TaskManager, and not
> global, to associate them with a constant set of resources (mainly memory).
>
>
> Greetings,
> Stephan
>
>
>
> On Thu, Feb 4, 2016 at 9:54 AM, Gwenhael Pasquiers <
> gwenhael.pasqui...@ericsson.com> wrote:
>
>> Don’t we need to set the number of slots to 24 (4 sources + 16 mappers +
>> 4 sinks) ?
>>
>>
>>
>> *Or is there a way not to set the number of slots per TaskManager instead
>> of globally so that they are at least equally dispatched among the nodes ?*
>>
>>
>>
>> As for the sink deployment: that’s not good news. I mean we will have a
>> non-negligible overhead: all the data generated by 3 of the 4 nodes will
>> be sent to a third node instead of being sent to the “local” sink. Network
>> I/O has a price.
>>
>>
>>
>> Do you have some sort of “topology” feature coming in the roadmap? Maybe
>> a listener on the JobManager / env that would be triggered, asking us on
>> which node we would prefer each task to be deployed. That way you keep the
>> standard behavior, don’t have to make a complicated generic-optimized
>> algorithm, and let the user make its choices. *Should I create a JIRA?*
>>
>>
>>
>> For the time being we could start the application 4 times, once per
>> node, but that’s not pretty at all :)
>>
>>
>>
>> B.R.
>>
>>
>>
>> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
>> *Sent:* mercredi 3 février 2016 17:58
>>
>> *To:* user@flink.apache.org
>> *Subject:* Re: Distribution of sinks among the nodes
>>
>>
>>
>> Hi Gwenhäel,
>>
>> if you set the number of slots for each TaskManager to 4, then all of
>> your mappers will be evenly spread out. The sources should also be evenly
>> spread out. However, for the sinks since they depend on all mappers, it
>> will be most likely random where they are deployed. So you might end up
>> with 4 sink tasks on one machine.
>>
>> Cheers,
>> Till
>>
>>
>>
>>
>>
>> On Wed, Feb 3, 2016 at 4:31 PM, Gwenhael Pasquiers <
>> gwenhael.pasqui...@ericsson.com> wrote:
>>
>> It is one type of mapper with a parallelism of 16
>> It's the same for the sinks and sources (parallelism of 4)
>>
>> The settings are
>> env.setParallelism(4)
>> mapper.setParallelism(env.getParallelism() * 4)
>>
>> We mean to have X mapper tasks per source / sink
>>
>> The mapper is doing some heavy computation and we have only 4 kafka
>> partitions. That's why we need more mappers than sources / sinks
>>
>>
>>
>> -----Original Message-----
>> From: Aljoscha Krettek [mailto:aljos...@apache.org]
>> Sent: mercredi 3 février 2016 16:26
>> To: user@flink.apache.org
>> Subject: Re: Distribution of sinks among the nodes
>>
>> Hi Gwenhäel,
>> when you say 16 maps, are we talking about one mapper with parallelism 16
>> or 16 unique map operators?
>>
>> Regards,
>> Aljoscha
>> > On 03 Feb 2016, at 15:48, Gwenhael Pasquiers <
>> gwenhael.pasqui...@ericsson.com> wrote:
>> >
>> > Hi,
>> >
>> > We try to deploy an application with the following “architecture” :
>> >
>> > 4 kafka sources => 16 maps => 4 kafka sinks, on 4 nodes, with 24 slots
>> (we disabled operator chaining).
>> >
>> > So we’d like on each node :
>> > 1x source => 4x map => 1x sink
>> >
>> > That way there are no exchanges between different instances of Flink
>> and performance would be optimal.
>> >
>> > But we get (according to the flink GUI and the Host column when looking
>> at the details of each task) :
>> >
>> > Node 1 : 1 source =>  2 map
>> > Node 2 : 1 source =>  1 map
>> > Node 3 : 1 source =>  1 map
>> > Node 4 : 1 source =>  12 maps => 4 sinks
>> >
>> > (I think no comments are needed :)
>> >
>> > The Web UI says that there are 24 slots and they are all used, but
>> they don’t seem evenly dispatched …
>> >
>> > How could we make Flink deploy the tasks the way we want ?
>> >
>> > B.R.
>> >
>> > Gwen’
>>
>>
>>
>
>
