Do you see any errors in the HDFS logs? I would probably start by looking in
the datanode logs.
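If it helps, a quick way to scan them (a minimal sketch; the log path is an
assumption based on a packaged CDH-style layout, Apache tarball installs
usually log under $HADOOP_HOME/logs instead):

    # Scan the datanode logs for errors/exceptions; adjust the path
    # (/var/log/hadoop-hdfs is assumed) to match your install.
    grep -iE 'error|exception' /var/log/hadoop-hdfs/*datanode*.log | tail -n 50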
On Mon, Oct 20, 2014 at 6:41 AM, Pal Konyves <[email protected]> wrote:
> Hi Jeff,
>
> Thanks, I will try that out, but is there any reason why one sink is so
> slow? Is it really a performance issue, or a trade-off for some functional
> benefit?
>
> Thanks,
> Pal
>
> On Mon, Oct 20, 2014 at 3:22 PM, Jeff Lord <[email protected]> wrote:
> > Pal,
> >
> > You can add more sinks to your config.
> > Don't put them in a sink group; just have multiple sinks pulling from the
> > same channel. This should increase your throughput (a minimal config
> > sketch follows after the quoted thread).
> >
> > Best,
> >
> > Jeff
> >
> > On Mon, Oct 20, 2014 at 3:49 AM, Pal Konyves <[email protected]> wrote:
> >> Hi there,
> >>
> >> We would like to write lots of logs to HDFS via Flume; you can think of
> >> it as a stress test, or a max-throughput test. I attached our Flume
> >> config below. The problem is that HDFS writes are painfully slow: Flume
> >> writes to HDFS at ~5-10 Mbit/s (for us, that means a couple thousand
> >> events per sec) on a 1 Gbit network.
> >>
> >> 'hadoop dfs -put' is fine; I checked the bandwidth usage with iftop.
> >> It was ~200 Mbit/s, and a 1 GB file is uploaded within seconds, so
> >> HDFS itself should not be the bottleneck.
> >>
> >> Flume is capable of receiving the messages at around the same speed,
> >> so the Avro source we use is not the issue either; I can see the
> >> memory channel filling up.
> >>
> >> On the other hand, in iftop I can see that while Flume receives the
> >> events fast, it only writes to the datanode at 5-10 Mbit/s, very very
> >> slow. Why is that? I tried huge batch sizes for the HDFS sink
> >> (10,000-100,000 events), because batched writes are supposedly always
> >> faster, but the sink only writes ~2,000-3,000 events per sec according
> >> to the JMX console. Smaller batch sizes (1,000) are not faster either.
> >>
> >> I could not find the magic configuration that makes the HDFS sink
> >> write faster, so I think there is generally something wrong in Flume.
> >> I even tried the PseudoTxnMemoryChannel to disable transactions,
> >> without any improvement in write performance.
> >>
> >> Setup:
> >> I have a setup of five physical machines, strong ones (Dell T5600,
> >> 6-core Xeon, Gigabit networking):
> >> - Flume Avro client generating log messages
> >> - Flume agent
> >> - HDFS namenode
> >> - HDFS datanode1
> >> - HDFS datanode2
> >>
> >> +++++++++++++++++++++++++++++
> >> Configuration: http://pastebin.com/53DGd3wm
> >>
> >> agent.sources = r1
> >> agent.channels = memoryChannel1
> >> agent.sinks = s1
> >>
> >> ###############
> >> # source
> >> ##############
> >>
> >> # For each one of the sources, the type is defined
> >> agent.sources.r1.type = avro
> >> agent.sources.r1.threads = 10
> >> #agent.sources.r1.compression-type = deflate
> >>
> >> # The channel can be defined as follows.
> >> agent.sources.r1.channels = memoryChannel1
> >>
> >> # avro source network configuration
> >> agent.sources.r1.bind = 0.0.0.0
> >> agent.sources.r1.port = 50414
> >>
> >> #############
> >> # sink
> >> ############
> >> # Each sink's type must be defined
> >> agent.sinks.s1.type = hdfs
> >> agent.sinks.s1.hdfs.path = hdfs://namenode/flume/events/%D/%H/%M
> >> agent.sinks.s1.hdfs.filePrefix = flume-events
> >> agent.sinks.s1.hdfs.fileSuffix = .log
> >> agent.sinks.s1.hdfs.fileType = DataStream
> >>
> >> # round timestamps down to every 15 minutes
> >> agent.sinks.s1.hdfs.round = true
> >> agent.sinks.s1.hdfs.roundValue = 15
> >> agent.sinks.s1.hdfs.roundUnit = minute
> >> agent.sinks.s1.hdfs.timeZone = UTC
> >>
> >> # never roll based on file size
> >> agent.sinks.s1.hdfs.rollSize = 0
> >> # never roll based on event count
> >> agent.sinks.s1.hdfs.rollCount = 0
> >> # roll every minute
> >> agent.sinks.s1.hdfs.rollInterval = 60
> >> agent.sinks.s1.hdfs.threadsPoolSize = 10
> >> agent.sinks.s1.hdfs.rollTimerPoolSize = 2
> >>
> >> # events written to the file before it is flushed to HDFS
> >> agent.sinks.s1.hdfs.batchSize = 20000
> >> # Specify the channel the sink should use
> >> agent.sinks.s1.channel = memoryChannel1
> >>
> >> ##############
> >> # channel
> >> #############
> >>
> >> # Each channel's type is defined.
> >> # agent.channels.memoryChannel1.type = org.apache.flume.channel.PseudoTxnMemoryChannel
> >> agent.channels.memoryChannel1.type = memory
> >>
> >> # Other config values specific to each type of channel (sink or source)
> >> # can be defined as well.
> >> # In this case, it specifies the capacity of the memory channel
> >> agent.channels.memoryChannel1.capacity = 500000
> >> agent.channels.memoryChannel1.transactionCapacity = 200000
> >> agent.channels.memoryChannel1.byteCapacity = 1000000000
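Regarding Jeff's suggestion above: each Flume sink is driven by a single
sink-runner thread, so one HDFS sink drains the channel serially and often
tops out well below network capacity no matter the batch size. A minimal
sketch of the multi-sink variant, reusing the settings from the config
above (sink names s2/s3 and the prefixes are just illustrative; note there
is deliberately no agent.sinkgroups line, so the sinks run independently):

    # Three independent HDFS sinks draining the same channel in parallel.
    agent.sinks = s1 s2 s3

    # s1 stays exactly as configured above. s2 and s3 repeat its settings;
    # only filePrefix differs, so the sinks write to separate files.
    agent.sinks.s2.type = hdfs
    agent.sinks.s2.channel = memoryChannel1
    agent.sinks.s2.hdfs.path = hdfs://namenode/flume/events/%D/%H/%M
    agent.sinks.s2.hdfs.filePrefix = flume-events-s2
    agent.sinks.s2.hdfs.fileSuffix = .log
    agent.sinks.s2.hdfs.fileType = DataStream
    agent.sinks.s2.hdfs.rollSize = 0
    agent.sinks.s2.hdfs.rollCount = 0
    agent.sinks.s2.hdfs.rollInterval = 60
    agent.sinks.s2.hdfs.batchSize = 20000

    agent.sinks.s3.type = hdfs
    agent.sinks.s3.channel = memoryChannel1
    agent.sinks.s3.hdfs.path = hdfs://namenode/flume/events/%D/%H/%M
    agent.sinks.s3.hdfs.filePrefix = flume-events-s3
    agent.sinks.s3.hdfs.fileSuffix = .log
    agent.sinks.s3.hdfs.fileType = DataStream
    agent.sinks.s3.hdfs.rollSize = 0
    agent.sinks.s3.hdfs.rollCount = 0
    agent.sinks.s3.hdfs.rollInterval = 60
    agent.sinks.s3.hdfs.batchSize = 20000

Each sink pulls its own batches from the channel, so events are distributed
across the sinks rather than duplicated, and with N sinks you get roughly N
concurrent write streams into HDFS.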
