Hi Sourigna,

you are using the formula correctly: #cores should be translated into slots per TaskManager (TM), and #machines into the number of TMs, i.e., #slots-per-TM ^ 2 * #TMs * 4. So 36 ^ 2 * 10 * 4 = 51840 appears to be right. The constant 4 refers to the number of concurrently active full network shuffles (partitioning or broadcasting). If your job is more complex, e.g., it has several inputs that are joined, reduced, etc., the constant needs to be adapted accordingly.
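The rule of thumb can be checked with a few lines of Java. This is a minimal sketch, not Flink code: the class and method names are hypothetical, and the 32 KiB buffer size is an assumption (believed to be the default of taskmanager.network.bufferSizeInBytes in Flink 0.10). It also shows that slots-per-TM ^ 2 * #TMs is the same as slots-per-TM times the job parallelism (slots-per-TM * #TMs).

```java
// Hypothetical helper illustrating the rule of thumb
//   numberOfBuffers (per TM) = slotsPerTm^2 * numTms * concurrentShuffles
// ASSUMPTION: buffer size of 32 KiB (believed default of
// taskmanager.network.bufferSizeInBytes in Flink 0.10); change it if you override that key.
public class NetworkBufferEstimate {

    /** Rule-of-thumb value for taskmanager.network.numberOfBuffers on each TaskManager. */
    static long numberOfBuffers(long slotsPerTm, long numTms, long concurrentShuffles) {
        // slotsPerTm^2 * numTms == slotsPerTm * parallelism,
        // where parallelism = slotsPerTm * numTms (one task per slot across the cluster)
        long parallelism = slotsPerTm * numTms;
        return slotsPerTm * parallelism * concurrentShuffles;
    }

    /** Memory occupied by that many buffers on one TaskManager, in MiB (integer division). */
    static long bufferMemoryMiB(long numberOfBuffers, long bufferSizeInBytes) {
        return numberOfBuffers * bufferSizeInBytes / (1024L * 1024L);
    }

    public static void main(String[] args) {
        long slotsPerTm = 36;          // -ys 36
        long numTms = 10;              // -yn 10
        long concurrentShuffles = 4;   // the constant 4: concurrently active full shuffles
        long bufferSize = 32L * 1024;  // assumed default buffer size in bytes

        long buffers = numberOfBuffers(slotsPerTm, numTms, concurrentShuffles);
        System.out.println("parallelism          = " + slotsPerTm * numTms);
        System.out.println("numberOfBuffers      = " + buffers);
        System.out.println("buffer memory per TM = " + bufferMemoryMiB(buffers, bufferSize) + " MiB");
    }
}
```

With -ys 36 and -yn 10 this prints a parallelism of 360, 51840 buffers, and 1620 MiB of buffer memory per TM; 51840 is the value passed via -yD taskmanager.network.numberOfBuffers=51840. The parallelism of 360 also matches the "required 360" in the quoted error, presumably because a single task needs at least one buffer per channel to its 360 peers; that reading is an interpretation and is not stated elsewhere in this thread.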
The high number of network buffers is due to Flink's pipelined data exchange: sending tasks ship records to receiving tasks while the data is still being produced. Pipelining can significantly improve the performance of jobs, but at high parallelism it requires quite a bit of memory. Hadoop and Spark use a different technique to ship data: they collect the data on the sender side and ship it in batches to the receiver. This technique is less memory-intensive but has a higher latency. Flink also supports batched data exchange. If you do not want to allocate that much memory for pipelined shuffles, you can activate batched data exchange by calling:

    ExecutionEnvironment env = ...
    env.getConfig().setExecutionMode(ExecutionMode.BATCH);

Best,
Fabian

2016-03-03 22:27 GMT+01:00 Sourigna Phetsarath <gna.phetsar...@teamaol.com>:

> All:
>
> I'm running a Flink 0.10.2 App by submitting it to YARN as an application.
> I'm using an AWS EMR cluster of 1 Master and 10 d2.8xlarge. When I submit
> the job using:
>
> bin/flink run \
>   -m yarn-cluster \
>   -yjm 20480 \
>   -yn 10 \
>   -ytm 80960 \
>   -ys 36 \
>   -yD taskmanager.network.numberOfBuffers=51840 \
>   ...
>
> I'm seeing this error:
>
> Caused by: java.io.IOException: Insufficient number of network buffers:
> required 360, but only 315 available. The total number of network
> buffers is currently set to 51840. You can increase this number by
> setting the configuration key 'taskmanager.network.numberOfBuffers'.
>   at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
>   at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:325)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:488)
>   at java.lang.Thread.run(Thread.java:745)
>
> The error message does not seem to be conveying the correct information.
>
> Can someone explain to me what reasonable numbers are for
> taskmanager.network.numberOfBuffers and
> taskmanager.network.bufferSizeInBytes?
>
> I've read this:
> https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers
> and this:
> http://stackoverflow.com/questions/33589710/flink-cluster-params-how-to-set
>
> But I am still unclear what the calculus is supposed to be. Is it
>
> #cores ^ 2 * #machines * 4 ?
>
> So, in my case 36 ^ 2 * 10 * 4 = 51840
>
> Thanks in advance for any help you can provide.
>
> --
> Gna Phetsarath
> System Architect // AOL Platforms // Data Services // Applied Research Chapter
> 770 Broadway, 5th Floor, New York, NY 10003
> o: 212.402.4871 // m: 917.373.7363
> vvmr: 8890237 aim: sphetsarath20 t: @sourigna
> http://www.aolplatforms.com