[ 
https://issues.apache.org/jira/browse/FLUME-3337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878964#comment-16878964
 ] 

AM K commented on FLUME-3337:
-----------------------------

In my opinion, the real issue is the NPE coming from SpillableMemoryChannel's 
rollback. rollback must not throw.

Nevertheless it does not hurt to strengthen ChannelProcessor, I will take a 
shot at it.

 

> ChannelProcessor#processEventBatch() method catch exception code block may 
> override actual exception
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-3337
>                 URL: https://issues.apache.org/jira/browse/FLUME-3337
>             Project: Flume
>          Issue Type: Bug
>          Components: Channel
>    Affects Versions: 1.9.0
>            Reporter: yangkun
>            Priority: Critical
>
> I encountered an exception, as following:
> {code:java}
> KafkaSource EXCEPTION, {} java.lang.NullPointerException: null
> at 
> org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.doRollback(SpillableMemoryChannel.java:587)
> at 
> org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
> at 
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:196)
> at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:311)
> at 
> org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
> at 
> org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
> at java.lang.Thread.run(Thread.java:748){code}
>  ChannelProcessor#processEventBatch() method catch exception if put evet to 
> channel error, but the catch code block first call tx.rollback(), however 
> tx.rollback() method may throw new exception, this moment, the actual 
> exception is overrided, we did't know the actual exception.We should first 
> log the excpetion, then call tx.rollback().
> {code:java}
> public void processEventBatch(List<Event> events) {
> for (Channel reqChannel : reqChannelQueue.keySet()) {
> Transaction tx = reqChannel.getTransaction();
> Preconditions.checkNotNull(tx, "Transaction object must not be null");
> try {
> tx.begin();
> List<Event> batch = reqChannelQueue.get(reqChannel);
> for (Event event : batch) {
> reqChannel.put(event);
> }
> tx.commit();
> } catch (Throwable t) {
> // this line may throw new exception, the actual exception is overrided, we 
> did't know the actual exception
> tx.rollback();
> if (t instanceof Error) {
> LOG.error("Error while writing to required channel: " + reqChannel, t);
> throw (Error) t;
> } else if (t instanceof ChannelException) {
> throw (ChannelException) t;
> } else {
> throw new ChannelException("Unable to put batch on required " +
> "channel: " + reqChannel, t);
> }
> } finally {
> }
> }
> }
> {code}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to