Hi Guys,
My topology is like this:
I have set up two Flume nodes, each going from an Avro source to an HDFS sink:
StormAgent.sources = avro
StormAgent.channels = MemChannel
StormAgent.sinks = HDFS
StormAgent.sources.avro.type = avro
StormAgent.sources.avro.channels = MemChannel
StormAgent.sources.avro.bind = ip
StormAgent.sources.avro.port = 41414
StormAgent.sinks.HDFS.channel = MemChannel
StormAgent.sinks.HDFS.type = hdfs
StormAgent.sinks.HDFS.hdfs.path = maprfs:///hive/mytable/partition=%{partition}
StormAgent.sinks.HDFS.hdfs.fileType = SequenceFile
StormAgent.sinks.HDFS.hdfs.batchSize = 10000
StormAgent.sinks.HDFS.hdfs.rollSize = 15000000
StormAgent.sinks.HDFS.hdfs.rollCount = 0
StormAgent.sinks.HDFS.hdfs.rollInterval = 360
StormAgent.channels.MemChannel.type = memory
StormAgent.channels.MemChannel.capacity = 100000
StormAgent.channels.MemChannel.transactionCapacity = 100000
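(For context: the %{partition} in the sink path comes from a header my
topology sets on every event. A minimal sketch of how that header gets
attached, with placeholder payload/partition values, looks like this:)

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class PartitionedEventExample {
    public static Event build(String payload, String partition) {
        // The HDFS sink substitutes this header into %{partition} in hdfs.path.
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("partition", partition);
        return EventBuilder.withBody(payload.getBytes(StandardCharsets.UTF_8), headers);
    }
}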
As you can see, the channel capacity is pretty big. I assigned 2 GB of memory
to Flume when starting flume-ng.
In my Storm topology I read the streaming data and call the Flume
load-balancing RPC client to write to the Avro sources. The related config
looks like this:
flume-avro-forward.client.type=default_loadbalance
flume-avro-forward.host-selector=random
flume-avro-forward.hosts=h1 h2
#dev0
flume-avro-forward.hosts.h1=ip1:41414
#dev1
flume-avro-forward.hosts.h2=ip2:41414
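(The client itself is built from these properties through RpcClientFactory.
Roughly, and assuming the flume-avro-forward. prefix gets stripped before the
properties reach Flume, with ip1/ip2 as placeholders:)

import java.util.Properties;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;

public class LoadBalancingClientExample {
    public static RpcClient create() {
        // Same settings as above, minus the flume-avro-forward. prefix.
        Properties props = new Properties();
        props.put("client.type", "default_loadbalance");
        props.put("host-selector", "random");
        props.put("hosts", "h1 h2");
        props.put("hosts.h1", "ip1:41414");
        props.put("hosts.h2", "ip2:41414");
        return RpcClientFactory.getInstance(props);
    }
}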
Everything works fine. However, after running for some time, I get an
OutOfMemoryError in my Storm worker:
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:632)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:97)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:288)
at org.jboss.netty.channel.socket.nio.SocketSendBufferPool$Preallocation.<init>(SocketSendBufferPool.java:151)
at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.<init>(SocketSendBufferPool.java:38)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:115)
at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:47)
at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:34)
at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:26)
at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.<init>(AbstractNioWorkerPool.java:57)
at org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:29)
at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.<init>(NioClientSocketChannelFactory.java:148)
at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.<init>(NioClientSocketChannelFactory.java:113)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:140)
The Flume logs show exceptions as well.
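(For completeness, the send path in my bolt is essentially the pattern from
the Flume developer guide; this is a simplified sketch, not my exact code,
and it reuses the create() helper from the sketch above:)

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;

public class SendPathExample {
    private RpcClient client = LoadBalancingClientExample.create();

    public void send(Event event) {
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            // On delivery failure, close the broken client and build a
            // fresh one instead of leaking the old Netty resources.
            client.close();
            client = LoadBalancingClientExample.create();
        }
    }
}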
The streaming data is big: each 6-minute window is one partition and carries
around 20 MB of data. That is why I set rollSize to 15000000 and rollInterval
to 6*60 = 360 s: with about 20 MB arriving per partition and a 15 MB rollSize,
each partition should end up with at most two files instead of many small
ones. I could use some help/guidance on tuning the config. I tried lowering
transactionCapacity from 100000 to 50000, but I still get the exception. I
also tried adding a third Flume node, which only delays the exception. What
should I try?
Thanks in advance.
Chen