[
https://issues.apache.org/jira/browse/KAFKA-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ismael Juma updated KAFKA-3964:
-------------------------------
Labels: reliability (was: )
> Metadata update requests are sometimes received after LeaderAndIsrRequests
> --------------------------------------------------------------------------
>
> Key: KAFKA-3964
> URL: https://issues.apache.org/jira/browse/KAFKA-3964
> Project: Kafka
> Issue Type: Bug
> Components: controller
> Reporter: Maysam Yabandeh
> Priority: Minor
> Labels: reliability
>
> A broker needs the leader's metadata before it can process a
> LeaderAndIsrRequest from the controller. For this reason, on broker startup
> the controller first sends the metadata update requests and only AFTER that
> sends the LeaderAndIsrRequests:
> {code}
> def onBrokerStartup(newBrokers: Seq[Int]) {
>   info("New broker startup callback for %s".format(newBrokers.mkString(",")))
>   val newBrokersSet = newBrokers.toSet
>   // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
>   // broker via this update.
>   // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
>   // common controlled shutdown case, the metadata will reach the new brokers faster
>   sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
>   // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
>   // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
>   val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
>   replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
> {code}
> However, this protocol is not followed when a node becomes the controller: it
> sends LeaderAndIsrRequests BEFORE sending the metadata update requests:
> {code}
> def onControllerFailover() {
>   ...
>   replicaStateMachine.startup()
>   ...
>   /* send partition leadership info to all live brokers */
>   sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
> {code}
> ReplicaStateMachine::startup
> {code}
> def startup() {
>   ...
>   // move all Online replicas to Online
>   handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
> {code}
> which triggers LeaderAndIsrRequest messages.
> Here are the symptoms that one would observe when this problem manifests:
> # The first set of messages that the broker receives from the controller is
> LeaderAndIsrRequests
> # The broker fails to become the follower as requested by the controller
> {code}
> 2016-07-12 21:03:53,081 ERROR change.logger: Broker 14 received LeaderAndIsrRequest with correlation id 0 from controller 21 epoch 290 for partition [topicxyz,7] but cannot become follower since the new leader 22 is unavailable.
> {code}
> # The fetcher hence does not start and the partition remains under-replicated.
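
The ordering dependency described in the issue can be illustrated with a toy model. This is only a minimal sketch and not Kafka code: ToyBroker, UpdateMetadata, LeaderAndIsr and OrderingSketch are hypothetical names, and the model assumes that a broker rejects any LeaderAndIsr request naming a leader it has not yet seen in a metadata update. The real broker logic is considerably more involved.

```scala
// Toy model only; every name below is hypothetical and not part of Kafka.
sealed trait ControllerRequest
final case class UpdateMetadata(liveBrokers: Set[Int]) extends ControllerRequest
final case class LeaderAndIsr(partition: String, leader: Int) extends ControllerRequest

// A broker that only knows the brokers named in the metadata it has received so far.
final class ToyBroker(val id: Int) {
  private var knownBrokers: Set[Int] = Set.empty
  private var following: Map[String, Int] = Map.empty

  // Returns true if the request was applied, false if it was rejected.
  def handle(request: ControllerRequest): Boolean = request match {
    case UpdateMetadata(live) =>
      knownBrokers = live
      true
    case LeaderAndIsr(partition, leader) if knownBrokers.contains(leader) =>
      following += (partition -> leader) // a fetcher would be started here
      true
    case LeaderAndIsr(_, _) =>
      false // assumed rejection: the named leader is not known yet
  }

  def followedPartitions: Set[String] = following.keySet
}

object OrderingSketch {
  // Replays the requests in order and returns the partitions the broker ends up following.
  def replay(requests: Seq[ControllerRequest]): Set[String] = {
    val broker = new ToyBroker(14)
    requests.foreach(r => broker.handle(r))
    broker.followedPartitions
  }

  val metadata: ControllerRequest = UpdateMetadata(Set(14, 21, 22))
  val leaderAndIsr: ControllerRequest = LeaderAndIsr("topicxyz-7", 22)

  // Order used on broker startup: metadata first.
  val startupOrder: Seq[ControllerRequest] = Seq(metadata, leaderAndIsr)
  // Order described for controller failover: LeaderAndIsr first.
  val failoverOrder: Seq[ControllerRequest] = Seq(leaderAndIsr, metadata)
}
```

In this model, replaying OrderingSketch.startupOrder leaves the broker following the partition, while replaying OrderingSketch.failoverOrder leaves it following nothing, which mirrors the under-replicated symptom listed in the issue.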