Hello,
We’re using custom storage for Kafka offsets. Partitions are assigned 
automatically, and we use a ConsumerRebalanceListener to save offsets, clear 
threads and so on. In the ‘onPartitionsAssigned’ method, we look up the committed 
offsets for the assigned partitions in the custom store. If no committed offsets 
are found, we get the appropriate offsets (earliest/latest) from Kafka.
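
The flow described above can be modeled roughly as follows. This is a minimal, 
dependency-free sketch and not the actual application code: the class 
CustomStoreRebalanceModel, its in-memory maps and the fixed fallbackOffset are 
all hypothetical stand-ins. In the real application the two callbacks would be 
the ConsumerRebalanceListener methods, the store would be external, and the 
fallback would come from Kafka's earliest/latest offset lookup.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/** Dependency-free model of a rebalance listener backed by a custom offset store. */
class CustomStoreRebalanceModel {
    // Hypothetical custom store: partition name -> last saved offset.
    private final Map<String, Long> customStore = new HashMap<>();
    // Current positions of the partitions this consumer owns.
    private final Map<String, Long> positions = new HashMap<>();
    // Stand-in for the earliest/latest offset that would be looked up in Kafka.
    private final long fallbackOffset;

    CustomStoreRebalanceModel(long fallbackOffset) {
        this.fallbackOffset = fallbackOffset;
    }

    /** Mirrors onPartitionsRevoked: save current positions, then give up the partitions. */
    void onPartitionsRevoked(Collection<String> partitions) {
        for (String partition : partitions) {
            Long position = positions.remove(partition);
            if (position != null) {
                customStore.put(partition, position);
            }
        }
    }

    /** Mirrors onPartitionsAssigned: restore from the store, else use the fallback offset. */
    void onPartitionsAssigned(Collection<String> partitions) {
        for (String partition : partitions) {
            Long saved = customStore.get(partition);
            positions.put(partition, saved != null ? saved : fallbackOffset);
        }
    }

    /** Simulates consuming so that the next offset to read is nextOffset. */
    void advance(String partition, long nextOffset) {
        positions.put(partition, nextOffset);
    }

    /** Current position for an owned partition, or null if not owned. */
    Long position(String partition) {
        return positions.get(partition);
    }

    /** Offset saved in the custom store, or null if none was saved. */
    Long stored(String partition) {
        return customStore.get(partition);
    }
}
```

In this model the position reaches the store only inside onPartitionsRevoked, so 
a rebalance that goes straight to onPartitionsAssigned leaves the store without 
the latest position.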

According to javadocs for ConsumerRebalanceListener:

"It is guaranteed that all consumer processes will invoke 
onPartitionsRevoked<http://apache.mesi.com.ar/kafka/0.9.0.1/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsRevoked(java.util.Collection)>
 prior to any process invoking 
onPartitionsAssigned<http://apache.mesi.com.ar/kafka/0.9.0.1/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsAssigned(java.util.Collection)>."

However, we have now seen a couple of cases where, on a rebalance, 
‘onPartitionsRevoked’ isn’t called. ‘onPartitionsAssigned’ gets called 
directly, which leaves our application in a bad state.

We looked briefly at the code, and it seems to do what the javadocs promise. 
So the question is: is there any edge case, any flow, that could possibly 
trigger this?

One thing we observed in both cases was that, while trying to get offsets from 
Kafka (because offsets were not present in the custom store), the Fetcher 
retried at least once due to obsolete leadership information:


Attempt to fetch offsets for partition {} failed due to obsolete leadership 
information, retrying.

Please let us know.

Thanks,
Ninad Naik.
