[ https://issues.apache.org/jira/browse/KAFKA-3963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismael Juma updated KAFKA-3963: ------------------------------- Fix Version/s: 0.10.1.0 > Missing messages from the controller to brokers > ----------------------------------------------- > > Key: KAFKA-3963 > URL: https://issues.apache.org/jira/browse/KAFKA-3963 > Project: Kafka > Issue Type: Bug > Components: core > Reporter: Maysam Yabandeh > Priority: Minor > Fix For: 0.10.1.0 > > > The controller takes messages from a queue and send it to the designated > broker. If the controller times out on receiving a response from the broker > (30s) it closes the connection and retries again after a backoff period, > however it does not return the message back to the queue. As a result the > retry will start with the next message and the previous message might have > never been received by the broker. > {code} > val QueueItem(apiKey, apiVersion, request, callback) = queue.take() > ... > try { > ... > clientResponse = > networkClient.blockingSendAndReceive(clientRequest)(time) > ... > } > } catch { > case e: Throwable => // if the send was not successful, reconnect > to broker and resend the message > warn(("Controller %d epoch %d fails to send request %s to > broker %s. " + > "Reconnecting to broker.").format(controllerId, > controllerContext.epoch, > request.toString, brokerNode.toString()), e) > networkClient.close(brokerNode.idString) > ... > } > {code} > This could violates the semantics that developers had assumed when writing > controller-broker protocol. For example, the controller code sends metadata > updates BEFORE sending LeaderAndIsrRequest when it communicates with a newly > joined broker for the first time. > {code} > def onBrokerStartup(newBrokers: Seq[Int]) { > info("New broker startup callback for > %s".format(newBrokers.mkString(","))) > val newBrokersSet = newBrokers.toSet > // send update metadata request to all live and shutting down brokers. > Old brokers will get to know of the new > // broker via this update. > // In cases of controlled shutdown leaders will not be elected when a new > broker comes up. So at least in the > // common controlled shutdown case, the metadata will reach the new > brokers faster > > sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) > // the very first thing to do when a new broker comes up is send it the > entire list of partitions that it is > // supposed to host. Based on that the broker starts the high watermark > threads for the input list of partitions > val allReplicasOnNewBrokers = > controllerContext.replicasOnBrokers(newBrokersSet) > replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, > OnlineReplica) > {code} > This is important because without the metadata cached in the broker the > LeaderAndIsrRequests that ask the broker to become a follower would fail > since there is no metadata for leader of the partition. > {code} > metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match { > // Only change partition state when the leader is available > case Some(leaderBroker) => > ... > case None => > // The leader broker should always be present in the metadata > cache. > // If not, we should record the error message and abort the > transition process for this partition > stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest > with correlation id %d from controller" + > " %d epoch %d for partition [%s,%d] but cannot become follower > since the new leader %d is unavailable.") > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)