James Grinter created FLUME-3333:
------------------------------------
Summary: KafkaSource will reprocess the same events through the
Interceptor chain multiple times, if anything throws an Exception during
doProcess
Key: FLUME-3333
URL: https://issues.apache.org/jira/browse/FLUME-3333
Project: Flume
Issue Type: Bug
Components: Sinks+Sources
Affects Versions: 1.9.0, 1.8.0
Environment: Observed on Apache Flume 1.8.0, java 8 (various), Apache
Kafka (1.1.1)
(But I think it's clear it's not an Environment issue.)
Reporter: James Grinter
A batch of events of size batchSize, read from a Kafka topic, was observed to be
processed multiple times (in our case, over 400 times) by the configured
Interceptors before finally being written into the Channel.
Upon reading the KafkaSource code in 1.8 (the version that was running) and in
1.9 (the latest release version), it was noticed that:
# There is a try/catch for Exception around most of the processing loop in
doProcess()
# eventList, the list of inbound events that can be mutated by
ChannelProcessor processEventBatch() or the configured Interceptors, is cleared
after processing completes successfully and before the Kafka Consumer offsets
are committed.
# If an Exception is thrown somewhere during the loop, before
eventList.clear(), then eventList is not cleared and will have the same
contents on the next call to doProcess()
# No more records will be read from Kafka, because the eventList already
contains batchUpperLimit (set from config "batchSize") records.
# The same eventList contents, already potentially mutated, will then be
passed through the ChannelProcessor again (a condensed sketch of this code path
follows the list).
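To make the sequence above concrete, here is a condensed sketch of that code
path. It is not the actual Flume source: field and method names only
approximate the 1.8/1.9 KafkaSource, and toEvent()/timedOut() are hypothetical
stand-ins for the real record-to-Event conversion and maxBatchDurationMillis
check.
{code:java}
// Condensed sketch of the doProcess() pattern described above -- NOT the
// actual Flume source. Names approximate the 1.8/1.9 code; toEvent() and
// timedOut() are hypothetical stand-ins.
import java.util.ArrayList;
import java.util.List;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.event.EventBuilder;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

class KafkaSourceSketch {
  enum Status { READY, BACKOFF }

  private final List<Event> eventList = new ArrayList<Event>(); // reused across calls
  private Consumer<String, byte[]> consumer;
  private ChannelProcessor channelProcessor;
  private int batchUpperLimit; // from the "batchSize" config
  private long pollTimeout;

  KafkaSourceSketch(Consumer<String, byte[]> consumer, ChannelProcessor channelProcessor,
                    int batchUpperLimit, long pollTimeout) {
    this.consumer = consumer;
    this.channelProcessor = channelProcessor;
    this.batchUpperLimit = batchUpperLimit;
    this.pollTimeout = pollTimeout;
  }

  Status doProcess() throws EventDeliveryException {
    try {
      // Kafka is only polled while the batch is not yet full: if eventList
      // still holds a full batch left over from a failed previous call,
      // nothing new is read from the topic.
      while (eventList.size() < batchUpperLimit && !timedOut()) {
        for (ConsumerRecord<String, byte[]> record : consumer.poll(pollTimeout)) {
          eventList.add(toEvent(record));
        }
      }
      if (!eventList.isEmpty()) {
        // The Interceptor chain runs inside processEventBatch() and may
        // mutate the events and/or the list itself.
        channelProcessor.processEventBatch(eventList);
        eventList.clear();     // cleared only on success...
        consumer.commitSync(); // ...and offsets committed only after that
      }
      return Status.READY;
    } catch (Exception e) {
      // eventList is NOT cleared here, so the next doProcess() call passes
      // the same, already-intercepted events through the chain again.
      return Status.BACKOFF;
    }
  }

  private boolean timedOut() {
    return false; // the real maxBatchDurationMillis check is elided here
  }

  private Event toEvent(ConsumerRecord<String, byte[]> record) {
    return EventBuilder.withBody(record.value()); // header handling elided
  }
}
{code}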
Some possible ways to fix:
* Don't re-use the eventList across invocations of doProcess().
* If eventList is declared this way to allow object re-use, then just clear it
when an Exception is thrown: if the KafkaConsumer offsets aren't committed,
whether because of a failure or just because processEventBatch() raised a
ChannelException, then it isn't valid to keep the list's contents around once
they may have been changed. (I think the records will/should be provided again
on a subsequent poll(), but I'm not 100% clear on what the Kafka Consumer APIs
guarantee here.) A sketch of these first two options follows this list.
* Don't allow processEventBatch() to mutate the List<Event> it is given, or
processEvent() to mutate the Event object. NB: many (all?) of the core
Interceptor implementations mutate the individual Event objects too, even when
they build a new List.
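As a minimal sketch of the first two options, again written against the
condensed KafkaSourceSketch above rather than as a patch to the real source:
{code:java}
// Option 1: build each batch in a local list, so a failed call can never
// leak its (possibly mutated) events into the next call. Expressed against
// the condensed KafkaSourceSketch above, not the real source.
Status doProcess() throws EventDeliveryException {
  List<Event> batch = new ArrayList<Event>(); // fresh list per invocation
  try {
    while (batch.size() < batchUpperLimit && !timedOut()) {
      for (ConsumerRecord<String, byte[]> record : consumer.poll(pollTimeout)) {
        batch.add(toEvent(record));
      }
    }
    if (!batch.isEmpty()) {
      channelProcessor.processEventBatch(batch);
      consumer.commitSync();
    }
    return Status.READY;
  } catch (Exception e) {
    // Option 2, if the shared eventList must be kept for object re-use:
    // call eventList.clear() here instead, so the next invocation starts
    // from a fresh poll(). In both cases the uncommitted records are
    // expected -- though, as noted above, not obviously guaranteed -- to be
    // re-read from Kafka later rather than re-submitted from memory.
    return Status.BACKOFF;
  }
}
{code}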