The attached flume.conf is the channel and sink config, and dumps.txt contains the
thread dumps.
The channel type "dual" is a channel type I developed to combine the merits of the
memory channel and the file channel: when the volume is not very big, it uses the
memory channel; when the number of events reaches a percentage of the memory
channel capacity, it switches to the file channel; and when the volume decreases,
it switches back to the memory channel.
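To make the switching behaviour concrete, below is a minimal sketch of the idea. The class name, fields, thresholds and hysteresis are illustrative assumptions only, not the actual dual channel implementation.

// Sketch of the switching logic only; names and thresholds are assumptions,
// not the actual "dual" channel code.
public class DualChannelSketch {

    private final int memoryCapacity = 100000;   // memory.capacity in the attached config
    private final double switchUpRatio = 0.8;    // assumed: spill once 80% full
    private final double switchDownRatio = 0.4;  // assumed: switch back once the volume has clearly dropped

    private volatile boolean useFileChannel = false;

    // Called per put to decide whether the event goes to the wrapped
    // memory channel or to the wrapped file channel.
    boolean routeToFileChannel(int eventsCurrentlyQueued) {
        double fill = (double) eventsCurrentlyQueued / memoryCapacity;
        if (!useFileChannel && fill >= switchUpRatio) {
            useFileChannel = true;     // volume is high: switch to the file channel
        } else if (useFileChannel && fill <= switchDownRatio) {
            useFileChannel = false;    // volume decreased: switch back to the memory channel
        }
        return useFileChannel;
    }
}

In the real channel the routing of course has to happen inside the put/take transactions; the sketch only shows the threshold logic.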
Thanks for looking into this.
On Tue, Dec 17, 2013 at 8:54 PM, Brock Noland <[email protected]> wrote:
> Can you take and share 8-10 thread dumps while the sink is taking events
> "slowly"?
>
> Can you share your machine and file channel configuration?
> On Dec 17, 2013 6:28 AM, "Shangan Chen" <[email protected]> wrote:
>
>> We face the same problem: the performance of taking events from the channel is a
>> severe bottleneck, and it does not improve even when there are fewer events in the
>> channel. The following is a log of the metrics for writing to HDFS: writing to 5
>> files with a batch size of 200000, the take accounts for most of the total time.
>>
>>
>> 17 Dec 2013 18:49:28,056 INFO
>> [SinkRunner-PollingRunner-DefaultSinkProcessor]
>> (org.apache.flume.sink.hdfs.HDFSEventSink.process:489) -
>> HdfsSink-TIME-STAT sink[sink_hdfs_b] writers[5] eventcount[200000]
>> all[44513] take[38197] append[5647] sync[17] getFilenameTime[371]
>>
>>
>>
>>
>>
>> On Mon, Nov 25, 2013 at 4:46 PM, Jan Van Besien <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> Is anybody still looking into this question?
>>>
>>> Should I log it in jira such that somebody can look into it later?
>>>
>>> thanks,
>>> Jan
>>>
>>>
>>>
>>> On 11/18/2013 11:28 AM, Jan Van Besien wrote:
>>> > Hi,
>>> >
>>> > Sorry it took me a while to answer this. I compiled a small test case
>>> > using only off-the-shelf Flume components that shows what is going on.
>>> >
>>> > The setup is a single agent with http source, null sink and file
>>> > channel. I am using the default configuration as much as possible.
>>> >
>>> > The test goes as follows:
>>> >
>>> > - start the agent without a sink
>>> > - run a script that sends http requests in multiple threads to the http
>>> > source (the script simply calls the url http://localhost:8080/?key=value
>>> > over and over again, whereby value is a random string of 100 chars; a
>>> > sketch of such a script follows these steps)
>>> > - this script does about 100 requests per second on my machine. I leave
>>> > it running for a while, such that the file channel contains about 20000
>>> > events.
>>> > - add the null sink to the configuration (around 11:14:33 in the log).
>>> > - observe the logging of the null sink. You'll see in the log file that
>>> > it takes more than 10 seconds per 1000 events (until about event 5000,
>>> > around 11:15:33)
>>> > - stop the http request generating script (i.e. no more writing into the
>>> > file channel)
>>> > - observe the logging of the null sink: events 5000 until 20000 are all
>>> > processed within a few seconds.
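For illustration, a load generator along these lines reproduces the request pattern described in the steps above (this is a sketch, not the original script; the thread count is an assumption):

// Sketch of the load-generating script described above: multiple threads GET
// http://localhost:8080/?key=<random 100-char string> in a loop.
// Thread count is an assumption; the original was a script, not Java.
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HttpLoadGenerator {
    public static void main(String[] args) {
        int threads = 4; // assumed; the mail only says "multiple threads"
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                Random rnd = new Random();
                char[] value = new char[100];
                while (!Thread.currentThread().isInterrupted()) {
                    for (int i = 0; i < value.length; i++) {
                        value[i] = (char) ('a' + rnd.nextInt(26));
                    }
                    try {
                        URL url = new URL("http://localhost:8080/?key=" + new String(value));
                        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                        conn.getResponseCode();   // one event per request, hence one channel commit per event
                        conn.disconnect();
                    } catch (Exception e) {
                        // ignore failures and keep generating load
                    }
                }
            });
        }
    }
}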
>>> >
>>> > In the attachment:
>>> > - flume log
>>> > - thread dumps while the ingest was running and the null sink was enabled
>>> > - config (agent1.conf)
>>> >
>>> > I also tried with more sinks (4), see agent2.conf. The results are the same.
>>> >
>>> > Thanks for looking into this,
>>> > Jan
>>> >
>>> >
>>> > On 11/14/2013 05:08 PM, Brock Noland wrote:
>>> >> On Thu, Nov 14, 2013 at 2:50 AM, Jan Van Besien <[email protected]
>>> >> <mailto:[email protected]>> wrote:
>>> >>
>>> >> On 11/13/2013 03:04 PM, Brock Noland wrote:
>>> >> > The file channel uses a WAL which sits on disk. Each time an event is
>>> >> > committed an fsync is called to ensure that data is durable. Without
>>> >> > this fsync there is no durability guarantee. More details here:
>>> >> > https://blogs.apache.org/flume/entry/apache_flume_filechannel
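To put a number on that, a stand-alone micro-benchmark along these lines (not Flume code; record size and count are arbitrary) shows how many fsync-backed writes per second a single disk sustains, which is the ceiling for per-event commits:

// Not Flume code: a rough measurement of fsync throughput on one disk.
// Each iteration mimics the cost of committing a single small event durably.
import java.io.File;
import java.io.RandomAccessFile;

public class FsyncBench {
    public static void main(String[] args) throws Exception {
        File f = File.createTempFile("fsync-bench", ".dat");
        f.deleteOnExit();
        try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
            byte[] record = new byte[100];  // roughly the event size used in the test
            int n = 1000;
            long start = System.nanoTime();
            for (int i = 0; i < n; i++) {
                raf.write(record);
                raf.getFD().sync();         // force the write to disk, like a per-event commit
            }
            double secs = (System.nanoTime() - start) / 1e9;
            System.out.printf("%d fsyncs in %.2fs -> ~%.0f durable commits/sec%n", n, secs, n / secs);
        }
    }
}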
>>> >>
>>> >> Yes indeed. I was just not expecting the performance impact to be
>>> >> that big.
>>> >>
>>> >>
>>> >> > The issue is that when the source is committing one-by-one it's
>>> >> > consuming the disk doing an fsync for each event. I would find a
>>> >> > way to batch up the requests so they are not written one-by-one or
>>> >> > use multiple disks for the file channel.
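As an illustration of the batching suggested here, the Flume client SDK can append events in batches to an Avro source, so the downstream file channel commits (and fsyncs) once per batch instead of once per event. This is a sketch under assumptions: it swaps the HTTP source for an Avro source, and the host, port, batch size and loop are made up.

// Sketch: client-side batching via the Flume SDK against an Avro source.
// Host, port and batch size are assumptions.
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class BatchedSender {
    public static void main(String[] args) throws EventDeliveryException {
        RpcClient client = RpcClientFactory.getDefaultInstance("localhost", 4141, 1000);
        try {
            List<Event> batch = new ArrayList<Event>();
            for (int i = 0; i < 100000; i++) {
                batch.add(EventBuilder.withBody("key=value-" + i, StandardCharsets.UTF_8));
                if (batch.size() == 1000) {
                    client.appendBatch(batch);   // the whole batch is one transaction -> one fsync
                    batch.clear();
                }
            }
            if (!batch.isEmpty()) {
                client.appendBatch(batch);
            }
        } finally {
            client.close();
        }
    }
}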
>>> >>
>>> >> I am already using multiple disks for the channel (4).
>>> >>
>>> >>
>>> >> Can you share your configuration?
>>> >>
>>> >> Batching the requests is indeed what I am doing to prevent the file
>>> >> channel from being the bottleneck (using a flume agent with a memory
>>> >> channel in front of the agent with the file channel), but it inherently
>>> >> means that I lose end-to-end durability because events are buffered in
>>> >> memory before being flushed to disk.
>>> >>
>>> >>
>>> >> I would be curious to know though if you doubled the sinks if that would
>>> >> give more time to readers. Could you take three-four thread dumps of the
>>> >> JVM while it's in this state and share them?
>>> >>
>>> >
>>>
>>>
>>
>>
>> --
>> have a good day!
>> chenshang'an
>>
>>
--
have a good day!
chenshang'an
lc_hadooptest04.channels.ch_hdfs_b.type = dual
lc_hadooptest04.channels.ch_hdfs_b.memory.capacity = 100000
lc_hadooptest04.channels.ch_hdfs_b.memory.transactionCapacity = 30000
lc_hadooptest04.channels.ch_hdfs_b.file.checkpointDir = /data/ch_hdfs_b.checkpoint
lc_hadooptest04.channels.ch_hdfs_b.file.dataDirs = /data/ch_hdfs_b.data
lc_hadooptest04.channels.ch_hdfs_b.file.capacity = 200000000
lc_hadooptest04.channels.ch_hdfs_b.file.transactionCapacity = 30000
lc_hadooptest04.channels.ch_hdfs_b.file.checkpointInterval = 300000
lc_hadooptest04.channels.ch_hdfs_b.file.maxFileSize = 2146435071
lc_hadooptest04.channels.ch_hdfs_b.file.keep-alive = 30
lc_hadooptest04.channels.ch_hdfs_b.file.write-timeout = 30
##### sink definition
##############################################################
lc_hadooptest04.sinks.sink_hdfs_b.channel = ch_hdfs_b
lc_hadooptest04.sinks.sink_hdfs_b.type = hdfs
lc_hadooptest04.sinks.sink_hdfs_b.switchon = true
lc_hadooptest04.sinks.sink_hdfs_b.hdfs.path = /user/hive/warehouse/originallog.db/
lc_hadooptest04.sinks.sink_hdfs_b.hdfs.filePrefix = lc_hadooptest04
lc_hadooptest04.sinks.sink_hdfs_b.hdfs.rollInterval = 1200
lc_hadooptest04.sinks.sink_hdfs_b.hdfs.rollSize = 3072000000
lc_hadooptest04.sinks.sink_hdfs_b.hdfs.rollCount = 0
lc_hadooptest04.sinks.sink_hdfs_b.hdfs.batchSize = 20000
lc_hadooptest04.sinks.sink_hdfs_b.hdfs.callTimeout = 180000
lc_hadooptest04.sinks.sink_hdfs_b.hdfs.idleTimeout = 1800
lc_hadooptest04.sinks.sink_hdfs_b.hdfs.threadsPoolSize = 64
lc_hadooptest04.sinks.sink_hdfs_b.hdfs.codeC = LzopCodec
lc_hadooptest04.sinks.sink_hdfs_b.hdfs.fileType = CompressedStream
lc_hadooptest04.sinks.sink_hdfs_b.hdfs.writeFormat = Text
lc_hadooptest04.sinks.sink_hdfs_b.hdfs.useLocalTimeStamp = True
lc_hadooptest04.sinks.sink_hdfs_b.serializer = TEXT
lc_hadooptest04.sinks.sink_hdfs_b.serializer.appendNewline = false
"hdfs-sink_hdfs_b-call-runner-63" prio=10 tid=0x00007f5bd8084800 nid=0x2eba
waiting on condition [0x00007f5e94fce000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006002bab58> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
Locked ownable synchronizers:
- None
"hdfs-sink_hdfs_b-call-runner-62" prio=10 tid=0x00007f5bd8082800 nid=0x2eb9
waiting on condition [0x00007f5e950cf000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006002bab58> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)