Hello,

We’re using custom storage for Kafka offsets. Partitions are assigned automatically, and we’re using ConsumerRebalanceListener to save offsets, clean up threads, and so on. In the ‘onPartitionsAssigned’ method, we look up the committed offsets for the assigned partitions in the custom store. If no committed offsets are found, we get the appropriate offsets (earliest/latest) from Kafka.
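For illustration, the flow described above can be sketched roughly as follows. This is a simplified model in plain Java, not the Kafka API and not the actual application code: the class name, the in-memory map standing in for the custom store, the String partition names, and the `resetOffsetLookup` fallback (standing in for asking Kafka for the earliest/latest offset) are all hypothetical.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

/** Hypothetical model of the rebalance handling described above; not Kafka API. */
final class RebalanceOffsetSketch {
    // Stand-in for the custom offset store: partition name -> next offset to read.
    private final Map<String, Long> customStore = new HashMap<>();
    // Current read positions of the partitions this consumer owns.
    private final Map<String, Long> positions = new LinkedHashMap<>();
    // Stand-in for asking Kafka for the earliest/latest offset of a partition.
    private final ToLongFunction<String> resetOffsetLookup;

    RebalanceOffsetSketch(ToLongFunction<String> resetOffsetLookup) {
        this.resetOffsetLookup = resetOffsetLookup;
    }

    /** Mirrors onPartitionsRevoked: save positions to the store, drop local state. */
    void onPartitionsRevoked(Collection<String> partitions) {
        for (String partition : partitions) {
            Long position = positions.remove(partition);
            if (position != null) {
                customStore.put(partition, position);
            }
        }
    }

    /** Mirrors onPartitionsAssigned: use the stored offset, else fall back. */
    void onPartitionsAssigned(Collection<String> partitions) {
        for (String partition : partitions) {
            Long committed = customStore.get(partition);
            long start = (committed != null)
                    ? committed
                    : resetOffsetLookup.applyAsLong(partition);
            positions.put(partition, start);
        }
    }

    /** Records progress on a partition; ignored if the partition is not owned. */
    void advance(String partition, long nextOffset) {
        positions.replace(partition, nextOffset);
    }

    /** Returns the current position, or null if the partition is not owned. */
    Long position(String partition) {
        return positions.get(partition);
    }
}
```

In this model, state is only saved and cleared in the revoke callback, so an assign that arrives without a preceding revoke would overwrite positions that were never persisted.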
According to the javadocs for ConsumerRebalanceListener: “It is guaranteed that all consumer processes will invoke onPartitionsRevoked<http://apache.mesi.com.ar/kafka/0.9.0.1/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsRevoked(java.util.Collection)> prior to any process invoking onPartitionsAssigned<http://apache.mesi.com.ar/kafka/0.9.0.1/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsAssigned(java.util.Collection)>.”

However, we have now seen a couple of times that, on a rebalance, ‘onPartitionsRevoked’ isn’t called. ‘onPartitionsAssigned’ gets called directly, which leaves our application in a bad state. We looked briefly at the code, and it appears to do what the javadocs promise. So the question is: is there any edge case or code path that could trigger this?

One thing we observed in both cases was that, while trying to get offsets from Kafka (because none were present in the custom store), the Fetcher retried at least once due to obsolete leadership information:

Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.

Please let us know.

Thanks,
Ninad Naik