James Grinter created FLUME-3333:
------------------------------------

             Summary: KafkaSource will reprocess the same events through the 
Interceptor chain multiple times, if anything throws an Exception during 
doProcess
                 Key: FLUME-3333
                 URL: https://issues.apache.org/jira/browse/FLUME-3333
             Project: Flume
          Issue Type: Bug
          Components: Sinks+Sources
    Affects Versions: 1.9.0, 1.8.0
         Environment: Observed on Apache Flume 1.8.0, java 8 (various), Apache 
Kafka (1.1.1)

(But I think it's clear it's not an Environment issue.)
            Reporter: James Grinter


A batch of events, of size batchSize, read from a Kafka topic was observed to 
have been processed multiple times (in our case, over 400 times) by the 
configured Interceptors before finally being completed and written into the 
Channel.

Upon reading the KafkaSource code in 1.8 (the version that was running) and 
version 1.9 (the latest release version), it was noticed that:
 # There is a try/catch Exception around most of the processing loop in 
doProcess().
 # eventList, the list of inbound events that can be mutated by 
ChannelProcessor processEventBatch() or the configured Interceptors, is cleared 
only after processing completes successfully and before the Kafka Consumer 
offsets are committed.
 # If an Exception is thrown somewhere during the loop, before 
eventList.clear(), then eventList is not cleared and will have the same 
contents on the next call to doProcess().
 # No more records will be read from Kafka, because eventList already 
contains batchUpperLimit (set from the "batchSize" config) records.
 # The same eventList contents, already potentially mutated, will then be 
passed through the ChannelProcessor again.
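
The steps above can be sketched as a minimal, self-contained simulation (the class, method, and event names are illustrative, not Flume's actual code). It shows how a reused eventList field, combined with a catch block that skips clear(), causes the same batch to be resubmitted on the next call:

```java
import java.util.ArrayList;
import java.util.List;

// Simulation of the doProcess() pattern described above. The eventList
// field is reused across calls; an exception thrown before clear() leaves
// the already-intercepted events in place for the next invocation.
public class ReusedBatchDemo {
    private final List<String> eventList = new ArrayList<>();
    private final int batchUpperLimit = 2; // stands in for "batchSize"
    private int nextRecord = 0;            // simulated Kafka consumer position

    // Returns the batch submitted (or re-submitted) on this call.
    public List<String> doProcess(boolean failThisTime) {
        try {
            // Only "poll" for more records if the batch is not already full,
            // mirroring the "no more records will be read" observation.
            while (eventList.size() < batchUpperLimit) {
                eventList.add("event-" + nextRecord++);
            }
            // Interceptors / ChannelProcessor may mutate the events here.
            if (failThisTime) {
                throw new RuntimeException("ChannelException from processEventBatch()");
            }
            List<String> submitted = new ArrayList<>(eventList);
            eventList.clear(); // only reached on success
            return submitted;
        } catch (Exception e) {
            // eventList is NOT cleared: same contents on the next call.
            return new ArrayList<>(eventList);
        }
    }
}
```

A first call that fails and a second call that succeeds both submit the identical (possibly already-mutated) events, which is the duplicate interception observed.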

Some possible ways to fix:
 * Don't re-use the eventList across invocations of doProcess().
 * If eventList is declared this way to allow object re-use, then clear it 
when an Exception is thrown. If the KafkaConsumer offsets aren't committed 
because of a failure, or simply because processEventBatch() raised a 
ChannelException, then it isn't valid to keep the list's contents around once 
they may have been changed. (I think the records will/should be provided again 
on a subsequent poll(), but I'm not 100% clear on what the Kafka Consumer APIs 
guarantee here.)
 * Don't allow processEventBatch() to mutate the List<Event> events, or 
processEvent() to mutate the Event object. NB: many (all?) of the core 
Interceptor implementations mutate the individual Event objects too, even if 
they sometimes build a new List.
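
The second option can be sketched as follows (again with illustrative names, not Flume's actual code). On failure, the possibly-mutated events are discarded; in real Kafka the uncommitted offsets would have to be relied on (or the consumer re-seeked) so the original records are redelivered, which this toy simulation only approximates by polling fresh records:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the clear-on-failure fix: never carry possibly-mutated
// events from a failed batch into the next doProcess() call.
public class ClearOnFailureDemo {
    private final List<String> eventList = new ArrayList<>();
    private final int batchUpperLimit = 2; // stands in for "batchSize"
    private int nextRecord = 0;            // simulated Kafka consumer position

    public List<String> doProcess(boolean failThisTime) {
        try {
            while (eventList.size() < batchUpperLimit) {
                eventList.add("event-" + nextRecord++); // simulated poll()
            }
            if (failThisTime) {
                throw new RuntimeException("ChannelException from processEventBatch()");
            }
            List<String> submitted = new ArrayList<>(eventList);
            eventList.clear(); // success path, as in the current code
            return submitted;
        } catch (Exception e) {
            // The fix: discard the possibly-mutated batch. Redelivery of the
            // original records is left to the (uncommitted) Kafka offsets.
            eventList.clear();
            return List.of();
        }
    }
}
```

With this change, a failed call submits nothing, and the next call builds a fresh batch instead of replaying the mutated one.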


