[ 
https://issues.apache.org/jira/browse/FLUME-3337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yangkun updated FLUME-3337:
---------------------------
    Description: 
I encountered an exception, as following:
{code:java}
KafkaSource EXCEPTION, {} java.lang.NullPointerException: null
at 
org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.doRollback(SpillableMemoryChannel.java:587)
at 
org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
at 
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:196)
at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:311)
at 
org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
at 
org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
at java.lang.Thread.run(Thread.java:748){code}
 ChannelProcessor#processEventBatch() method catch exception if put evet to 
channel error, but the catch code block first call tx.rollback(), however 
tx.rollback() method may throw new exception, this moment, the actual exception 
is overrided, we did't know the actual exception.We should first log the 
excpetion, then call tx.rollback().
{code:java}
public void processEventBatch(List<Event> events) {
for (Channel reqChannel : reqChannelQueue.keySet()) {
Transaction tx = reqChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
tx.begin();

List<Event> batch = reqChannelQueue.get(reqChannel);

for (Event event : batch) {
reqChannel.put(event);
}

tx.commit();
} catch (Throwable t) {
// this line may throw new exception, the actual exception is overrided, we 
did't know the actual exception
tx.rollback();
if (t instanceof Error) {
LOG.error("Error while writing to required channel: " + reqChannel, t);
throw (Error) t;
} else if (t instanceof ChannelException) {
throw (ChannelException) t;
} else {
throw new ChannelException("Unable to put batch on required " +
"channel: " + reqChannel, t);
}
} finally {

}
}
}
{code}
 

 

 

  was:
I encountered an exception, as following:
{code:java}
KafkaSource EXCEPTION, {} java.lang.NullPointerException: null
at 
org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.doRollback(SpillableMemoryChannel.java:587)
at 
org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
at 
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:196)
at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:311)
at 
org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
at 
org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
at java.lang.Thread.run(Thread.java:748){code}
 ChannelProcessor#processEventBatch() method catch exception if put evet to 
channel error, but the catch code block first call tx.rollback(), however 
tx.rollback() method may throw new exception, this moment, the actual exception 
is overrided, we did't know the actual exception.
{code:java}
public void processEventBatch(List<Event> events) {
for (Channel reqChannel : reqChannelQueue.keySet()) {
Transaction tx = reqChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
tx.begin();

List<Event> batch = reqChannelQueue.get(reqChannel);

for (Event event : batch) {
reqChannel.put(event);
}

tx.commit();
} catch (Throwable t) {
// this line may throw new exception, the actual exception is overrided, we 
did't know the actual exception
tx.rollback();
if (t instanceof Error) {
LOG.error("Error while writing to required channel: " + reqChannel, t);
throw (Error) t;
} else if (t instanceof ChannelException) {
throw (ChannelException) t;
} else {
throw new ChannelException("Unable to put batch on required " +
"channel: " + reqChannel, t);
}
} finally {

}
}
}
{code}
 

 

 


> ChannelProcessor#processEventBatch() method catch exception code block may 
> override actual exception
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-3337
>                 URL: https://issues.apache.org/jira/browse/FLUME-3337
>             Project: Flume
>          Issue Type: Bug
>          Components: Channel
>    Affects Versions: 1.9.0
>            Reporter: yangkun
>            Priority: Critical
>
> I encountered an exception, as following:
> {code:java}
> KafkaSource EXCEPTION, {} java.lang.NullPointerException: null
> at 
> org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.doRollback(SpillableMemoryChannel.java:587)
> at 
> org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
> at 
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:196)
> at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:311)
> at 
> org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
> at 
> org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
> at java.lang.Thread.run(Thread.java:748){code}
>  ChannelProcessor#processEventBatch() method catch exception if put evet to 
> channel error, but the catch code block first call tx.rollback(), however 
> tx.rollback() method may throw new exception, this moment, the actual 
> exception is overrided, we did't know the actual exception.We should first 
> log the excpetion, then call tx.rollback().
> {code:java}
> public void processEventBatch(List<Event> events) {
> for (Channel reqChannel : reqChannelQueue.keySet()) {
> Transaction tx = reqChannel.getTransaction();
> Preconditions.checkNotNull(tx, "Transaction object must not be null");
> try {
> tx.begin();
> List<Event> batch = reqChannelQueue.get(reqChannel);
> for (Event event : batch) {
> reqChannel.put(event);
> }
> tx.commit();
> } catch (Throwable t) {
> // this line may throw new exception, the actual exception is overrided, we 
> did't know the actual exception
> tx.rollback();
> if (t instanceof Error) {
> LOG.error("Error while writing to required channel: " + reqChannel, t);
> throw (Error) t;
> } else if (t instanceof ChannelException) {
> throw (ChannelException) t;
> } else {
> throw new ChannelException("Unable to put batch on required " +
> "channel: " + reqChannel, t);
> }
> } finally {
> }
> }
> }
> {code}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to