[
https://issues.apache.org/jira/browse/FLUME-1259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13289781#comment-13289781
]
Mubarak Seyed commented on FLUME-1259:
--------------------------------------
Hi Will,
Please find the config
{code}
agent.sources = avro-source1
agent.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink3 hdfs-sink4 hdfs-sink5
agent.channels = file-channel-1 file-channel-2 file-channel-3 file-channel-4
file-channel-5
#sources
agent.sources.avro-source1.channels = file-channel-1 file-channel-2
file-channel-3 file-channel-4 file-channel-5
agent.sources.avro-source1.type = avro
agent.sources.avro-source1.bind = 0.0.0.0
agent.sources.avro-source1.port = 9090
agent.sources.avro-source1.selector.type = multiplexing
agent.sources.avro-source1.selector.header = tableName
agent.sources.avro-source1.selector.mapping.test1 = file-channel-1
agent.sources.avro-source1.selector.mapping.test2 = file-channel-2
agent.sources.avro-source1.selector.mapping.test3 = file-channel-3
agent.sources.avro-source1.selector.mapping.test4 = file-channel-4
agent.sources.avro-source1.selector.mapping.test5 = file-channel-5
agent.sources.avro-source1.selector.default = file-channel-1
#sinks
agent.sinks.hdfs-sink1.channel = file-channel-1
agent.sinks.hdfs-sink2.channel = file-channel-2
agent.sinks.hdfs-sink3.channel = file-channel-3
agent.sinks.hdfs-sink4.channel = file-channel-4
agent.sinks.hdfs-sink5.channel = file-channel-5
agent.sinks.hdfs-sink1.type = hdfs
agent.sinks.hdfs-sink1.hdfs.batchSize = 1000
agent.sinks.hdfs-sink1.hdfs.txnEventMax = 1000
agent.sinks.hdfs-sink1.hdfs.fileType = DataStream
agent.sinks.hdfs-sink1.hdfs.filePrefix = Test1
agent.sinks.hdfs-sink1.hdfs.rollSize = 1073741824
agent.sinks.hdfs-sink1.hdfs.rollCount = 0
agent.sinks.hdfs-sink1.hdfs.rollInterval = 0
agent.sinks.hdfs-sink1.hdfs.callTimeout = 8000
agent.sinks.hdfs-sink1.serializer = Test1AvroEventSerializer$Builder
agent.sinks.hdfs-sink1.serializer.compressionCodec = snappy
agent.sinks.hdfs-sink1.serializer.syncIntervalBytes = 2048000
agent.sinks.hdfs-sink1.hdfs.path = hdfs://test:8020/test1/%m%d%Y/%H/1
agent.sinks.hdfs-sink2.type = hdfs
agent.sinks.hdfs-sink2.hdfs.batchSize = 1000
agent.sinks.hdfs-sink2.hdfs.txnEventMax = 1000
agent.sinks.hdfs-sink2.hdfs.fileType = DataStream
agent.sinks.hdfs-sink2.hdfs.filePrefix = Test2
agent.sinks.hdfs-sink2.hdfs.rollSize = 1073741824
agent.sinks.hdfs-sink2.hdfs.rollCount = 0
agent.sinks.hdfs-sink2.hdfs.rollInterval = 0
agent.sinks.hdfs-sink2.hdfs.callTimeout = 8000
agent.sinks.hdfs-sink2.serializer = Test2AvroEventSerializer$Builder
agent.sinks.hdfs-sink2.serializer.compressionCodec = snappy
agent.sinks.hdfs-sink2.serializer.syncIntervalBytes = 2048000
agent.sinks.hdfs-sink2.hdfs.path = hdfs://test:8020/test2/%m%d%Y/%H/1
...
#channels
agent.channels.file-channel-1.type = memory
agent.channels.file-channel-1.capacity = 100000
agent.channels.file-channel-1.checkpointDir =
/data2/flume/file-channel/checkpoint1
agent.channels.file-channel-1.dataDirs = /data2/flume/file-channel/data1
agent.channels.file-channel-2.type = memory
agent.channels.file-channel-2.capacity = 100000
agent.channels.file-channel-2.checkpointDir =
/data3/flume/file-channel/checkpoint2
agent.channels.file-channel-2.dataDirs = /data3/flume/file-channel/data2
agent.channels.file-channel-3.type = memory
agent.channels.file-channel-3.capacity = 100000
agent.channels.file-channel-3.checkpointDir =
/data4/flume/file-channel/checkpoint3
agent.channels.file-channel-3.dataDirs = /data4/flume/file-channel/data3
agent.channels.file-channel-4.type = memory
agent.channels.file-channel-4.capacity = 100000
agent.channels.file-channel-4.checkpointDir =
/data5/flume/file-channel/checkpoint4
agent.channels.file-channel-4.dataDirs = /data5/flume/file-channel/data4
agent.channels.file-channel-5.type = memory
agent.channels.file-channel-5.capacity = 100000
agent.channels.file-channel-5.checkpointDir =
/data6/flume/file-channel/checkpoint5
agent.channels.file-channel-5.dataDirs = /data6/flume/file-channel/data5
{code}
> Flume throws OutOfMemory error when sending data from netcat to avro source
> (negative test case)
> ------------------------------------------------------------------------------------------------
>
> Key: FLUME-1259
> URL: https://issues.apache.org/jira/browse/FLUME-1259
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: v1.2.0
> Environment: RHEL 6.2 64-bit
> Reporter: Will McQueen
> Priority: Minor
> Fix For: v1.2.0
>
>
> This is a negative test case.
> I mistakenly sent data from netcat to an avro source, and Flume through an
> OutOfMemory error. I sent just 5 one-character events using interactive
> netcat (nc localhost 41414) and the following config file:
> agent.channels = c1
> agent.sources = r1
> agent.sinks = k1
> #
> agent.channels.c1.type = MEMORY
> #
> agent.sources.r1.channels = c1
> agent.sources.r1.type = AVRO
> agent.sources.r1.bind = 0.0.0.0
> agent.sources.r1.port = 41414
> #
> agent.sinks.k1.channel = c1
> agent.sinks.k1.type = LOGGER
> Here's the exception:
> 2012-06-05 13:51:36,622 INFO source.AvroSource: Avro source
> starting:AvroSource: { bindAddress:0.0.0.0 port:41414 }
> 2012-06-05 13:51:36,852 DEBUG source.AvroSource: Avro source started
> 2012-06-05 13:51:37,395 INFO ipc.NettyServer: [id: 0x0b07f45d,
> /0:0:0:0:0:0:0:1:49091 => /0:0:0:0:0:0:0:1:41414] OPEN
> 2012-06-05 13:51:37,399 INFO ipc.NettyServer: [id: 0x0b07f45d,
> /0:0:0:0:0:0:0:1:49091 => /0:0:0:0:0:0:0:1:41414] BOUND:
> /0:0:0:0:0:0:0:1:41414
> 2012-06-05 13:51:37,399 INFO ipc.NettyServer: [id: 0x0b07f45d,
> /0:0:0:0:0:0:0:1:49091 => /0:0:0:0:0:0:0:1:41414] CONNECTED:
> /0:0:0:0:0:0:0:1:49091
> 2012-06-05 13:52:06,622 DEBUG properties.PropertiesFileConfigurationProvider:
> Checking file:/etc/flume-ng/conf/flume.conf for changes
> 2012-06-05 13:52:36,623 DEBUG properties.PropertiesFileConfigurationProvider:
> Checking file:/etc/flume-ng/conf/flume.conf for changes
> 2012-06-05 13:52:56,468 WARN ipc.NettyServer: Unexpected exception from
> downstream.
> java.lang.OutOfMemoryError: Java heap space
> at java.util.ArrayList.<init>(ArrayList.java:112)
> at
> org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decodePackHeader(NettyTransportCodec.java:154)
> at
> org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decode(NettyTransportCodec.java:131)
> at
> org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:282)
> at
> org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:214)
> at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
> at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
> at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
> at
> org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
> at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> 2012-06-05 13:52:56,470 INFO ipc.NettyServer: [id: 0x0b07f45d,
> /0:0:0:0:0:0:0:1:49091 :> /0:0:0:0:0:0:0:1:41414] DISCONNECTED
> 2012-06-05 13:52:56,470 INFO ipc.NettyServer: [id: 0x0b07f45d,
> /0:0:0:0:0:0:0:1:49091 :> /0:0:0:0:0:0:0:1:41414] UNBOUND
> 2012-06-05 13:52:56,471 INFO ipc.NettyServer: [id: 0x0b07f45d,
> /0:0:0:0:0:0:0:1:49091 :> /0:0:0:0:0:0:0:1:41414] CLOSED
> I'm wondering if there's some kind of validation of incoming avro format that
> we can do that would prevent this error.
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators:
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira