Hi everyone,

I'm constantly running into OutOfMemoryErrors and for the life of me I
cannot figure out what's wrong. Let me describe my setup. I'm running the
current master branch of Flink on YARN (Hadoop 2.7.0). My job is an
unfinished implementation of TPC-H Q2 (
https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java).
I run it on 8 machines (1 for the JM, the other 7 for the TMs) with 64G of
memory per machine. This is what I believe to be the relevant section of my
yarn-site.xml:


  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>57344</value>
  </property>
<!--
  <property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>8192</value>
  </property>
-->
  <property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>55296</value>
  </property>

  <property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
  </property>


And this is how I submit the job:


$FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 32768 -yn 7 .....
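
As far as I can tell, the requested container sizes should fit the YARN limits
above (please correct me if my arithmetic is off):

  JM container:  16384 MB <= 55296 MB (yarn.scheduler.maximum-allocation-mb)
  TM containers: 32768 MB <= 55296 MB (yarn.scheduler.maximum-allocation-mb)
                 32768 MB <= 57344 MB (yarn.nodemanager.resource.memory-mb)
  7 TMs on 7 nodes, i.e. one 32768 MB container per NodeManager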


The TMs happily report:

.....
11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner  -  JVM Options:
11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner  -     -Xms24511m
11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner  -     -Xmx24511m
11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner  -     -XX:MaxDirectMemorySize=65m
.....
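
If I understand the container cutoff correctly (I'm assuming a default cutoff
ratio of 0.25 here, which may well be wrong), the heap size seems to be derived
roughly like this:

  32768 MB * 0.25            =  8192 MB cutoff for the YARN container
  32768 MB - 8192 MB         = 24576 MB
  24576 MB - 65 MB (direct)  = 24511 MB heap (-Xms/-Xmx)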


I've tried various combinations of YARN and Flink options, to no avail. I
always end up with the following stack trace:


org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.OutOfMemoryError: Direct buffer memory
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
    at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
    at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
    at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
    at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct buffer memory
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    ... 9 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:658)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
    at io.netty.buffer.UnpooledUnsafeDirectByteBuf.allocateDirect(UnpooledUnsafeDirectByteBuf.java:108)
    at io.netty.buffer.UnpooledUnsafeDirectByteBuf.capacity(UnpooledUnsafeDirectByteBuf.java:157)
    at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
    at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:228)
    ... 10 more


I always figured that it would be quite hard to run into OOMEs with Flink, so
I'm wondering what's going wrong here. It seems to be related to direct
memory: why is MaxDirectMemorySize limited in the JVM options at all? And is
there a place where I can safely increase it, or remove the option altogether
so that direct memory is unbounded?
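
The only knob I've found so far is env.java.opts in flink-conf.yaml. I'm
guessing something like the following might raise the limit, but I don't know
whether this is the sanctioned way, or whether it conflicts with the
-XX:MaxDirectMemorySize that Flink already puts on the launch command line:

# flink-conf.yaml -- my guess, not verified
# pass a larger direct memory limit to the JVMs Flink starts
env.java.opts: -XX:MaxDirectMemorySize=2g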

A note on the data sizes: I used a scaling factor of 1000 for the dbgen
command of TPC-H, which effectively means the following. Each table is split
into 7 chunks (one local to each TM); each chunk of part.tbl is 734M, each
chunk of supplier.tbl is 43M, and each chunk of partsupp.tbl is 3.6G. These
are not excessive amounts of data, but the query (at least my implementation
of it) involves several joins (the one in line 249 is the one causing the
OOME), so maybe there are some network issues? A simplified sketch of the kind
of join I mean is below.
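
Roughly like this (simplified for illustration, not verbatim from
TPCH2Benchmark.java; the dummy elements stand in for the part.tbl and
partsupp.tbl chunks that the real job reads from disk):

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

public class Tpch2JoinSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // (partkey, name) -- in the real job this comes from part.tbl (~734M per chunk)
        DataSet<Tuple2<Long, String>> part =
                env.fromElements(new Tuple2<>(1L, "green anodized steel"));

        // (partkey, suppkey, supplycost) -- from partsupp.tbl (~3.6G per chunk)
        DataSet<Tuple3<Long, Long, Double>> partsupp =
                env.fromElements(new Tuple3<>(1L, 10L, 42.0));

        // join partsupp with part on partkey; this is the kind of join that
        // fails with the direct buffer OOME during the network shuffle
        DataSet<Tuple3<Long, String, Double>> joined = partsupp
                .join(part)
                .where(0)   // partkey in partsupp
                .equalTo(0) // partkey in part
                .with(new JoinFunction<Tuple3<Long, Long, Double>,
                        Tuple2<Long, String>, Tuple3<Long, String, Double>>() {
                    @Override
                    public Tuple3<Long, String, Double> join(
                            Tuple3<Long, Long, Double> ps, Tuple2<Long, String> p) {
                        return new Tuple3<>(ps.f0, p.f1, ps.f2);
                    }
                });

        joined.print();
    }
}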

Maybe you can point me in the right direction. Thanks a bunch, cheers.

Robert
