[
https://issues.apache.org/jira/browse/FLUME-3337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
yangkun updated FLUME-3337:
---------------------------
Description:
I encountered an exception, as following:
{code:java}
KafkaSource EXCEPTION, {} java.lang.NullPointerException: null
at
org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.doRollback(SpillableMemoryChannel.java:587)
at
org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
at
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:196)
at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:311)
at
org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
at
org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
at java.lang.Thread.run(Thread.java:748){code}
ChannelProcessor#processEventBatch() method catch exception if put evet to
channel error, but the catch code block first call tx.rollback(), however
tx.rollback() method may throw new exception, this moment, the actual exception
is overrided, we did't know the actual exception.
{code:java}
public void processEventBatch(List<Event> events) {
for (Channel reqChannel : reqChannelQueue.keySet()) {
Transaction tx = reqChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
tx.begin();
List<Event> batch = reqChannelQueue.get(reqChannel);
for (Event event : batch) {
reqChannel.put(event);
}
tx.commit();
} catch (Throwable t) {
// this line may throw new exception, the actual exception is overrided, we
did't know the actual exception
tx.rollback();
if (t instanceof Error) {
LOG.error("Error while writing to required channel: " + reqChannel, t);
throw (Error) t;
} else if (t instanceof ChannelException) {
throw (ChannelException) t;
} else {
throw new ChannelException("Unable to put batch on required " +
"channel: " + reqChannel, t);
}
} finally {
}
}
}
{code}
was:
I encountered an exception, as following:
{code:java}
KafkaSource EXCEPTION, {} java.lang.NullPointerException: null
at
org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.doRollback(SpillableMemoryChannel.java:587)
at
org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
at
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:196)
at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:311)
at
org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
at
org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
at java.lang.Thread.run(Thread.java:748){code}
ChannelProcessor#processEventBatch() method catch exception is put evet to
channel error, but the catch code block first call tx.rollback(), however
tx.rollback() method may throw new exception, this moment, the actual exception
is overrided, we did't know the actual exception.
{code:java}
public void processEventBatch(List<Event> events) {
for (Channel reqChannel : reqChannelQueue.keySet()) {
Transaction tx = reqChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
tx.begin();
List<Event> batch = reqChannelQueue.get(reqChannel);
for (Event event : batch) {
reqChannel.put(event);
}
tx.commit();
} catch (Throwable t) {
// this line may throw new exception, the actual exception is overrided, we
did't know the actual exception
tx.rollback();
if (t instanceof Error) {
LOG.error("Error while writing to required channel: " + reqChannel, t);
throw (Error) t;
} else if (t instanceof ChannelException) {
throw (ChannelException) t;
} else {
throw new ChannelException("Unable to put batch on required " +
"channel: " + reqChannel, t);
}
} finally {
}
}
}
{code}
> ChannelProcessor#processEventBatch() method catch exception code block may
> override actual exception
> ----------------------------------------------------------------------------------------------------
>
> Key: FLUME-3337
> URL: https://issues.apache.org/jira/browse/FLUME-3337
> Project: Flume
> Issue Type: Bug
> Components: Channel
> Affects Versions: 1.9.0
> Reporter: yangkun
> Priority: Critical
>
> I encountered an exception, as following:
> {code:java}
> KafkaSource EXCEPTION, {} java.lang.NullPointerException: null
> at
> org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.doRollback(SpillableMemoryChannel.java:587)
> at
> org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
> at
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:196)
> at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:311)
> at
> org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
> at
> org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
> at java.lang.Thread.run(Thread.java:748){code}
> ChannelProcessor#processEventBatch() method catch exception if put evet to
> channel error, but the catch code block first call tx.rollback(), however
> tx.rollback() method may throw new exception, this moment, the actual
> exception is overrided, we did't know the actual exception.
> {code:java}
> public void processEventBatch(List<Event> events) {
> for (Channel reqChannel : reqChannelQueue.keySet()) {
> Transaction tx = reqChannel.getTransaction();
> Preconditions.checkNotNull(tx, "Transaction object must not be null");
> try {
> tx.begin();
> List<Event> batch = reqChannelQueue.get(reqChannel);
> for (Event event : batch) {
> reqChannel.put(event);
> }
> tx.commit();
> } catch (Throwable t) {
> // this line may throw new exception, the actual exception is overrided, we
> did't know the actual exception
> tx.rollback();
> if (t instanceof Error) {
> LOG.error("Error while writing to required channel: " + reqChannel, t);
> throw (Error) t;
> } else if (t instanceof ChannelException) {
> throw (ChannelException) t;
> } else {
> throw new ChannelException("Unable to put batch on required " +
> "channel: " + reqChannel, t);
> }
> } finally {
> }
> }
> }
> {code}
>
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]