[ https://issues.apache.org/jira/browse/KAFKA-8972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16961602#comment-16961602 ]

ASF GitHub Bot commented on KAFKA-8972:
---------------------------------------

ableegoldman commented on pull request #7608: KAFKA-8972 (2.4 blocker): clear 
all state for zombie task on commit failed
URL: https://github.com/apache/kafka/pull/7608
 
 
   Third bugfix for the failing broker bounce system test with cooperative 
rebalancing:
   
   **tl;dr** We need to remove everything associated with a task when it is 
closed, but in some cases (e.g. `AssignedTasks#commit`) on a 
`TaskMigratedException` we would close it as a zombie and then remove (only) 
the taskId from the `running` map. This left its partitions, restorers, state 
stores, etc. around in an undefined state, causing exceptions when closing 
and/or opening the stores again.
   
   **Longer explanation:**
   In `AssignedTasks` (the abstract class from which the standby and active task 
variations extend), a commit failure (even one due to the broker being 
down/unavailable) is treated as a `TaskMigratedException`, after which the 
failed task is closed as a zombie and removed from `running`; the remaining 
tasks (i.e. those still in `running`) are then also closed as zombies in the 
subsequent `onPartitionsLost`.
   
   However, we do not remove the closed task from `runningByPartition`, nor do 
we remove the corresponding changelogs (if restoring) from the 
`StoreChangelogReader`, since that cleanup applies only to active tasks while 
`AssignedTasks` is generic/abstract. The changelog reader then retains a 
mapping from the closed task's changelog partition to its 
`CompositeRestoreListener` (and does not replace it when the new listener 
comes along after the rebalance). That restore listener holds a reference to a 
specific `RocksDBStore` instance, one which was closed when the task was 
closed as a zombie, so the reader accidentally tries to restore into the "old" 
`RocksDBStore` instance rather than the new one that was just opened.
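   To make the mechanism concrete, here is a minimal, self-contained sketch of 
the kind of per-task cleanup the fix needs to perform. The stub types and the 
`closeZombieAndClearState` method are illustrative assumptions based on the 
description above, not the actual patch in apache/kafka#7608:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;
   
   // Stand-ins for the real Kafka Streams internals (assumptions, not the real classes).
   final class TaskId {}
   final class TopicPartition {}
   
   interface StreamTask {
       TaskId id();
       Set<TopicPartition> partitions();
       Set<TopicPartition> changelogPartitions();
       void closeZombie(); // closes the task's state stores, including RocksDB
   }
   
   interface ChangelogReader {
       // drops the restorer and its CompositeRestoreListener mapping
       void remove(TopicPartition changelogPartition);
   }
   
   final class AssignedTasksSketch {
       private final Map<TaskId, StreamTask> running = new HashMap<>();
       private final Map<TopicPartition, StreamTask> runningByPartition = new HashMap<>();
       private final ChangelogReader changelogReader;
   
       AssignedTasksSketch(final ChangelogReader changelogReader) {
           this.changelogReader = changelogReader;
       }
   
       void closeZombieAndClearState(final StreamTask task) {
           task.closeZombie();
           running.remove(task.id());          // the old code stopped here
   
           // also drop the per-partition bookkeeping the old code left behind:
           for (final TopicPartition partition : task.partitions()) {
               runningByPartition.remove(partition);
           }
           // and the stale changelog -> restore listener mapping, so the reader
           // cannot restore into the already-closed store instance
           for (final TopicPartition changelog : task.changelogPartitions()) {
               changelogReader.remove(changelog);
           }
       }
   }
   ```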
   
   Although technically this bug existed before KIP-429, it was only uncovered 
now that we remove tasks and clear their state/partitions/etc. one at a time. 
We don't strictly need to cherry-pick the fix back to earlier branches, since 
before KIP-429 we just blindly cleared all data structures entirely during an 
eager rebalance.
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback 
> state
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-8972
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8972
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.4.0
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Blocker
>             Fix For: 2.4.0
>
>
> Our current implementation ordering of {{KafkaConsumer.unsubscribe}} is the 
> following:
> {code}
> this.subscriptions.unsubscribe();
> this.coordinator.onLeavePrepare();
> this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
> {code}
> And inside {{onLeavePrepare}} we would look into the assignment, try to 
> revoke the owned partitions and notify users via 
> {{RebalanceListener#onPartitionsRevoked}}, and then clear the assignment.
> However, the subscription's assignment is already cleared by 
> {{this.subscriptions.unsubscribe();}}, which means the user's rebalance 
> listener would never be triggered. In other words, from the consumer 
> client's point of view nothing is owned after unsubscribe, but from the user 
> caller's point of view the partitions are not revoked yet. For callers like 
> Kafka Streams, which rely on the rebalance listener to maintain their 
> internal state, this leads to inconsistent state management and failure 
> cases.
> Before KIP-429 this issue was hidden away, since every time the consumer 
> re-joined the group it would still revoke everything regardless of the 
> parameters passed to the rebalance listener; with KIP-429 it is easier to 
> reproduce.
> I think we can summarize our fix as:
> • Inside {{unsubscribe}}, first do {{onLeavePrepare}} / {{maybeLeaveGroup}} 
> and then {{subscription.unsubscribe}}. This way we are guaranteed that the 
> streams' tasks are all closed as revoked by then (see the sketch after this 
> list).
> • [Optimization] If the generation is reset due to a fatal error from the 
> join / heartbeat response etc., then we know that all partitions are lost, 
> and we should not trigger {{onPartitionsRevoked}}, but instead just 
> {{onPartitionsLost}} inside {{onLeavePrepare}}.
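> A minimal sketch of the proposed ordering, reusing the method calls from the 
> snippet above (the generation check in the comment paraphrases the 
> optimization bullet and is an assumption, not confirmed code):
> {code}
> // reordered unsubscribe (sketch): run the callbacks before clearing state
> this.coordinator.onLeavePrepare();   // triggers onPartitionsRevoked, or
>                                      // onPartitionsLost if the generation was
>                                      // reset by a fatal join/heartbeat error
> this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
> this.subscriptions.unsubscribe();    // clear the assignment only afterwards
> {code}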



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
