Hello again guys,

sorry to bother you, I'm kinda new to the Flume world and I'm running experiments to evaluate the pros and cons of different topologies and settings.

I'm currently experiencing the following issue:

Agent topology: JMS source --> Kafka channel --> HDFS sink
Source and sink batch size, and channel transactionCapacity = 1000
Channel capacity = 10000
Channel kafka.consumer.auto.offset.reset = latest
Channel migrateZookeeperOffsets = false
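
To make this concrete, the relevant part of my configuration looks roughly like the sketch below; apart from chl_jms_kafka (which shows up in the logs), the agent/component names, JMS connection details, Kafka brokers/topic and HDFS path are placeholders, not my real values:

    # agent1 / src_jms / snk_hdfs are placeholder names
    agent1.sources  = src_jms
    agent1.channels = chl_jms_kafka
    agent1.sinks    = snk_hdfs

    # JMS source (TIBCO EMS connection details are placeholders)
    agent1.sources.src_jms.type = jms
    agent1.sources.src_jms.initialContextFactory = com.tibco.tibjms.naming.TibjmsInitialContextFactory
    agent1.sources.src_jms.providerURL = tcp://jms-host:7222
    agent1.sources.src_jms.connectionFactory = QueueConnectionFactory
    agent1.sources.src_jms.destinationType = QUEUE
    agent1.sources.src_jms.destinationName = my.input.queue
    agent1.sources.src_jms.batchSize = 1000
    agent1.sources.src_jms.channels = chl_jms_kafka

    # Kafka channel
    agent1.channels.chl_jms_kafka.type = org.apache.flume.channel.kafka.KafkaChannel
    agent1.channels.chl_jms_kafka.kafka.bootstrap.servers = broker1:9092,broker2:9092
    agent1.channels.chl_jms_kafka.kafka.topic = flume-channel-topic
    agent1.channels.chl_jms_kafka.kafka.consumer.auto.offset.reset = latest
    agent1.channels.chl_jms_kafka.migrateZookeeperOffsets = false
    agent1.channels.chl_jms_kafka.capacity = 10000
    agent1.channels.chl_jms_kafka.transactionCapacity = 1000

    # HDFS sink (path is a placeholder; DataStream because I write one text event per line)
    agent1.sinks.snk_hdfs.type = hdfs
    agent1.sinks.snk_hdfs.hdfs.path = hdfs://nameservice1/user/flume/out
    agent1.sinks.snk_hdfs.hdfs.fileType = DataStream
    agent1.sinks.snk_hdfs.hdfs.batchSize = 1000
    agent1.sinks.snk_hdfs.channel = chl_jms_kafka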

I'm trying to confirm that the (channel, sink) pair is fault tolerant, i.e. that such an agent can be restarted after being terminated and resume draining the channel from the point (offset) where it left off.

I have a CDH 5.8 cluster and launch my agent with a script that calls flume-ng, passing it a custom configuration file.
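
The script essentially boils down to something like this (paths and the agent name are placeholders):

    flume-ng agent \
      --name agent1 \
      --conf /etc/flume-ng/conf \
      --conf-file /path/to/my-agent.properties \
      -Dflume.root.logger=INFO,console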

Given x messages previously pushed onto the input JMS queue, I:

1. start the agent through my script
2. verify it opens a new file on HDFS and starts consuming events
   (it writes 1 event per line)
3. stop the agent (CTRL+C on the open shell session)
4. re-start the agent through my script
5. wait until it completes its draining
6. count the lines written across all the generated files (see the
   command sketch right after this list)
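
The count at step 6 is done with something along these lines (the output directory and file prefix are placeholders; FlumeData is just the sink's default prefix):

    hdfs dfs -cat /user/flume/out/FlumeData.* | wc -l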

What I experienced is that:

1. when the script/process termination at step 3 is graceful (i.e. no
   exceptions are raised), I verify that only x messages were written
   to HDFS;
2. when the script/process termination at step 3 is followed by the
   exception:

   16/12/02 09:19:46 WARN jms.JMSSource: JMSException consuming events
   javax.jms.JMSException: InterruptedException has occurred while waiting for server response
        at com.tibco.tibjms.Tibjmsx.buildException(Tibjmsx.java:502)
        at com.tibco.tibjms.TibjmsxLink.sendRequest(TibjmsxLink.java:364)
        at com.tibco.tibjms.TibjmsxLink.sendRequestMsg(TibjmsxLink.java:293)
        at com.tibco.tibjms.TibjmsxSessionImp._processNoWait(TibjmsxSessionImp.java:3548)
        at com.tibco.tibjms.TibjmsxSessionImp._receive(TibjmsxSessionImp.java:1947)
        at com.tibco.tibjms.TibjmsMessageConsumer._receive(TibjmsMessageConsumer.java:240)
        at com.tibco.tibjms.TibjmsMessageConsumer.receiveNoWait(TibjmsMessageConsumer.java:492)
        at org.apache.flume.source.jms.JMSMessageConsumer.take(JMSMessageConsumer.java:127)
        at org.apache.flume.source.jms.JMSSource.doProcess(JMSSource.java:261)
        at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:58)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:137)
        at java.lang.Thread.run(Thread.java:745)

   then, too, I verify that only x messages were written to HDFS;

3. when the script/process termination at step 3 is followed by the
   exception:

   ^C16/12/02 09:10:28 INFO lifecycle.LifecycleSupervisor: Stopping lifecycle supervisor 11
   16/12/02 09:10:28 ERROR hdfs.HDFSEventSink: process failed
   java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
        at org.apache.flume.sink.hdfs.BucketWriter.checkAndThrowInterruptedException(BucketWriter.java:660)
        at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:483)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:418)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
   16/12/02 09:10:28 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
   org.apache.flume.EventDeliveryException: java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
   Caused by: java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
        at org.apache.flume.sink.hdfs.BucketWriter.checkAndThrowInterruptedException(BucketWriter.java:660)
        at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:483)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:418)
        ... 3 more
   16/12/02 09:10:33 INFO hdfs.HDFSEventSink: Closing ...

   .... logs continue ... then:

   16/12/02 09:10:34 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: chl_jms_kafka. channel.rollback.count == <aNumberGreaterThanZero>

   then I find written on HDFS my initial x messages PLUS
   <aNumberGreaterThanZero> duplicates!
This behaviour is really frustrating and I don't understand how to avoid those duplicates. As a side note, the same experiment with a brutal agent termination at step 3 (kill -9 on the process's pid) does not produce duplicates!
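
For completeness, I have not explicitly set the timeout mentioned in that stack trace, so hdfs.callTimeout is at its default; if tuning it is relevant, I assume it would go in the sink section like this (the value is only an illustration, not something I've validated):

    # raise the HDFS call timeout (milliseconds); 60000 is just an example value
    agent1.sinks.snk_hdfs.hdfs.callTimeout = 60000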

I would appreciate any help on this (for me crucial) topic.

Thank you,

Roberto
