[ 
https://issues.apache.org/jira/browse/KAFKA-8972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reopened KAFKA-8972:
--------------------------------

> KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback 
> state
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-8972
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8972
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.4.0
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Blocker
>             Fix For: 2.4.0
>
>
> Our current implementation ordering of {{KafkaConsumer.unsubscribe}} is the 
> following:
> {code}
> this.subscriptions.unsubscribe();
> this.coordinator.onLeavePrepare();
> this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
> {code}
> And inside {{onLeavePrepare}} we would look into the assignment and try to 
> revoke them and notify users via {{RebalanceListener#onPartitionsRevoked}}, 
> and then clear the assignment.
> However, the subscription's assignment is already cleared in 
> {{this.subscriptions.unsubscribe();}} which means user's rebalance listener 
> would never be triggered. In other words, from consumer client's pov nothing 
> is owned after unsubscribe, but from the user caller's pov the partitions are 
> not revoked yet. For callers like Kafka Streams which rely on the rebalance 
> listener to maintain their internal state, this leads to inconsistent state 
> management and failure cases.
> Before KIP-429 this issue is hidden away since every time the consumer 
> re-joins the group later, it would still revoke everything anyways regardless 
> of the passed-in parameters of the rebalance listener; with KIP-429 this is 
> easier to reproduce now.
> I think we can summarize our fix as:
> • Inside `unsubscribe`, first do `onLeavePrepare / maybeLeaveGroup` and then 
> `subscription.unsubscribe`. This we we are guaranteed that the streams' tasks 
> are all closed as revoked by then.
> • [Optimization] If the generation is reset due to fatal error from join / hb 
> response etc, then we know that all partitions are lost, and we should not 
> trigger `onPartitionRevoked`, but instead just `onPartitionsLost` inside 
> `onLeavePrepare`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to