[ 
https://issues.apache.org/jira/browse/FLUME-2731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Masanobu Horiyama updated FLUME-2731:
-------------------------------------
    Environment: 
Flume Agent : 1.6.0
OS : Mac OS X 10.7.5 and CentOS release 6.6 2.6.32-504.1.3.el6.x86_64
avro : 1.7.4
avro-ipc : 1.7.4
JDK: 1.6.0_65 and 1.7.0-45

Flume Client - NettyAvroRpcClient
flume-ng-sdk : 1.6.0
OS : Mac OS X 10.7.5
avro : 1.7.4
avro-ipc : 1.7.4
JDK: 1.6.0_65

The agent config:

{noformat}
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
#agent1.sinks.log-sink1.type = logger
agent1.sinks.log-sink1.type = null
agent1.sinks.log-sink1.batchSize = 10

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1
{noformat}

The client config:

batch-size = 1
connect-timeout = 10s
request-timout = 10s

  was:
Flume Agent : 1.6.0
OS : Mac OS X 10.7.5 and CentOS release 6.6 2.6.32-504.1.3.el6.x86_64
avro : 1.7.4
avro-ipc : 1.7.4
JDK: 1.6.0_65 and 1.7.0-45

Flume Client - NettyAvroRpcClient
flume-ng-sdk : 1.6.0
OS : Mac OS X 10.7.5
avro : 1.7.4
avro-ipc : 1.7.4
JDK: 1.6.0_65

The agent config:

{noformat}
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
#agent1.sinks.log-sink1.type = logger
agent1.sinks.log-sink1.type = null
agent1.sinks.log-sink1.batchSize = 10

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1
{noformat}


> Flume Agent throws OutOfMemoryError during load tests.
> ------------------------------------------------------
>
>                 Key: FLUME-2731
>                 URL: https://issues.apache.org/jira/browse/FLUME-2731
>             Project: Flume
>          Issue Type: Bug
>          Components: Node
>    Affects Versions: v1.6.0
>         Environment: Flume Agent : 1.6.0
> OS : Mac OS X 10.7.5 and CentOS release 6.6 2.6.32-504.1.3.el6.x86_64
> avro : 1.7.4
> avro-ipc : 1.7.4
> JDK: 1.6.0_65 and 1.7.0-45
> Flume Client - NettyAvroRpcClient
> flume-ng-sdk : 1.6.0
> OS : Mac OS X 10.7.5
> avro : 1.7.4
> avro-ipc : 1.7.4
> JDK: 1.6.0_65
> The agent config:
> {noformat}
> # Define a memory channel called ch1 on agent1
> agent1.channels.ch1.type = memory
> agent1.channels.ch1.capacity = 10000
> # Define an Avro source called avro-source1 on agent1 and tell it
> # to bind to 0.0.0.0:41414. Connect it to channel ch1.
> agent1.sources.avro-source1.channels = ch1
> agent1.sources.avro-source1.type = avro
> agent1.sources.avro-source1.bind = 0.0.0.0
> agent1.sources.avro-source1.port = 41414
> # Define a logger sink that simply logs all events it receives
> # and connect it to the other end of the same channel.
> agent1.sinks.log-sink1.channel = ch1
> #agent1.sinks.log-sink1.type = logger
> agent1.sinks.log-sink1.type = null
> agent1.sinks.log-sink1.batchSize = 10
> # Finally, now that we've defined all of our components, tell
> # agent1 which ones we want to activate.
> agent1.channels = ch1
> agent1.sources = avro-source1
> agent1.sinks = log-sink1
> {noformat}
> The client config:
> batch-size = 1
> connect-timeout = 10s
> request-timout = 10s
>            Reporter: Masanobu Horiyama
>
> The flume agent throws an OutOfMemoryError during load tests.
> {noformat}
> 2015-06-29 15:30:24,590 (New I/O worker #4) [WARN - 
> org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught(NettyServer.java:201)]
>  Unexpected exception from downstream.
> java.lang.OutOfMemoryError: Java heap space
>         at java.util.HashMap.<init>(HashMap.java:187)
>         at java.util.HashMap.<init>(HashMap.java:199)
>         at 
> org.apache.avro.generic.GenericDatumReader.newMap(GenericDatumReader.java:330)
>         at 
> org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:239)
>         at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>         at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
>         at org.apache.avro.ipc.Responder.respond(Responder.java:124)
>         at 
> org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
>         at 
> org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
>         at 
> org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
>         at 
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>         at 
> org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
>         at 
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
>         at 
> org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
>         at 
> org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
>         at 
> org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
>         at 
> org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
>         at 
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>         at 
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
>         at 
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
>         at 
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
>         at 
> org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
>         at 
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
>         at 
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
>         at 
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
>         at 
> org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
>         at 
> org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
>         at 
> org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>         at java.lang.Thread.run(Thread.java:695)
> {noformat}
> The test: 
> A test worker consists of a NettyAvroRpcClient shared by a thread pool of 
> size 12. The rpc client instance will be recreated whenever isActive is 
> false. Flume events with a timestamp header and a body of 250 random bytes 
> are submitted continuously. Test workers are started in groups of 20. 5 
> groups are started in total with 5 second delays between starts.
> Usually, after the first group of 20, we see the OOM error in the agent.
> Got the avro-1.8.0-SNAPSHOT source, and added debug logging in the newMap 
> method to see the size of allocation:
> https://github.com/apache/avro/blob/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java#L405-L411
> And found that in most cases the size was 1, but when the OOM errors start 
> happening, the size is always 640371331.
> The OOM error occurs more frequently when the connect-timeout and 
> request-timeout are both shorter than 20 seconds.
> Seems to be related to
> AVRO-1111
> FLUME-1259
> FLUME-1641



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to