Hi Gwenhael,
First of all, we should try to get your job working without splitting it up,
since that should work. Also, the number of network buffers required should
not depend on your input but rather on the job and its parallelism.

The IOException you reported can only come from either a) too few network 
buffers in the first place or b) LocalBufferPool instances not being cleaned 
up. From what I see, b) is done at task de-registration, and as long as there 
are no further errors in the task managers' logs (are there any?), this should 
go through. Regarding a), [1] suggests that
#slots-per-TM^2 * #TMs * 4 = 1536 (in your case)
buffers should be enough, and you said you are already using 36864.
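
For reference, the arithmetic behind that number can be written as a small
Java sketch. The class and method names are made up for illustration and are
not part of Flink; the 8 slots and 6 task managers are taken from your
description of the prod setup, assuming one task manager per node.

```java
// Illustrative only: not a Flink API, just the rule of thumb from [1].
public final class NetworkBufferEstimate {

    // #slots-per-TM^2 * #TMs * 4
    public static long requiredBuffers(int slotsPerTaskManager, int taskManagers) {
        return (long) slotsPerTaskManager * slotsPerTaskManager * taskManagers * 4L;
    }

    public static void main(String[] args) {
        long required = requiredBuffers(8, 6); // 8 slots per TM, 6 TMs (assumed)
        long configured = 36864L;              // value reported in the exception
        System.out.println("required=" + required
                + " configured=" + configured
                + " headroom=" + (configured / required) + "x");
    }
}
```

With these inputs the configured value is 24 times the suggested minimum,
which is why a plain shortage of buffers looks unlikely to me.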

So this brings me back to the question of why it is input-dependent.
Can you share your log files (privately if you prefer)?


Nico


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/config.html#configuring-the-network-buffers

On Monday, 21 August 2017 12:03:46 CEST Gwenhael Pasquiers wrote:
> Hi,
> 
> 1/ Yes, the loop is part of the application I run on yarn. Something like :
> public class MyFlinkApp {
>       public static void main(String[] args){
>               // parse arguments etc
>               for(String datehour:datehours){
>                       ExecutionEnvironment env =
>                               ExecutionEnvironment.getExecutionEnvironment();
>                       env.readText(datehour)
>                               .union(env.readText(datehour-1))        
>                               .union(env.readText(datehour-2))
>                               .map()
>                               .groupby()
>                               .sortGroup()
>                               .reduceGroup()
>                               .......
>                       
>                       // other steps, unions, processing, inputs, outputs
> 
>                       JobExecutionResult result = env.execute();
> 
>                       // read accumulators and send some statsd statistics
>                       // at the end of batch
>               }
>       }
> }
> 
> 2/ The prod settings are something like 6 nodes with 8 taskslots each, 32Gib
> per node.
 
> 3/ I remember that we had the same error (not enough buffers) right at
> startup. I guess that it was trying to allocate all buffers at startup as
> it is now doing it progressively (but still fails at the same limit)
 
> 4/ The program has many steps, it has about 5 inputs (readTextFile) and 2
> outputs (TextHadoopOutputFormat, one in the middle of the processing, the
> other at the end), it is composed of multiple union, flatmap, map, groupby,
> sortGroup, reduceGroup, filter, for each "batch". And if we start the flink
> app on a whole week of data, we will have to start (24 * 7) batches.
> Parallelism has the default value except for the output writers (32 and 4)
> in order to limit the numbers of files on HDFS.
 
> 
> 
> -----Original Message-----
> From: Nico Kruber [mailto:n...@data-artisans.com] 
> Sent: vendredi 18 août 2017 14:58
> To: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
> Cc: Ufuk Celebi <u...@apache.org>; user@flink.apache.org
> Subject: Re: Great number of jobs and numberOfBuffers
> 
> Hi Gwenhael,
> the effect you describe sounds a bit strange. Just to clarify your setup:
> 
> 1) Is the loop you were posting part of the application you run on yarn?
> 2) How many nodes are you running with?
> 3) What is the error you got when you tried to run the full program without
> splitting it?
> 4) Can you give a rough sketch of what your program is composed of
> (operators, parallelism,...)?
> 
> Nico
> 
> On Thursday, 17 August 2017 11:53:25 CEST Gwenhael Pasquiers wrote:
> 
> > Hello,
> > 
> > This bug was met in flink 1.0.1 over yarn (maybe the yarn behavior is 
> > different ?). We've been having this issue for a long time and we were 
> > careful not to schedule too many jobs.
> >
> > I'm currently upgrading the application towards flink 1.2.1 and I'd 
> > like to try to solve this issue.
> >
> > I'm not submitting individual jobs to a standalone cluster.
> > 
> > I'm starting a single application that has a loop in its main function :
> > for(. . .) {
> > 
> >     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> >     env. . . .;
> >     env.execute();
> > 
> > }
> > 
> > 
> > The job fails at some point later during execution with the following
> > error:
> >
> > java.io.IOException: Insufficient number of network buffers: required 96,
> > but only 35 available. The total number of network buffers is currently
> > set to 36864. You can increase this number by setting the configuration
> > key 'taskmanager.network.numberOfBuffers'.
> >     at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
> >
> > Before splitting the job in multiple sub-jobs it failed right at startup.
> > 
> > Each "batch" job takes 10 to 30 minutes and it fails after about a dozen 
> > of them (the first ones should have had enough time to be recycled).
> >
> > We've already increased the jobmanager and "numberOfBuffers" values 
> > quite a bit. That way we can handle days of data, but not weeks or 
> > months. This is not very scalable. And as you say, I felt that those 
> > buffers should be recycled and that way we should have no limit as 
> > long as each batch is small enough.
> >
> > If I start my command again (removing the datehours that were 
> > successfully
> > processed) it will work since it's a fresh new cluster.
> >
> > -----Original Message-----
> > From: Ufuk Celebi [mailto:u...@apache.org]
> > Sent: jeudi 17 août 2017 11:24
> > To: Ufuk Celebi <u...@apache.org>
> > Cc: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>;
> > user@flink.apache.org; Nico Kruber <n...@data-artisans.com>
> > Subject: Re: Great number of jobs and numberOfBuffers
> > 
> > PS: Also pulling in Nico (CC'd) who is working on the network stack.
> > 
> > On Thu, Aug 17, 2017 at 11:23 AM, Ufuk Celebi <u...@apache.org> wrote:
> > 
> > 
> > > Hey Gwenhael,
> > >
> > > the network buffers are recycled automatically after a job terminates.
> > > If this does not happen, it would be quite a major bug.
> > >
> > > To help debug this:
> > >
> > > - Which version of Flink are you using?
> > > - Does the job fail immediately after submission or later during
> > >   execution?
> > > - Is the following correct: the batch job that eventually fails
> > >   because of missing network buffers runs without problems if you
> > >   submit it to a fresh cluster with the same memory
> > >
> > > The network buffers are recycled after the task managers report the
> > > task being finished. If you immediately submit the next batch there
> > > is a slight chance that the buffers are not recycled yet. As a
> > > possible temporary work around, could you try waiting for a short
> > > amount of time before submitting the next batch?
> > >
> > > I think we should also be able to run the job without splitting it
> > > up after increasing the network memory configuration. Did you
> > > already try this?
> > >
> > > Best,
> > >
> > > Ufuk
> > >
> > > On Thu, Aug 17, 2017 at 10:38 AM, Gwenhael Pasquiers 
> > > <gwenhael.pasqui...@ericsson.com> wrote:
> > > 
> > > 
> > >> Hello,
> > >>
> > >> We’re meeting a limit with the numberOfBuffers.
> > >>
> > >> In a quite complex job we do a lot of operations, with a lot of
> > >> operators, on a lot of folders (datehours).
> > >>
> > >> In order to split the job into smaller “batches” (to limit the
> > >> necessary “numberOfBuffers”) I’ve done a loop over the batches (handle
> > >> the datehours 3 by 3), for each batch I create a new env then call the
> > >> execute() method.
> > >>
> > >> However it looks like there is no cleanup : after a while, if the
> > >> number of batches is too big, there is an error saying that the
> > >> numberOfBuffers isn’t high enough. It kind of looks like some leak.
> > >> Is there a way to clean them up ?
> 
> 
