Hi there,



I'm facing an issue where duplicate Kafka events are processed by Flume, which 
happens only when a rebalance occurs. After a bit of digging, I believe it's a 
bug in the KafkaSource class, but I need your help to confirm that.




Reason:




When a rebalance happens, KafkaSource keeps the events it has already polled 
and commits them. See:




// Line 211:
// this flag is set to true in a callback when some partitions are revoked.
// If there are any records we commit them.
if (rebalanceFlag.compareAndSet(true, false)) {
  break;
}
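
For reference, that flag is set from the consumer's rebalance listener. A 
minimal sketch of the pattern (not the exact Flume source; the variable names 
and surrounding structure are assumed) looks like this:

import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

final AtomicBoolean rebalanceFlag = new AtomicBoolean(false);

consumer.subscribe(topics, new ConsumerRebalanceListener() {
  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // partitions are about to be taken away; tell the poll loop to stop
    // batching and commit what it already has
    rebalanceFlag.set(true);
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // nothing needed for this illustration
  }
});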




And the tpAndOffsetMetadata map is *only cleared when the commit succeeds*:

// Line 313:
eventList.clear();

if (!tpAndOffsetMetadata.isEmpty()) {
  long commitStartTime = System.nanoTime();
  consumer.commitSync(tpAndOffsetMetadata);
  long commitEndTime = System.nanoTime();
  counter.addToKafkaCommitTimer((commitEndTime - commitStartTime) / (1000 * 1000));
  tpAndOffsetMetadata.clear();
}
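
To make the carry-over concrete, here is a tiny simplified sketch (hypothetical 
topic names and offsets, not Flume code) of what happens to that map across 
two commit attempts:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

Map<TopicPartition, OffsetAndMetadata> tpAndOffsetMetadata = new HashMap<>();

// batch 1: offsets staged for partition 0, but commitSync() throws
// CommitFailedException during the rebalance, so the map is NOT cleared
tpAndOffsetMetadata.put(new TopicPartition("topic", 0), new OffsetAndMetadata(105L));

// ... rebalance finishes; partition 0 may now belong to another consumer
// whose position is already well beyond offset 105 ...

// batch 2 on this consumer: new offsets are staged on top of the stale entry
tpAndOffsetMetadata.put(new TopicPartition("topic", 3), new OffsetAndMetadata(42L));

// the next successful commitSync(tpAndOffsetMetadata) also commits the stale
// offset 105 for partition 0, rewinding that partition's committed position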




That is, once commitSync fails, the previous partitions' offsets are still kept 
in the map and will be committed the next time offsets for the newly assigned 
partitions are committed. This can happen when a rebalance is triggered, e.g. 
by adding new consumers to the same group, which in our case is scaling out the 
instances. When the rebalance happens, the commit can fail with an error like 
this:




Caused by: org.apache.kafka.clients.consumer.CommitFailedException:

Commit cannot be completed since the group has already rebalanced and assigned 
the partitions to another member. This means that the time between subsequent 
calls to poll() was longer than the configured max.poll.interval.ms, which 
typically implies that the poll loop is spending too much time message 
processing. You can address this either by increasing the session timeout or by 
reducing the maximum size of batches returned in poll() with max.poll.records.




And the commit failure happens again and again until, after some period, the 
rebalance finally finishes. The real problem is that a wrong offset (possibly a 
much earlier one than the actually consumed one) will eventually be committed, 
which we believe is the root cause of the message duplication.
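
If this diagnosis is correct, one possible direction (just an untested sketch, 
assuming the rebalance listener can see tpAndOffsetMetadata) would be to drop 
the pending offsets for partitions that are being revoked, so a later commit 
can never rewind them:

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
  rebalanceFlag.set(true);
  // drop staged offsets for partitions we are losing; the new owner resumes
  // from the last successfully committed offset instead of being rewound by
  // a stale entry committed after the rebalance
  for (TopicPartition tp : partitions) {
    tpAndOffsetMetadata.remove(tp);
  }
}

This still allows some re-consumption from the last committed offset 
(at-least-once delivery), but it should avoid committing a much earlier offset 
after the rebalance.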




The scenario is hard to reproduce. We checked our production logs: there are 
lots of such errors, lasting for a long period (about 20 minutes), and we 
noticed a sharp increase in unconsumed Kafka events. The only explanation we 
can find is that the offset was reset to a previous value.




Can someone confirm this?




Yours,




Riguz Lee
