Hi,
What you've described sounds plausible, other than it continuing to occur
for 20 minutes. The idea that you might fail to commit offsets if messages
are consumed and a rebalance occurs before the offsets are committed makes
perfect sense.

The question becomes: what is going on to cause this to happen for 20 minutes?
Do you have a full memory channel somewhere along the line, caused by a
failing sink? Or are we repeatedly failing to commit the same offsets?

Tristan

On 24 Apr 2022 11:28, Riguz Lee <d...@riguz.com> wrote:

Hi there,




I'm facing an issue where duplicated Kafka events are processed by Flume,
which happens only when a rebalance occurs. After a little digging, I
believe it's a bug in the KafkaSource class, but I need your help to
confirm that.




Reason:




When a rebalance happens, the KafkaSource keeps the events it has already
polled and commits their offsets afterwards. See:

// Line 211:
// this flag is set to true in a callback when some partitions are revoked.
// If there are any records we commit them.
if (rebalanceFlag.compareAndSet(true, false)) {
  break;
}




And the tpAndOffsetMetadata map is *only cleared when the commit succeeds*:

// Line 313:
eventList.clear();

if (!tpAndOffsetMetadata.isEmpty()) {
  long commitStartTime = System.nanoTime();
  consumer.commitSync(tpAndOffsetMetadata);
  long commitEndTime = System.nanoTime();
  counter.addToKafkaCommitTimer((commitEndTime - commitStartTime) / (1000 * 1000));
  tpAndOffsetMetadata.clear();
}




That is, once commitSync fails, the offsets for the previously held
partitions are still kept and will be committed the next time offsets for
the newly assigned partitions are committed. This can happen when a
rebalance occurs, e.g. when adding new consumers to the same group, which
in our case means scaling up the instances. When rebalancing happens, the
commit can fail with an error like this:




Caused by: org.apache.kafka.clients.consumer.CommitFailedException:
Commit cannot be completed since the group has already rebalanced and
assigned the partitions to another member. This means that the time between
subsequent calls to poll() was longer than the configured
max.poll.interval.ms, which typically implies that the poll loop is
spending too much time message processing. You can address this either by
increasing the session timeout or by reducing the maximum size of batches
returned in poll() with max.poll.records.
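
For what it's worth, the same pattern can be shown outside of Flume with the
plain consumer API. The following is only a minimal sketch of the mechanism
described above (broker address, group and topic names are made up, and the
loop is stripped down): offsets accumulate in a map that is cleared only
after commitSync() succeeds, so entries recorded before a rebalance survive a
CommitFailedException and are committed again on a later attempt, together
with offsets for the newly assigned partitions.

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class StaleOffsetSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // placeholder
    props.put("group.id", "sketch-group");               // placeholder
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    // Same role as tpAndOffsetMetadata: offsets waiting to be committed.
    Map<TopicPartition, OffsetAndMetadata> pending = new HashMap<>();

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("sketch-topic"));
      while (true) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
          // Record the next offset to commit for this record's partition.
          pending.put(new TopicPartition(record.topic(), record.partition()),
              new OffsetAndMetadata(record.offset() + 1));
        }
        if (pending.isEmpty()) {
          continue;
        }
        try {
          consumer.commitSync(pending);
          pending.clear();                 // cleared only on success ...
        } catch (CommitFailedException e) {
          // ... so after a rebalance-triggered failure the old entries stay
          // in the map and are committed again later, possibly for partitions
          // that now belong to (or came back from) another consumer.
        }
      }
    }
  }
}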




And the commit failure happens again and again until, after some period, the
rebalance finally settles. The real problem is that a wrong offset (possibly
a much earlier one than the actually consumed one) will eventually be
committed, which we believe is the root cause of the message duplication.
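
I'm not proposing this as the actual KafkaSource patch, but just to make the
idea concrete: one way to avoid re-committing stale entries would be to prune
the offset map when the commit fails because of a rebalance, keeping only the
partitions still assigned to this consumer. The class and method names below
are illustrative only, not taken from KafkaSource.

import java.util.Map;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class SafeOffsetCommit {
  private SafeOffsetCommit() { }

  /**
   * Commits the collected offsets; if the commit fails because the group has
   * rebalanced, drops the entries for partitions this consumer no longer owns
   * so a later successful commit cannot push a stale offset for them.
   */
  static void commitOrPrune(Consumer<?, ?> consumer,
                            Map<TopicPartition, OffsetAndMetadata> offsets) {
    if (offsets.isEmpty()) {
      return;
    }
    try {
      consumer.commitSync(offsets);
      offsets.clear();
    } catch (CommitFailedException e) {
      // Keep only partitions still assigned to this consumer; everything else
      // would otherwise be retried with an offset recorded before the rebalance.
      offsets.keySet().retainAll(consumer.assignment());
    }
  }
}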




The scenario is hard to reproduce. We've checked our logs in production:
there are lots of such errors, lasting for a long period (about 20 minutes),
and we've noticed a sharp increase in unconsumed Kafka events. The only
explanation we can come up with is that the offset is reset to a previous value.




Can someone confirm this?




Yours,




Riguz Lee
