Hello again guys,
sorry to bother you; I'm fairly new to the Flume world and I'm running
experiments to evaluate the pros and cons of different topologies and settings.
I'm currently experiencing the following issue:
Agent topology made of: JMS source --> Kafka Channel --> HDFS sink
Source and Sink batch size, and Channel transactionCapacity = 1000
Channel capacity = 10000
Channel kafka.consumer.auto.offset.reset = latest
Channel migrateZookeeperOffsets = false
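For reference, here is a minimal sketch of the relevant parts of my
configuration (agent and component names other than the channel, the EMS
host, queue, Kafka brokers, topic, group id and HDFS path are placeholders,
not my actual values):

  a1.sources = src_jms
  a1.channels = chl_jms_kafka
  a1.sinks = snk_hdfs

  # JMS source (TIBCO EMS); batchSize matches the channel transactionCapacity
  a1.sources.src_jms.type = jms
  a1.sources.src_jms.channels = chl_jms_kafka
  a1.sources.src_jms.initialContextFactory = com.tibco.tibjms.naming.TibjmsInitialContextFactory
  a1.sources.src_jms.connectionFactory = QueueConnectionFactory
  a1.sources.src_jms.providerURL = tcp://ems-host:7222
  a1.sources.src_jms.destinationType = QUEUE
  a1.sources.src_jms.destinationName = my.input.queue
  a1.sources.src_jms.batchSize = 1000

  # Kafka channel, with the capacity/transactionCapacity values listed above
  a1.channels.chl_jms_kafka.type = org.apache.flume.channel.kafka.KafkaChannel
  a1.channels.chl_jms_kafka.kafka.bootstrap.servers = broker1:9092,broker2:9092
  a1.channels.chl_jms_kafka.kafka.topic = flume_channel_topic
  a1.channels.chl_jms_kafka.kafka.consumer.group.id = flume_agent_group
  a1.channels.chl_jms_kafka.kafka.consumer.auto.offset.reset = latest
  a1.channels.chl_jms_kafka.migrateZookeeperOffsets = false
  a1.channels.chl_jms_kafka.capacity = 10000
  a1.channels.chl_jms_kafka.transactionCapacity = 1000

  # HDFS sink; batchSize matches the channel transactionCapacity
  a1.sinks.snk_hdfs.type = hdfs
  a1.sinks.snk_hdfs.channel = chl_jms_kafka
  a1.sinks.snk_hdfs.hdfs.path = /user/flume/landing
  a1.sinks.snk_hdfs.hdfs.fileType = DataStream
  a1.sinks.snk_hdfs.hdfs.writeFormat = Text
  a1.sinks.snk_hdfs.hdfs.batchSize = 1000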
I'm trying to confirm that the (channel, sink) pair is fault tolerant, and
that such an agent can be restarted after being terminated, resuming its
draining of the channel from the offset where it left off.
I have a CDH5.8 cluster and launch my agent with a script that calls
flume-ng, passing it a custom configuration that I keep in a dedicated
file.
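The script is essentially just a thin wrapper around flume-ng, roughly
(agent name and paths are placeholders):

  flume-ng agent \
    --name a1 \
    --conf /etc/flume-ng/conf \
    --conf-file /path/to/my-agent.conf \
    -Dflume.root.logger=INFO,console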
Given x messages previously pushed onto the input JMS queue, I:
1. start the agent through my script
2. verify it opens a new file on HDFS and starts consuming events
(it writes one event per line)
3. stop the agent (CTRL+C on the open shell session)
4. re-start the agent through my script
5. wait until it finishes draining the channel
6. count the lines written across all the generated files (see the
sketch below)
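For step 6 I just do a line count over the sink's output directory,
something like (the HDFS path is a placeholder):

  hdfs dfs -cat /user/flume/landing/* | wc -l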
What I experienced is that:
1. when the script/process termination at step 3 is graceful (i.e. no
exceptions are raised), I verify that exactly x messages were written
to HDFS;
2. when the script/process termination at step 3 is followed by the
exception:
16/12/02 09:19:46 WARN jms.JMSSource: JMSException consuming events
javax.jms.JMSException: InterruptedException has occurred while waiting for server response
        at com.tibco.tibjms.Tibjmsx.buildException(Tibjmsx.java:502)
        at com.tibco.tibjms.TibjmsxLink.sendRequest(TibjmsxLink.java:364)
        at com.tibco.tibjms.TibjmsxLink.sendRequestMsg(TibjmsxLink.java:293)
        at com.tibco.tibjms.TibjmsxSessionImp._processNoWait(TibjmsxSessionImp.java:3548)
        at com.tibco.tibjms.TibjmsxSessionImp._receive(TibjmsxSessionImp.java:1947)
        at com.tibco.tibjms.TibjmsMessageConsumer._receive(TibjmsMessageConsumer.java:240)
        at com.tibco.tibjms.TibjmsMessageConsumer.receiveNoWait(TibjmsMessageConsumer.java:492)
        at org.apache.flume.source.jms.JMSMessageConsumer.take(JMSMessageConsumer.java:127)
        at org.apache.flume.source.jms.JMSSource.doProcess(JMSSource.java:261)
        at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:58)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:137)
        at java.lang.Thread.run(Thread.java:745)
then I again verify that exactly x messages were written to HDFS;
3. when the script/process termination at step 3 is followed by the
exception:
^C16/12/02 09:10:28 INFO lifecycle.LifecycleSupervisor: Stopping lifecycle supervisor 11
16/12/02 09:10:28 ERROR hdfs.HDFSEventSink: process failed
java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
        at org.apache.flume.sink.hdfs.BucketWriter.checkAndThrowInterruptedException(BucketWriter.java:660)
        at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:483)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:418)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
16/12/02 09:10:28 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
        at org.apache.flume.sink.hdfs.BucketWriter.checkAndThrowInterruptedException(BucketWriter.java:660)
        at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:483)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:418)
        ... 3 more
16/12/02 09:10:33 INFO hdfs.HDFSEventSink: Closing ...
.... logs continue ... then:
16/12/02 09:10:34 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: chl_jms_kafka. channel.rollback.count == <a number greater than zero>
then I find written on HDFS my initial x messages PLUS that
(greater-than-zero) number of duplicates!
This behaviour is really frustrating and I don't understand how to avoid
those duplicates. As a side note, the same experiment with a brutal agent
termination at step 3 (kill -9 on the process's pid) does not produce
duplicates!
I would appreciate any help on this (for me crucial) topic.
Thank you,
Roberto