Hi Ali!

I see, so the tasks on 192.168.200.174 and 192.168.200.175 apparently do not
make progress and do not even recognize the end-of-stream point.

I expect that the streams on 192.168.200.174 and 192.168.200.175 are
back-pressured to a standstill. Since no network is involved, the back
pressure most likely comes from the sinks.

What kind of data sink are you using in the addSink() function?
Can you check whether it starts to block completely on machines
192.168.200.174 and 192.168.200.175?
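
If it helps: one quick way to check is to wrap the sink and log slow calls.
A rough sketch (the class name and the one-second threshold are placeholders
of mine; this assumes the 0.10 SinkFunction API):

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimingSinkWrapper<T> implements SinkFunction<T> {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG =
            LoggerFactory.getLogger(TimingSinkWrapper.class);

    private final SinkFunction<T> delegate;

    public TimingSinkWrapper(SinkFunction<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void invoke(T value) throws Exception {
        long start = System.currentTimeMillis();
        delegate.invoke(value); // forward to the actual sink
        long millis = System.currentTimeMillis() - start;
        if (millis > 1000) {
            // a single record took over a second to be written out, which
            // would explain pipelines back-pressured to a halt
            LOG.warn("Sink call blocked for {} ms", millis);
        }
    }
}

You would then call addSink(new TimingSinkWrapper<>(yourSink)) instead of
addSink(yourSink) and watch the task manager logs on .174 and .175.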

Greetings,
Stephan



On Fri, Dec 11, 2015 at 4:50 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:

> Hi Stephan,
>
> I got a request to share the image with someone and I assume it was you.
> You should be able to see it now. This seems to be the main issue I have
> at this time. I've tried running the job on the cluster with a parallelism
> of 16, 24, 36, and even went up to 48. I see all the parallel pipelines
> working for a bit, and then some of them just stop; I'm not sure whether
> they're stuck or not. Here's another screenshot:
> http://postimg.org/image/gr6ogxqjj/
>
> Two things you’ll notice:
> 1. Pipelines on 192.168.200.174 and 192.168.200.175 stopped doing
> anything at some point, and only 192.168.200.173 is doing all the work.
> 2. Pipelines on 192.168.200.174 and 192.168.200.175 don’t have an end time
> even though the job should be finished (the screenshot was taken after the
> source was closed).
>
> I’m not sure if this helps or not, but here are some properties from the
> flink-conf.yaml:
>
> jobmanager.heap.mb: 8192
> taskmanager.heap.mb: 49152
> taskmanager.numberOfTaskSlots: 16
> parallelism.default: 1
>
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///tmp/flink-checkpoints
>
> taskmanager.network.numberOfBuffers: 3072
>
> recovery.mode: zookeeper
> recovery.zookeeper.quorum: 192.168.200.173:2181,192.168.200.174:2181,192.168.200.175:2181
> recovery.zookeeper.storageDir: file:///tmp/zk-recovery
> recovery.zookeeper.path.root: /opt/flink-0.10.0
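>
> (For what it's worth: if I'm reading the 0.10 docs right, the
> numberOfBuffers value matches their rule of thumb of
> #slots-per-TM^2 * #TMs * 4 = 16^2 * 3 * 4 = 3072.)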
>
> I appreciate all the help.
>
>
> Thanks,
> Ali
>
>
> On 2015-12-10, 10:16 AM, "Stephan Ewen" <se...@apache.org> wrote:
>
> >Hi Ali!
> >
> >Seems like the Google Doc has restricted access; it tells me I have no
> >permission to view it...
> >
> >Stephan
> >
> >
> >On Wed, Dec 9, 2015 at 8:49 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
> >
> >> Hi Stephan,
> >>
> >> Here’s a link to the screenshot I tried to attach earlier:
> >>
> >> https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28
> >>
> >> It looks to me like the distribution is fairly skewed across the nodes,
> >> even though they’re executing the same pipeline.
> >>
> >> Thanks,
> >> Ali
> >>
> >>
> >> On 2015-12-09, 12:36 PM, "Stephan Ewen" <se...@apache.org> wrote:
> >>
> >> >Hi!
> >> >
> >> >The parallel socket source looks good.
> >> >I think you forgot to attach the screenshot, or the mailing list
> >> >dropped the attachment...
> >> >
> >> >Not sure if I can diagnose that without more details. The sources all
> >> >do the same. Assuming that the server distributes the data evenly
> >> >across all connected sockets, and that the network bandwidth ends up
> >> >being divided in a fair way, all pipelines should be similarly "eager".
> >> >
> >> >Greetings,
> >> >Stephan
> >> >
> >> >
> >> >On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
> >> >
> >> >> Hi Stephan,
> >> >>
> >> >> That was my original understanding, until I realized that I was not
> >> >> using a parallel socket source. I had a custom source that extended
> >> >> SourceFunction, which always runs with parallelism = 1. I looked
> >> >> through the API, found the ParallelSourceFunction interface, and
> >> >> implemented that, and voila, now all 3 nodes in the cluster are
> >> >> actually receiving traffic on socket connections.
> >> >>
> >> >> Now that I'm running it successfully end to end, I'm trying to
> >> >> improve the performance. Can you take a look at the attached
> >> >> screenshot and tell me if the distribution of work amongst the
> >> >> pipelines is normal? I feel like some pipelines are a lot lazier
> >> >> than others, even though the cluster nodes are exactly the same.
> >> >>
> >> >> By the way, here's the class I wrote. It would be useful to have
> >> >> this available in the Flink distro:
> >> >>
> >> >> public class ParallelSocketSource implements ParallelSourceFunction<String> {
> >> >>
> >> >>     private static final long serialVersionUID = -271094428915640892L;
> >> >>     private static final Logger LOG =
> >> >>             LoggerFactory.getLogger(ParallelSocketSource.class);
> >> >>
> >> >>     private volatile boolean running = true;
> >> >>     private String host;
> >> >>     private int port;
> >> >>
> >> >>     public ParallelSocketSource(String host, int port) {
> >> >>         this.host = host;
> >> >>         this.port = port;
> >> >>     }
> >> >>
> >> >>     @Override
> >> >>     public void run(SourceContext<String> ctx) throws Exception {
> >> >>         try (Socket socket = new Socket(host, port);
> >> >>                 BufferedReader reader = new BufferedReader(
> >> >>                         new InputStreamReader(socket.getInputStream()))) {
> >> >>             String line = null;
> >> >>             while (running && ((line = reader.readLine()) != null)) {
> >> >>                 ctx.collect(line);
> >> >>             }
> >> >>         } catch (IOException ex) {
> >> >>             LOG.error("error reading from socket", ex);
> >> >>         }
> >> >>     }
> >> >>
> >> >>     @Override
> >> >>     public void cancel() {
> >> >>         running = false;
> >> >>     }
> >> >> }
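> >> >>
> >> >> For reference, wiring it up would just be (host/port and parallelism
> >> >> are examples; env is the StreamExecutionEnvironment):
> >> >>
> >> >>     env.addSource(new ParallelSocketSource("server-host", 9999))
> >> >>        .setParallelism(16);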
> >> >>
> >> >> Regards,
> >> >> Ali
> >> >>
> >> >>
> >> >> On 2015-12-08, 3:35 PM, "Stephan Ewen" <se...@apache.org> wrote:
> >> >>
> >> >> >Hi Ali!
> >> >> >
> >> >> >In the case you have, the sequence of source-map-filter ... forms a
> >> >> >pipeline.
> >> >> >
> >> >> >You mentioned that you set the parallelism to 16, so there should
> >> >> >be 16 pipelines. These pipelines should be completely independent.
> >> >> >
> >> >> >Looking at the way the scheduler is implemented, independent
> >> >> >pipelines should be spread across machines. But when you execute
> >> >> >that in parallel, you say all 16 pipelines end up on the same
> >> >> >machine?
> >> >> >
> >> >> >Can you share with us the rough code of your program? Or a
> >> >> >screenshot from the runtime dashboard that shows the program graph?
> >> >> >
> >> >> >
> >> >> >If your cluster is basically for that one job only, you could try
> >> >> >setting the number of slots to 4 for each machine. Then you have 16
> >> >> >slots in total and each slot would run one of the 16 pipelines.
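> >> >> >
> >> >> >In each machine's flink-conf.yaml that would be:
> >> >> >
> >> >> >taskmanager.numberOfTaskSlots: 4
> >> >> >
> >> >> >(followed by restarting the task managers)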
> >> >> >
> >> >> >
> >> >> >Greetings,
> >> >> >Stephan
> >> >> >
> >> >> >
> >> >> >On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
> >> >> >
> >> >> >> There is no shuffle operation in my flow. Mine actually looks
> >> >> >> like this:
> >> >> >>
> >> >> >> Source: Custom Source -> Flat Map ->
> >> >> >>     (Filter -> Flat Map -> Map -> Map -> Map, Filter)
> >> >> >>
> >> >> >>
> >> >> >> Maybe it's treating this whole flow as one pipeline and assigning
> >> >> >> it to a slot. What I really wanted was to have the custom source I
> >> >> >> built have running instances on all nodes. I'm not really sure if
> >> >> >> that's the right approach, but if we could add this as a feature
> >> >> >> that'd be great, since having more than one node running the same
> >> >> >> pipeline guarantees the pipeline is never offline.
> >> >> >>
> >> >> >> -Ali
> >> >> >>
> >> >> >> On 2015-12-02, 4:39 AM, "Till Rohrmann" <trohrm...@apache.org> wrote:
> >> >> >>
> >> >> >> >If I'm not mistaken, then the scheduler already has a preference
> >> >> >> >to spread independent pipelines out across the cluster. At least
> >> >> >> >it uses a queue of instances from which it pops the first element
> >> >> >> >when it allocates a new slot. This instance is then appended to
> >> >> >> >the queue again if it has some resources (slots) left.
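> >> >> >> >
> >> >> >> >As a toy illustration of that behavior (not Flink's actual
> >> >> >> >classes, just the queue logic described above):
> >> >> >> >
> >> >> >> >import java.util.ArrayDeque;
> >> >> >> >import java.util.Deque;
> >> >> >> >
> >> >> >> >public class SchedulerQueueDemo {
> >> >> >> >
> >> >> >> >    static final class Instance {
> >> >> >> >        final String name;
> >> >> >> >        int freeSlots;
> >> >> >> >        Instance(String name, int freeSlots) {
> >> >> >> >            this.name = name;
> >> >> >> >            this.freeSlots = freeSlots;
> >> >> >> >        }
> >> >> >> >    }
> >> >> >> >
> >> >> >> >    public static void main(String[] args) {
> >> >> >> >        Deque<Instance> queue = new ArrayDeque<>();
> >> >> >> >        queue.add(new Instance("machine-1", 2));
> >> >> >> >        queue.add(new Instance("machine-2", 2));
> >> >> >> >        queue.add(new Instance("machine-3", 2));
> >> >> >> >
> >> >> >> >        // schedule 6 independent pipelines
> >> >> >> >        for (int i = 1; i <= 6; i++) {
> >> >> >> >            Instance inst = queue.poll(); // pop the first machine
> >> >> >> >            inst.freeSlots--;             // allocate one slot on it
> >> >> >> >            System.out.println("pipeline " + i + " -> " + inst.name);
> >> >> >> >            if (inst.freeSlots > 0) {
> >> >> >> >                queue.add(inst);          // re-append while slots remain
> >> >> >> >            }
> >> >> >> >        }
> >> >> >> >    }
> >> >> >> >}
> >> >> >> >
> >> >> >> >Running that assigns the pipelines round-robin across the three
> >> >> >> >machines.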
> >> >> >> >
> >> >> >> >I would assume that you have a shuffle operation involved in
> >> >> >> >your job, such that it makes sense for the scheduler to deploy
> >> >> >> >all pipelines to the same machine.
> >> >> >> >
> >> >> >> >Cheers,
> >> >> >> >Till
> >> >> >> >On Dec 1, 2015 4:01 PM, "Stephan Ewen" <se...@apache.org> wrote:
> >> >> >> >
> >> >> >> >> Slots are like "resource groups" which execute entire
> >> >> >> >> pipelines. They frequently have more than one operator.
> >> >> >> >>
> >> >> >> >> What you can try as a workaround is to decrease the number of
> >> >> >> >> slots per machine, to cause the operators to be spread across
> >> >> >> >> more machines.
> >> >> >> >>
> >> >> >> >> If this is a crucial issue for your use case, it should be
> >> >> >> >> simple to add a "preference to spread out" to the scheduler...
> >> >> >> >>
> >> >> >> >> On Tue, Dec 1, 2015 at 3:26 PM, Kashmar, Ali <ali.kash...@emc.com>
> >> >> >> >> wrote:
> >> >> >> >>
> >> >> >> >> > Is there a way to make a task cluster-parallelizable? I.e.,
> >> >> >> >> > make sure the parallel instances of the task are distributed
> >> >> >> >> > across the cluster. When I run my flink job with a parallelism
> >> >> >> >> > of 16, all the parallel tasks are assigned to the first task
> >> >> >> >> > manager.
> >> >> >> >> >
> >> >> >> >> > - Ali
> >> >> >> >> >
> >> >> >> >> > On 2015-11-30, 2:18 PM, "Ufuk Celebi" <u...@apache.org> wrote:
> >> >> >> >> >
> >> >> >> >> > >
> >> >> >> >> > >> On 30 Nov 2015, at 17:47, Kashmar, Ali <ali.kash...@emc.com> wrote:
> >> >> >> >> > >> Do the parallel instances of each task get distributed
> >> >> >> >> > >> across the cluster or is it possible that they all run on
> >> >> >> >> > >> the same node?
> >> >> >> >> > >
> >> >> >> >> > >Yes, slots are requested from all nodes of the cluster. But
> >> >> >> >> > >keep in mind that multiple tasks (forming a local pipeline)
> >> >> >> >> > >can be scheduled to the same slot (one slot can hold many
> >> >> >> >> > >tasks).
> >> >> >> >> > >
> >> >> >> >> > >Have you seen this?
> >> >> >> >> > >
> >> >> >> >> > >
> >> >> >> >> > >https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/job_scheduling.html
> >> >> >> >> > >
> >> >> >> >> > >> If they can all run on the same node, what happens when
> >> >> >> >> > >> that node crashes? Does the job manager recreate them using
> >> >> >> >> > >> the remaining open slots?
> >> >> >> >> > >
> >> >> >> >> > >What happens: the job manager tries to restart the program
> >> >> >> >> > >with the same parallelism. Thus, if you have enough free
> >> >> >> >> > >slots available in your cluster, this works smoothly (so yes,
> >> >> >> >> > >the remaining/available slots are used).
> >> >> >> >> > >
> >> >> >> >> > >With a YARN cluster, the task manager containers are
> >> >> >> >> > >restarted automatically. In standalone mode, you have to
> >> >> >> >> > >take care of this yourself.
> >> >> >> >> > >
> >> >> >> >> > >
> >> >> >> >> > >Does this help?
> >> >> >> >> > >
> >> >> >> >> > >– Ufuk
> >> >> >> >> > >