[ https://issues.apache.org/jira/browse/KAFKA-8972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Boyang Chen reopened KAFKA-8972:
--------------------------------

> KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback state
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-8972
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8972
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.4.0
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Blocker
>             Fix For: 2.4.0
>
>
> The current implementation of {{KafkaConsumer.unsubscribe}} performs its steps
> in the following order:
> {code}
> this.subscriptions.unsubscribe();
> this.coordinator.onLeavePrepare();
> this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
> {code}
> Inside {{onLeavePrepare}} we look at the current assignment, try to revoke it
> and notify users via {{RebalanceListener#onPartitionsRevoked}}, and then clear
> the assignment.
> However, the subscription's assignment has already been cleared by
> {{this.subscriptions.unsubscribe();}}, which means the user's rebalance listener
> is never triggered. In other words, from the consumer client's point of view
> nothing is owned after unsubscribe, but from the caller's point of view the
> partitions have not been revoked yet. For callers like Kafka Streams, which rely
> on the rebalance listener to maintain their internal state, this leads to
> inconsistent state management and failure cases.
> Before KIP-429 this issue was hidden, because every time the consumer re-joined
> the group later it would revoke everything anyway, regardless of the parameters
> passed to the rebalance listener; with KIP-429 it is easier to reproduce.
> I think we can summarize our fix as:
> • Inside `unsubscribe`, first do `onLeavePrepare / maybeLeaveGroup` and then
> `subscription.unsubscribe`. This way we are guaranteed that the streams' tasks
> are all closed as revoked by then.
> • [Optimization] If the generation is reset due to a fatal error from a join /
> heartbeat response etc., then we know that all partitions are lost, and we
> should not trigger `onPartitionsRevoked`, but instead just `onPartitionsLost`
> inside `onLeavePrepare`.
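
The ordering problem and the two proposed changes can be illustrated with a small toy model. This is only a sketch: `ToyConsumer`, `RecordingListener`, `unsubscribeBuggy`, `unsubscribeFixed` and `markGenerationReset` are hypothetical names invented for this example and are not part of the Kafka client. The model assumes that the leave-preparation step reads the current assignment to decide which partitions to report, as the description above states.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model only: hypothetical classes, NOT the real Kafka client internals.
final class ToyConsumer {
    /** Stand-in for the user's rebalance listener. */
    interface Listener {
        void onPartitionsRevoked(Set<String> partitions);
        void onPartitionsLost(Set<String> partitions);
    }

    private final Set<String> assignment = new LinkedHashSet<>();
    private final Listener listener;
    private boolean generationReset = false;

    ToyConsumer(Listener listener, Collection<String> initialAssignment) {
        this.listener = listener;
        this.assignment.addAll(initialAssignment);
    }

    /** Simulates a fatal join/heartbeat error that resets the generation. */
    void markGenerationReset() {
        generationReset = true;
    }

    /** Plays the role of onLeavePrepare: report whatever is still owned. */
    private void onLeavePrepare() {
        if (assignment.isEmpty()) {
            return; // nothing owned, so the listener is never invoked
        }
        Set<String> owned = new LinkedHashSet<>(assignment);
        if (generationReset) {
            listener.onPartitionsLost(owned);    // the proposed optimization
        } else {
            listener.onPartitionsRevoked(owned);
        }
    }

    /** Reported ordering: the assignment is cleared before the callback runs. */
    void unsubscribeBuggy() {
        assignment.clear();
        onLeavePrepare();
    }

    /** Proposed ordering: notify the listener first, then clear. */
    void unsubscribeFixed() {
        onLeavePrepare();
        assignment.clear();
    }
}

/** Records callbacks so the two orderings can be compared. */
final class RecordingListener implements ToyConsumer.Listener {
    final List<String> events = new ArrayList<>();

    @Override
    public void onPartitionsRevoked(Set<String> partitions) {
        events.add("revoked " + partitions);
    }

    @Override
    public void onPartitionsLost(Set<String> partitions) {
        events.add("lost " + partitions);
    }
}

final class UnsubscribeOrderingDemo {
    public static void main(String[] args) {
        List<String> owned = List.of("t-0", "t-1");

        RecordingListener buggy = new RecordingListener();
        new ToyConsumer(buggy, owned).unsubscribeBuggy();
        System.out.println("buggy ordering: " + buggy.events);

        RecordingListener fixed = new RecordingListener();
        new ToyConsumer(fixed, owned).unsubscribeFixed();
        System.out.println("fixed ordering: " + fixed.events);
    }
}
```

In this model the buggy ordering leaves the listener with no events at all, while the fixed ordering reports both partitions as revoked; calling `markGenerationReset()` before `unsubscribeFixed()` reports them as lost instead.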