Hey Stefano, your number of task slots per task manager is 6, right? That is, 6 * 6 = 36 slots in total? You can check the total number of available task slots in the job manager web interface.
And from the log output: are you running all tasks with a parallelism of 28? If you have a long pipeline in which multiple tasks run at the same time, it is possible that you run into a corner case. Could you post your program without the user code, i.e. just the data flow like SOURCE => FLATMAP => DISTINCT etc.?

In the meantime you could try to increase the number of buffers to 4096 (taskmanager.network.numberOfBuffers), which would cost you 128 MB of main memory per machine (4096 * 32 KB).

– Ufuk

On Tue, Dec 2, 2014 at 11:21 AM, Stefano Bortoli <[email protected]> wrote:
> Hi all,
>
> I have just hit a problem, stack trace at the bottom.
>
> It seems that there are not enough buffers to complete the run of a
> process, even though I am working far below the limit suggested by the
> function presented here:
> http://flink.incubator.apache.org/docs/0.6-incubating/faq.html
>
> I am running on 6 nodes, top 6 tasks per machine, so 4*6*6^2=864 << 2048.
>
> The job does a flatMap (grouped and distinct), and then two chained joins
> on the output of the map. Then the output of the join is filtered,
> consolidated and printed.
>
> I tried restarting the cluster, due to a possible leak, but it did not work.
> Am I falling into a corner case of the rule of thumb, or is it possible
> that there is something not working properly?
>
> Noticeably, 2 nodes run just 2 tasks... so the equation changes a bit. Is
> it possible that this is causing problems? Furthermore, the tasks are
> running where hbase and solr are running as well. So, the number of threads
> is quite relevant.
>
> thanks a lot for the support! :-)
>
> saluti,
> Stefano
>
> okkam-nano-2.okkam.it
> Error: java.lang.Exception: Failed to deploy the task CHAIN
> Reduce(org.okkam.flink.maintenance.deduplication.blocking.RemoveDuplicateReduceGroupFunction)
> -> Combine(org.apache.flink.api.java.operators.DistinctOperator$DistinctFunction)
> (15/28) - execution #0 to slot SubSlot 5 (cab978f80c0cb7071136cd755e971be9
> (5) - ALLOCATED/ALIVE):
> org.apache.flink.runtime.io.network.InsufficientResourcesException:
> okkam-nano-2.okkam.it has not enough buffers to safely execute CHAIN
> Reduce(org.okkam.flink.maintenance.deduplication.blocking.RemoveDuplicateReduceGroupFunction)
> -> Combine(org.apache.flink.api.java.operators.DistinctOperator$DistinctFunction)
> (36 buffers missing)
>     at org.apache.flink.runtime.io.network.ChannelManager.ensureBufferAvailability(ChannelManager.java:262)
>     at org.apache.flink.runtime.io.network.ChannelManager.register(ChannelManager.java:130)
>     at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:598)
>     at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
>     at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
>
>     at org.apache.flink.runtime.executiongraph.Execution$2.run(Execution.java:284)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
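
For reference, the arithmetic used in this thread (total slots, the FAQ rule of thumb as applied in the quoted message, and the buffer memory estimate) can be written out as below. This is only a sketch and not Flink code: the function names are hypothetical, the rule of thumb is taken in the form 4 * machines * slots^2 from the quoted message, and the 32 KB buffer size is the figure used in the reply.

```python
# Sketch of the buffer arithmetic discussed in this thread.
# All names here are hypothetical helpers, not part of Flink.

BUFFER_SIZE_KB = 32  # buffer size assumed in the reply (4096 * 32 KB)


def rule_of_thumb_buffers(slots_per_tm: int, num_tms: int) -> int:
    """Rule of thumb as applied in the quoted message: 4 * machines * slots^2."""
    return 4 * num_tms * slots_per_tm ** 2


def buffer_memory_mb(num_buffers: int, buffer_size_kb: int = BUFFER_SIZE_KB) -> float:
    """Main memory per machine (in MB) taken by the given number of network buffers."""
    return num_buffers * buffer_size_kb / 1024


# 6 task managers with 6 slots each
total_slots = 6 * 6
needed = rule_of_thumb_buffers(slots_per_tm=6, num_tms=6)

print(total_slots)             # 36
print(needed)                  # 864
print(buffer_memory_mb(4096))  # 128.0
```

The rule of thumb is only an estimate, so a job with many concurrently running tasks can still run short of buffers even when the estimate is well below the configured value, which is why raising taskmanager.network.numberOfBuffers is worth trying.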
