Hi Stephan,

That was my original understanding, until I realized that I was not using
a parallel socket source. I had a custom source that extended
SourceFunction which always runs with parallelism = 1. I looked through
the API and found the ParallelSourceFunction interface so I implemented
that and voila, now all 3 nodes in the cluster are actually receiving
traffic on socket connections.

Now that I’m running it successfully end to end, I’m trying to improve the
performance. Can you take a look at the attached screen shot and tell me
if the distribution of work amongst the pipelines is normal? I feel like
some pipelines are lot lazier than others, even though the cluster nodes
are exactly the same.

By the way, here’s the class I wrote. It would be useful to have this
available in Flink distro:

public class ParallelSocketSource implements
ParallelSourceFunction<String> {

        private static final long serialVersionUID = -271094428915640892L;
        private static final Logger LOG =
LoggerFactory.getLogger(ParallelSocketSource.class);

        private volatile boolean running = true;
        private String host;
        private int port;

        public ParallelSocketSource(String host, int port) {
                this.host = host;
                this.port = port;
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
                try (Socket socket = new Socket(host, port);
                        BufferedReader reader = new BufferedReader(new
InputStreamReader(socket.getInputStream()))) {
                        String line  = null;
                        while(running && ((line = reader.readLine()) != null)) {
                                ctx.collect(line);
                        }
                } catch(IOException ex) {
                        LOG.error("error reading from socket", ex);
                }
        }

        @Override
        public void cancel() {
                running = false;
        }
}

Regards,
Ali
 

On 2015-12-08, 3:35 PM, "Stephan Ewen" <se...@apache.org> wrote:

>Hi Ali!
>
>In the case you have, the sequence of source-map-filter ... forms a
>pipeline.
>
>You mentioned that you set the parallelism to 16, so there should be 16
>pipelines. These pipelines should be completely independent.
>
>Looking at the way the scheduler is implemented, independent pipelines
>should be spread across machines. But when you execute that in parallel,
>you say all 16 pipelines end up on the same machine?
>
>Can you share with us the rough code of your program? Or a Screenshot from
>the runtime dashboard that shows the program graph?
>
>
>If your cluster is basically for that one job only, you could try and set
>the number of slots to 4 for each machine. Then you have 16 slots in total
>and each node would run one of the 16 pipelines.
>
>
>Greetings,
>Stephan
>
>
>On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>
>> There is no shuffle operation in my flow. Mine actually looks like this:
>>
>> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map -> Map
>>->
>> Map, Filter)
>>
>>
>> Maybe it’s treating this whole flow as one pipeline and assigning it to
>>a
>> slot. What I really wanted was to have the custom source I built to have
>> running instances on all nodes. I’m not really sure if that’s the right
>> approach, but if we could add this as a feature that’d be great, since
>> having more than one node running the same pipeline guarantees the
>> pipeline is never offline.
>>
>> -Ali
>>
>> On 2015-12-02, 4:39 AM, "Till Rohrmann" <trohrm...@apache.org> wrote:
>>
>> >If I'm not mistaken, then the scheduler has already a preference to
>>spread
>> >independent pipelines out across the cluster. At least he uses a queue
>>of
>> >instances from which it pops the first element if it allocates a new
>>slot.
>> >This instance is then appended to the queue again, if it has some
>> >resources
>> >(slots) left.
>> >
>> >I would assume that you have a shuffle operation involved in your job
>>such
>> >that it makes sense for the scheduler to deploy all pipelines to the
>>same
>> >machine.
>> >
>> >Cheers,
>> >Till
>> >On Dec 1, 2015 4:01 PM, "Stephan Ewen" <se...@apache.org> wrote:
>> >
>> >> Slots are like "resource groups" which execute entire pipelines. They
>> >> frequently have more than one operator.
>> >>
>> >> What you can try as a workaround is decrease the number of slots per
>> >> machine to cause the operators to be spread across more machines.
>> >>
>> >> If this is a crucial issue for your use case, it should be simple to
>> >>add a
>> >> "preference to spread out" to the scheduler...
>> >>
>> >> On Tue, Dec 1, 2015 at 3:26 PM, Kashmar, Ali <ali.kash...@emc.com>
>> >>wrote:
>> >>
>> >> > Is there a way to make a task cluster-parallelizable? I.e. Make
>>sure
>> >>the
>> >> > parallel instances of the task are distributed across the cluster.
>> >>When I
>> >> > run my flink job with a parallelism of 16, all the parallel tasks
>>are
>> >> > assigned to the first task manager.
>> >> >
>> >> > - Ali
>> >> >
>> >> > On 2015-11-30, 2:18 PM, "Ufuk Celebi" <u...@apache.org> wrote:
>> >> >
>> >> > >
>> >> > >> On 30 Nov 2015, at 17:47, Kashmar, Ali <ali.kash...@emc.com>
>> wrote:
>> >> > >> Do the parallel instances of each task get distributed across
>>the
>> >> > >>cluster or is it possible that they all run on the same node?
>> >> > >
>> >> > >Yes, slots are requested from all nodes of the cluster. But keep
>>in
>> >>mind
>> >> > >that multiple tasks (forming a local pipeline) can be scheduled to
>> >>the
>> >> > >same slot (1 slot can hold many tasks).
>> >> > >
>> >> > >Have you seen this?
>> >> > >
>> >> >
>> >>
>> >>
>> 
>>https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/jo
>> >>b
>> >> > >_scheduling.html
>> >> > >
>> >> > >> If they can all run on the same node, what happens when that
>>node
>> >> > >>crashes? Does the job manager recreate them using the remaining
>>open
>> >> > >>slots?
>> >> > >
>> >> > >What happens: The job manager tries to restart the program with
>>the
>> >>same
>> >> > >parallelism. Thus if you have enough free slots available in your
>> >> > >cluster, this works smoothly (so yes, the remaining/available
>>slots
>> >>are
>> >> > >used)
>> >> > >
>> >> > >With a YARN cluster the task manager containers are restarted
>> >> > >automatically. In standalone mode, you have to take care of this
>> >> yourself.
>> >> > >
>> >> > >
>> >> > >Does this help?
>> >> > >
>> >> > >­ Ufuk
>> >> > >
>> >> >
>> >> >
>> >>
>>
>>

Reply via email to