[ https://issues.apache.org/jira/browse/KAFKA-8972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16958425#comment-16958425 ]
ASF GitHub Bot commented on KAFKA-8972: --------------------------------------- ableegoldman commented on pull request #7589: KAFKA-8972: need to flush state even on unclean close URL: https://github.com/apache/kafka/pull/7589 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback > state > -------------------------------------------------------------------------------- > > Key: KAFKA-8972 > URL: https://issues.apache.org/jira/browse/KAFKA-8972 > Project: Kafka > Issue Type: Bug > Affects Versions: 2.4.0 > Reporter: Boyang Chen > Assignee: Boyang Chen > Priority: Blocker > Fix For: 2.4.0 > > > Our current implementation ordering of {{KafkaConsumer.unsubscribe}} is the > following: > {code} > this.subscriptions.unsubscribe(); > this.coordinator.onLeavePrepare(); > this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics"); > {code} > And inside {{onLeavePrepare}} we would look into the assignment and try to > revoke them and notify users via {{RebalanceListener#onPartitionsRevoked}}, > and then clear the assignment. > However, the subscription's assignment is already cleared in > {{this.subscriptions.unsubscribe();}} which means user's rebalance listener > would never be triggered. In other words, from consumer client's pov nothing > is owned after unsubscribe, but from the user caller's pov the partitions are > not revoked yet. For callers like Kafka Streams which rely on the rebalance > listener to maintain their internal state, this leads to inconsistent state > management and failure cases. > Before KIP-429 this issue is hidden away since every time the consumer > re-joins the group later, it would still revoke everything anyways regardless > of the passed-in parameters of the rebalance listener; with KIP-429 this is > easier to reproduce now. > I think we can summarize our fix as: > • Inside `unsubscribe`, first do `onLeavePrepare / maybeLeaveGroup` and then > `subscription.unsubscribe`. This we we are guaranteed that the streams' tasks > are all closed as revoked by then. > • [Optimization] If the generation is reset due to fatal error from join / hb > response etc, then we know that all partitions are lost, and we should not > trigger `onPartitionRevoked`, but instead just `onPartitionsLost` inside > `onLeavePrepare`. -- This message was sent by Atlassian Jira (v8.3.4#803005)