[
https://issues.apache.org/jira/browse/KAFKA-8972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Boyang Chen reopened KAFKA-8972:
--------------------------------
> KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback
> state
> --------------------------------------------------------------------------------
>
> Key: KAFKA-8972
> URL: https://issues.apache.org/jira/browse/KAFKA-8972
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.4.0
> Reporter: Boyang Chen
> Assignee: Boyang Chen
> Priority: Blocker
> Fix For: 2.4.0
>
>
> The current implementation of {{KafkaConsumer.unsubscribe}} performs these
> steps in the following order:
> {code}
> this.subscriptions.unsubscribe();
> this.coordinator.onLeavePrepare();
> this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
> {code}
> Inside {{onLeavePrepare}} we look at the current assignment, revoke the
> assigned partitions, notify the user via
> {{ConsumerRebalanceListener#onPartitionsRevoked}}, and then clear the
> assignment.
> However, the subscription's assignment is already cleared in
> {{this.subscriptions.unsubscribe();}}, which means the user's rebalance
> listener is never triggered. In other words, from the consumer client's point
> of view nothing is owned after unsubscribe, but from the caller's point of
> view the partitions have not been revoked yet. For callers like Kafka Streams,
> which rely on the rebalance listener to maintain their internal state, this
> leads to inconsistent state management and failure cases.
> Before KIP-429 this issue was hidden, since every time the consumer
> re-joined the group later it would revoke everything anyway, regardless of
> the parameters passed to the rebalance listener; with KIP-429 it is easier
> to reproduce.
> I think we can summarize our fix as:
> • Inside `unsubscribe`, first do `onLeavePrepare / maybeLeaveGroup` and then
> `subscriptions.unsubscribe`. This way we are guaranteed that the Streams tasks
> are all closed as revoked by then.
> • [Optimization] If the generation is reset due to a fatal error from a join
> or heartbeat response etc., then we know that all partitions are lost, and we
> should not trigger `onPartitionsRevoked`, but instead just `onPartitionsLost`
> inside `onLeavePrepare`.
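
The ordering problem and the proposed fix described in the issue can be illustrated with a small, self-contained simulation. This is only a sketch and does not use the Kafka client: `SketchConsumer`, `RecordingListener`, `clearSubscription`, `unsubscribeBuggy`, `unsubscribeFixed`, and the `generationReset` flag are hypothetical names made up for illustration, partitions are plain strings, and `maybeLeaveGroup` is left out. It also assumes that `onLeavePrepare` skips the callback when the assignment is already empty, which is the behavior the issue description implies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class UnsubscribeOrderingSketch {

    /** Hypothetical stand-in for the user's rebalance listener; it records each callback. */
    static class RecordingListener {
        final List<String> events = new ArrayList<>();

        void onPartitionsRevoked(Set<String> partitions) {
            events.add("revoked:" + partitions);
        }

        void onPartitionsLost(Set<String> partitions) {
            events.add("lost:" + partitions);
        }
    }

    /** Hypothetical, heavily simplified model of the consumer state discussed in the issue. */
    static class SketchConsumer {
        final Set<String> assignment = new TreeSet<>();
        final RecordingListener listener;
        boolean generationReset = false; // assumption: set after a fatal join/heartbeat error

        SketchConsumer(RecordingListener listener, String... partitions) {
            this.listener = listener;
            assignment.addAll(Arrays.asList(partitions));
        }

        /** Models subscriptions.unsubscribe(): the assignment is dropped silently. */
        void clearSubscription() {
            assignment.clear();
        }

        /** Models onLeavePrepare(): notify the listener about owned partitions, then clear. */
        void onLeavePrepare() {
            if (assignment.isEmpty()) {
                return; // nothing owned, so no callback is fired
            }
            Set<String> owned = new TreeSet<>(assignment);
            if (generationReset) {
                listener.onPartitionsLost(owned);    // optimization from the second bullet
            } else {
                listener.onPartitionsRevoked(owned);
            }
            assignment.clear();
        }

        /** Ordering reported in the issue: the assignment is cleared before the callback runs. */
        void unsubscribeBuggy() {
            clearSubscription();
            onLeavePrepare();
        }

        /** Proposed ordering: run the callback first, then clear the subscription. */
        void unsubscribeFixed() {
            onLeavePrepare();
            clearSubscription();
        }
    }

    /** Runs one scenario and returns the callbacks the listener observed. */
    public static List<String> run(boolean fixedOrder, boolean generationReset) {
        RecordingListener listener = new RecordingListener();
        SketchConsumer consumer = new SketchConsumer(listener, "t-0", "t-1");
        consumer.generationReset = generationReset;
        if (fixedOrder) {
            consumer.unsubscribeFixed();
        } else {
            consumer.unsubscribeBuggy();
        }
        return listener.events;
    }

    public static void main(String[] args) {
        System.out.println("buggy order:        " + run(false, false));
        System.out.println("fixed order:        " + run(true, false));
        System.out.println("fixed, gen. reset:  " + run(true, true));
    }
}
```

In this model the buggy ordering leaves the listener with no recorded events, while the fixed ordering records a single revoked (or lost) callback covering both partitions before the subscription is cleared.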
--
This message was sent by Atlassian Jira
(v8.3.4#803005)