Hello,

Sorry to ask again, but does anyone have an idea about this?

-----Original Message-----
From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com] 
Sent: Monday, 21 August 2017 12:04
To: Nico Kruber <n...@data-artisans.com>
Cc: Ufuk Celebi <u...@apache.org>; user@flink.apache.org
Subject: RE: Great number of jobs and numberOfBuffers

Hi,

1/ Yes, the loop is part of the application I run on YARN. Something like (pseudocode):
public class MyFlinkApp {
        public static void main(String[] args) throws Exception {
                // parse arguments etc
                for (String datehour : datehours) {
                        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                        env.readTextFile(datehour)
                                .union(env.readTextFile(datehour-1))  // pseudocode: the previous datehour
                                .union(env.readTextFile(datehour-2))
                                .map()
                                .groupBy()
                                .sortGroup()
                                .reduceGroup()
                                .......

                        // other steps, unions, processing, inputs, outputs

                        JobExecutionResult result = env.execute();

                        // read accumulators and send some statsd statistics at the end of the batch
                }
        }
}

2/ The prod settings are something like 6 nodes with 8 task slots each and 
32 GiB per node.

3/ I remember that we had the same error (not enough buffers) right at startup. 
I guess that it was trying to allocate all buffers at startup, whereas it is now 
allocating them progressively (but still fails at the same limit).

4/ The program has many steps. It has about 5 inputs (readTextFile) and 2 
outputs (TextHadoopOutputFormat, one in the middle of the processing, the other 
at the end), and each "batch" is composed of multiple union, flatMap, map, 
groupBy, sortGroup, reduceGroup and filter operators. If we start the Flink app 
on a whole week of data, we will have to run (24 * 7) batches. Parallelism 
has the default value except for the output writers (32 and 4), in order to 
limit the number of files on HDFS.
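
For a rough sense of scale, the figures in 2/ and 4/ and the buffer count from the error message quoted further down can be turned into numbers. This is only back-of-the-envelope arithmetic: the class and method names are made up for illustration, and the 32 KiB buffer size is an assumption (it is Flink's default memory segment size, and nothing in this thread says whether it was changed).

```java
/** Back-of-the-envelope figures for the setup described above (illustrative names only). */
final class BatchBufferEstimate {

    // Assumption: Flink's default network buffer (memory segment) size of 32 KiB.
    static final long ASSUMED_BUFFER_SIZE_BYTES = 32L * 1024;

    /** One batch per datehour, i.e. 24 batches per day. */
    static int batchCount(int days) {
        return 24 * days;
    }

    /** Total task slots across the cluster. */
    static int totalSlots(int nodes, int slotsPerNode) {
        return nodes * slotsPerNode;
    }

    /** Memory taken by the network buffer pool of one TaskManager, in MiB. */
    static long bufferPoolMiB(long numberOfBuffers) {
        return numberOfBuffers * ASSUMED_BUFFER_SIZE_BYTES / (1024 * 1024);
    }

    public static void main(String[] args) {
        System.out.println("batches per week: " + batchCount(7));        // 24 * 7
        System.out.println("task slots:       " + totalSlots(6, 8));     // 6 nodes * 8 slots
        System.out.println("buffer pool MiB:  " + bufferPoolMiB(36864)); // count from the error message
    }
}
```

Under that assumption, 36864 buffers already amount to a bit over 1 GiB of network memory per TaskManager, so raising numberOfBuffers further to cover 168 batches does not look like a sustainable direction.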



-----Original Message-----
From: Nico Kruber [mailto:n...@data-artisans.com]
Sent: Friday, 18 August 2017 14:58
To: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Cc: Ufuk Celebi <u...@apache.org>; user@flink.apache.org
Subject: Re: Great number of jobs and numberOfBuffers

Hi Gwenhael,
the effect you describe sounds a bit strange. Just to clarify your setup:

1) Is the loop you were posting part of the application you run on yarn?
2) How many nodes are you running with?
3) What is the error you got when you tried to run the full program without 
splitting it?
4) Can you give a rough sketch of what your program is composed of (operators, 
parallelism,...)? 


Nico

On Thursday, 17 August 2017 11:53:25 CEST Gwenhael Pasquiers wrote:
> Hello,
> 
> We hit this bug in Flink 1.0.1 on YARN (maybe the YARN behavior is 
> different?). We've had this issue for a long time and have been 
> careful not to schedule too many jobs.
>
> I'm currently upgrading the application to Flink 1.2.1 and I'd 
> like to try to solve this issue.
>
> I'm not submitting individual jobs to a standalone cluster.
> 
> I'm starting a single application that has a loop in its main function:
> for(. . .) {
>       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>       env. . . .;
>       env.execute();
> }
> 
> 
> The job fails at some point later during execution with the following
> error:
>
> java.io.IOException: Insufficient number of network buffers:
> required 96, but only 35 available. The total number of network 
> buffers is currently set to 36864. You can increase this number by 
> setting the configuration key 'taskmanager.network.numberOfBuffers'.
>     at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
>
> Before splitting the job into multiple sub-jobs it failed right at startup.
> 
> Each "batch" job takes 10 to 30 minutes and it fails after about a dozen 
> of them (the first ones should have had enough time to be recycled).
>
> We've already increased the jobmanager and "numberOfBuffers" values 
> quite a bit. That way we can handle days of data, but not weeks or 
> months. This is not very scalable. And as you say, I felt that those 
> buffers should be recycled and that way we should have no limit as 
> long as each batch is small enough.
>
> If I start my command again (removing the datehours that were 
> successfully processed) it will work since it's a fresh new cluster.
>
> -----Original Message-----
> From: Ufuk Celebi [mailto:u...@apache.org]
> Sent: Thursday, 17 August 2017 11:24
> To: Ufuk Celebi <u...@apache.org>
> Cc: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>;
> user@flink.apache.org; Nico Kruber <n...@data-artisans.com>
> Subject: Re: Great number of jobs and numberOfBuffers
> 
> PS: Also pulling in Nico (CC'd) who is working on the network stack.
> 
> On Thu, Aug 17, 2017 at 11:23 AM, Ufuk Celebi <u...@apache.org> wrote:
> 
> > Hey Gwenhael,
> >
> > the network buffers are recycled automatically after a job terminates.
> > If this does not happen, it would be quite a major bug.
> >
> > To help debug this:
> >
> > - Which version of Flink are you using?
> > - Does the job fail immediately after submission or later during 
> >   execution?
> > - Is the following correct: the batch job that eventually fails 
> >   because of missing network buffers runs without problems if you 
> >   submit it to a fresh cluster with the same memory?
> >
> > The network buffers are recycled after the task managers report the 
> > task being finished. If you immediately submit the next batch there 
> > is a slight chance that the buffers are not recycled yet. As a 
> > possible temporary workaround, could you try waiting for a short 
> > amount of time before submitting the next batch?
> >
> > I think we should also be able to run the job without splitting it 
> > up after increasing the network memory configuration. Did you 
> > already try this?
> >
> > Best,
> >
> > Ufuk
> >
> > On Thu, Aug 17, 2017 at 10:38 AM, Gwenhael Pasquiers 
> > <gwenhael.pasqui...@ericsson.com> wrote:
> > 
> >> Hello,
> >>
> >> We’re hitting a limit with the numberOfBuffers.
> >>
> >> In a quite complex job we do a lot of operations, with a lot of 
> >> operators, on a lot of folders (datehours).
> >>
> >> In order to split the job into smaller “batches” (to limit the 
> >> necessary “numberOfBuffers”) I’ve done a loop over the batches 
> >> (handle the datehours 3 by 3); for each batch I create a new env, 
> >> then call the execute() method.
> >>
> >> However it looks like there is no cleanup: after a while, if the 
> >> number of batches is too big, there is an error saying that the 
> >> numberOfBuffers isn’t high enough. It kind of looks like a leak.
> >> Is there a way to clean them up?
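
The temporary workaround suggested in the thread (waiting a short time before submitting the next batch) could be sketched roughly as follows. This is a minimal sketch, not Flink API: `PausedBatchLoop`, `BatchJob` and `runAll` are hypothetical names, and `BatchJob.run` stands in for "create a fresh ExecutionEnvironment, build the plan for one datehour, call env.execute()". Nothing in the thread confirms that such a pause actually avoids the buffer error.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch of the "pause between batches" workaround; names are illustrative, not Flink API. */
final class PausedBatchLoop {

    /** Hypothetical stand-in for building a fresh environment and calling env.execute(). */
    interface BatchJob {
        void run(String datehour) throws Exception;
    }

    /**
     * Runs one batch per datehour, sleeping between consecutive batches.
     * Returns the datehours that completed, so a failed run can be restarted without them.
     */
    static List<String> runAll(List<String> datehours, BatchJob job, Duration pause) {
        List<String> done = new ArrayList<>();
        for (int i = 0; i < datehours.size(); i++) {
            String datehour = datehours.get(i);
            try {
                job.run(datehour);
                done.add(datehour);
                if (i < datehours.size() - 1) {
                    // Give the TaskManagers some time to report completion and release buffers.
                    Thread.sleep(pause.toMillis());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                System.err.println("Batch " + datehour + " failed: " + e);
                break; // stop here; the remaining datehours can be resubmitted later
            }
        }
        return done;
    }

    public static void main(String[] args) {
        List<String> datehours = Arrays.asList("2017081700", "2017081701", "2017081702");
        List<String> done = runAll(
                datehours,
                datehour -> System.out.println("running batch " + datehour),
                Duration.ofMillis(100));
        System.out.println("completed: " + done);
    }
}
```

Returning the completed datehours mirrors what is already done by hand when the command is restarted without the successfully processed datehours.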
