[ 
https://issues.apache.org/jira/browse/KAFKA-9769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-9769.
----------------------------
    Fix Version/s: 2.7.0
         Assignee: Andrew Choi
       Resolution: Fixed

Merged the PR to trunk.

> ReplicaManager Partition.makeFollower Increases LeaderEpoch when ZooKeeper 
> disconnect occurs
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9769
>             Project: Kafka
>          Issue Type: Bug
>          Components: replication
>            Reporter: Andrew Choi
>            Assignee: Andrew Choi
>            Priority: Minor
>              Labels: kafka, replica, replication
>             Fix For: 2.7.0
>
>
> The ZooKeeper session expired and the broker received the first LeaderAndIsr 
> request at roughly the same time. While the broker was processing that first 
> LeaderAndIsr request, the ZooKeeper session had not yet been re-established.
> Within the makeFollowers method, _partition.getOrCreateReplica_ is called 
> before the fetcher is started. _partition.getOrCreateReplica_ needs to read 
> the topic configuration from ZooKeeper, but the ZooKeeper client throws an 
> exception because the session is invalid, so the fetcher start is skipped.
>  
> The Partition class's getOrCreateReplica method calls AdminZkClient's 
> fetchEntityConfig(..), which throws an exception if the ZooKeeper session is 
> invalid. 
>  
> {code:java}
> val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
> {code}
>  
> When this occurs, the leader epoch should not be incremented while the 
> ZooKeeper session is invalid, because once the second LeaderAndIsr request 
> comes in, the leader epoch could already be the same between the brokers and 
> the request would then fail the leader-epoch check shown below. 
> A few options I can think of for a fix; I think the third route could be feasible:
> 1 - Make the LeaderEpoch update and the fetch update atomic.
> 2 - Wait until all individual partitions have been processed successfully, then 
> start the fetch.
> 3 - Catch the ZooKeeper exception in the caller code block 
> (ReplicaManager.makeFollowers) and simply do not touch the remaining 
> partitions, so that the batch of partitions that succeeded up to that point is 
> still updated and processed (fetch). A sketch follows this list.
> 4 - Make the LeaderAndIsr request never arrive at the broker in case of a 
> ZooKeeper disconnect. That would be safe because it is already possible 
> for some replicas to receive the LeaderAndIsr request later than others. However, 
> in that case, the code needs to make sure the controller will retry.
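>  
> A minimal sketch of option 3, assuming the per-partition loop in 
> ReplicaManager.makeFollowers; the surrounding names (partitionStates, 
> partitionsToMakeFollower, stateChangeLogger) and the exception type are 
> illustrative assumptions, not the patch that was merged:
> {code:java}
> // Sketch only (option 3): stop at the first ZooKeeper failure and start
> // fetchers for the partitions that already succeeded. All identifiers here
> // are assumed from the snippets in this ticket, not from the actual fix.
> import scala.util.control.Breaks.{break, breakable}
>
> breakable {
>   partitionStates.foreach { case (partition, partitionStateInfo) =>
>     try {
>       if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
>         partitionsToMakeFollower += partition
>     } catch {
>       case e: ZooKeeperClientException =>
>         // ZooKeeper session is invalid: leave this partition and the remaining
>         // ones untouched so their leader epochs are not bumped; a later
>         // LeaderAndIsr request can retry them.
>         stateChangeLogger.error(s"Stopping become-follower at $partition: ZooKeeper unavailable", e)
>         break()
>     }
>   }
> }
> // Fetchers are then added only for partitionsToMakeFollower, i.e. the batch
> // of partitions that succeeded before the failure.
> {code}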
>  
> {code:java}
> else if (requestLeaderEpoch > currentLeaderEpoch) {
>   // If the leader epoch is valid record the epoch of the controller that made
>   // the leadership decision. This is useful while updating the isr to maintain
>   // the decision maker controller's epoch in the zookeeper path
>   if (stateInfo.basePartitionState.replicas.contains(localBrokerId))
>     partitionState.put(partition, stateInfo)
>   else
>     ...
>
> def getOrCreateReplica(replicaId: Int, isNew: Boolean = false): Replica = {
>   allReplicasMap.getAndMaybePut(replicaId, {
>     if (isReplicaLocal(replicaId)) {
>       val adminZkClient = new AdminZkClient(zkClient)
>       val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
>       ...
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
