[ 
https://issues.apache.org/jira/browse/KAFKA-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15493523#comment-15493523
 ] 

Jun Rao commented on KAFKA-3964:
--------------------------------

[~krishna97], this looks like the same issue as
https://issues.apache.org/jira/browse/KAFKA-3042. Can you confirm?

> Metadata update requests are sometimes received after LeaderAndIsrRequests
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-3964
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3964
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Maysam Yabandeh
>            Priority: Minor
>
> The broker needs metadata of the leader before being able to process a
> LeaderAndIsrRequest from the controller. For this reason, on broker startup
> the controller first sends the metadata update requests and only AFTER that
> sends the LeaderAndIsrRequests:
> {code}
>  def onBrokerStartup(newBrokers: Seq[Int]) {
>     info("New broker startup callback for %s".format(newBrokers.mkString(",")))
>     val newBrokersSet = newBrokers.toSet
>     // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
>     // broker via this update.
>     // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
>     // common controlled shutdown case, the metadata will reach the new brokers faster
>     sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
>     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
>     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
>     val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
>     replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
> {code}
> However, this protocol is not followed when a node becomes the controller: it 
> sends LeaderAndIsrRequests BEFORE sending the metadata update requests:
> {code}
>   def onControllerFailover() {
> ...
>       replicaStateMachine.startup()
> ...
>       /* send partition leadership info to all live brokers */
>       sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
> {code}
> ReplicaStateMachine::startup
> {code}
>   def startup() {
> ...
>     // move all Online replicas to Online
>     handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
> {code}
> which triggers LeaderAndIsrRequest messages.
> Here are the symptoms that one would observe when this problem manifests:
> # The first set of messages that the broker receives from the controller is 
> LeaderAndIsrRequests
> # The broker fails to become the follower as requested by the controller
> {code}
> 2016-07-12 21:03:53,081 ERROR change.logger: Broker 14 received LeaderAndIsrRequest with correlation id 0 from controller 21 epoch 290 for partition [topicxyz,7] but cannot become follower since the new leader 22 is unavailable.
> {code}
> # The fetcher hence does not start and the partition remains under-replicated.
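
The ordering dependency described in the quoted report can be illustrated with a small toy model. This is only a sketch under stated assumptions: `ToyBroker`, `UpdateMetadata`, `LeaderAndIsr` and `OrderingDemo` are hypothetical names invented for illustration and are not Kafka classes, and the real broker logic is considerably more involved. The model assumes a broker can only become a follower if it already knows the leader is alive from an earlier metadata update.

```scala
// Hypothetical, simplified model of the request ordering; not Kafka code.
sealed trait ControllerRequest
final case class UpdateMetadata(liveBrokers: Set[Int]) extends ControllerRequest
final case class LeaderAndIsr(partition: String, leader: Int) extends ControllerRequest

final class ToyBroker(val id: Int) {
  private var knownLiveBrokers = Set.empty[Int]
  private var fetchingFrom = Map.empty[String, Int]

  // Returns true if the request could be applied.
  def handle(request: ControllerRequest): Boolean = request match {
    case UpdateMetadata(live) =>
      knownLiveBrokers = live
      true
    case LeaderAndIsr(partition, leader) if knownLiveBrokers.contains(leader) =>
      // Leader is known to be alive: start fetching from it.
      fetchingFrom += (partition -> leader)
      true
    case LeaderAndIsr(_, _) =>
      // Leader is unknown: cannot become follower, no fetcher is started.
      false
  }

  def isFollowing(partition: String): Boolean = fetchingFrom.contains(partition)
}

object OrderingDemo {
  // Replays the requests against a fresh broker and reports whether
  // it ends up following the partition.
  def run(requests: Seq[ControllerRequest]): Boolean = {
    val broker = new ToyBroker(14)
    requests.foreach(broker.handle)
    broker.isFollowing("topicxyz-7")
  }

  val metadata = UpdateMetadata(Set(14, 21, 22))
  val leaderAndIsr = LeaderAndIsr("topicxyz-7", leader = 22)

  // Order used in onBrokerStartup: metadata first.
  val startupOrder: Boolean = run(Seq(metadata, leaderAndIsr))
  // Order observed on controller failover: LeaderAndIsr first.
  val failoverOrder: Boolean = run(Seq(leaderAndIsr, metadata))
}
```

In this model `OrderingDemo.startupOrder` is true and `OrderingDemo.failoverOrder` is false: the late metadata update does not retroactively start the fetcher, which mirrors the under-replicated partition described in the report.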



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
