[
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dmitry Bugaychenko updated KAFKA-2029:
--------------------------------------
Comment: was deleted
(was: The problem is that the state is distributed - not only the controller, but
each broker holds it as well, considering itself either a leader or a follower. When
doing a transition we need to make sure all parties have completed it before
switching to the next partition. Right now the transition is implemented via
asynchronous messages to brokers without waiting for replies - in this case
even a single-threaded controller might easily bring the system into an inconsistent
state (having multiple brokers treating themselves as leaders for the same partition).)
> Improving controlled shutdown for rolling updates
> -------------------------------------------------
>
> Key: KAFKA-2029
> URL: https://issues.apache.org/jira/browse/KAFKA-2029
> Project: Kafka
> Issue Type: Improvement
> Components: controller
> Affects Versions: 0.8.1.1
> Reporter: Dmitry Bugaychenko
> Assignee: Neha Narkhede
> Priority: Critical
> Attachments: KAFKA-2029.patch, KAFKA-2029.patch
>
>
> Controlled shutdown as currently implemented can cause numerous problems:
> deadlocks, local and global data loss, partitions left without a leader, etc. In
> some cases the only way to restore the cluster is to stop it completely using
> kill -9 and start it again.
> Note 1: I'm aware of KAFKA-1305, but the proposed workaround of increasing the
> queue size makes things much worse (see discussion there).
> Note 2: The problems described here can occur in any setup, but they are
> extremely painful in setups with large brokers (36 disks, 1000+ assigned
> partitions per broker in our case).
> Note 3: These improvements are actually workarounds, and it is worth considering
> a global refactoring of the controller (make it single-threaded, or even
> get rid of it in favour of ZK leader elections for partitions).
> The problems and improvements are:
> # Controlled shutdown takes a long time (10+ minutes): the broker sends multiple
> shutdown requests, finally considers the shutdown failed and proceeds to an unclean
> shutdown, while the controller gets stuck for a while (holding a lock waiting for free
> space in the controller-to-broker queue). After the broker starts back up it receives
> a request to become follower and erases its highwatermarks (with a message that the
> "replica does not exist" - the controller hadn't yet sent a request with the replica
> assignment); then, when the controller starts replicas on the broker, it deletes all
> local data (due to the missing highwatermarks). Furthermore, the controller then
> processes the pending shutdown request and stops replicas on the broker, leaving
> it in a non-functional state. A solution might be to increase the time the broker
> waits for the controller's response to the shutdown request, but this timeout is
> taken from controller.socket.timeout.ms, which is global for all broker-controller
> communication, and setting it to 30 minutes is dangerous.
> *Proposed solution: introduce a dedicated config parameter for this timeout
> with a high default* (a minimal sketch follows below).
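> A sketch of what the dedicated parameter could look like (not part of the attached
> patch; the property name controlled.shutdown.socket.timeout.ms and the accessor
> style are assumptions modelled on KafkaConfig/KafkaServer in 0.8.x):
> {code}
> // In KafkaConfig: hypothetical dedicated timeout for the controlled shutdown
> // round-trip, with a high default, so controller.socket.timeout.ms can stay small.
> val controlledShutdownSocketTimeoutMs = props.getIntInRange(
>   "controlled.shutdown.socket.timeout.ms", 10 * 60 * 1000, (1, Int.MaxValue))
>
> // In KafkaServer.controlledShutdown(): open the channel to the controller with the
> // dedicated timeout instead of config.controllerSocketTimeoutMs.
> channel = new BlockingChannel(broker.host, broker.port,
>   BlockingChannel.UseDefaultBufferSize,
>   BlockingChannel.UseDefaultBufferSize,
>   config.controlledShutdownSocketTimeoutMs)
> {code}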
> # If a broker goes down during controlled shutdown and does not come back, the
> controller gets stuck in a deadlock (one thread owns the lock and tries to add a
> message to the dead broker's queue, the send thread is in an infinite loop trying to
> resend the message to the dead broker, and the broker failure handler is waiting
> for the lock). Numerous partitions are left without a leader and the only way
> out is to kill -9 the controller. *Proposed solution: add a timeout when adding a
> message to the broker's queue*. ControllerChannelManager.sendRequest:
> {code}
>   def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
>     brokerLock synchronized {
>       val stateInfoOpt = brokerStateInfo.get(brokerId)
>       stateInfoOpt match {
>         case Some(stateInfo) =>
>           // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
>           // TODO: Move timeout to config
>           if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
>             error("Timed out trying to send message to broker " + brokerId.toString)
>             // Do not throw, as it brings controller into completely non-functional state
>             // "Controller to broker state change requests batch is not empty while creating a new one"
>             //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
>           }
>         case None =>
>           warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
>       }
>     }
>   }
> {code}
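> The 10-second offer timeout above is hard-coded (see the TODO). A minimal sketch of
> pulling it from config instead (the property name controller.message.queue.enqueue.timeout.ms
> is hypothetical and not part of the patch):
> {code}
> // In KafkaConfig: hypothetical bound on how long sendRequest may block on a full queue.
> val controllerMessageQueueEnqueueTimeoutMs = props.getIntInRange(
>   "controller.message.queue.enqueue.timeout.ms", 10000, (1, Int.MaxValue))
>
> // In ControllerChannelManager.sendRequest, replace the hard-coded 10 seconds with:
> if (!stateInfo.messageQueue.offer((request, callback),
>     config.controllerMessageQueueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)) {
>   error("Timed out trying to send message to broker " + brokerId.toString)
> }
> {code}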
> # When the broker that is the controller starts to shut down while auto leader rebalance
> is running, it deadlocks in the end (the shutdown thread owns the lock and waits
> for the rebalance thread to exit, and the rebalance thread waits for the lock). *Proposed
> solution: use a bounded wait in the rebalance thread*. KafkaController.scala:
> {code}
>   // ODKL Patch to prevent deadlocks in shutdown.
>   /**
>    * Execute the given function inside the lock
>    */
>   def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
>     if (isRunning || lock.isHeldByCurrentThread) {
>       // TODO: Configure timeout.
>       if (!lock.tryLock(10, TimeUnit.SECONDS)) {
>         throw new IllegalStateException("Failed to acquire controller lock in 10 seconds.")
>       }
>       try {
>         return fun
>       } finally {
>         lock.unlock()
>       }
>     } else {
>       throw new IllegalStateException("Controller is not running, not allowed to lock.")
>     }
>   }
>
>   private def checkAndTriggerPartitionRebalance(): Unit = {
>     // Use inLockIfRunning here instead of inLock
>   }
> {code}
> # Both OfflinePartitionLeaderSelector and ControlledShutdownLeaderSelector
> prefer the oldest replica in the ISR (the one that joined the ISR first). During a
> rolling update this means moving partitions to the tail, which increases the overall
> amount of movement and eventually significantly overloads the last broker (with 4
> brokers and RF 3 the last one gets 3/4 of the leadership). In case of multiple
> failures this logic can cause a significant imbalance in the leadership. *Proposed
> solution: move leadership to the preferred replica if possible, otherwise to the
> youngest replica (in controlled shutdown) or the second preferred replica (in
> offline partition)*:
> {code}
> class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
>   ...
>   // ODKL Patch: Try to select a replica from the ISR following the assignment order rather than
>   // the ISR join order. The preferred replica is the first one, thus if possible it will be chosen,
>   // but most probably it is the dead one, so we fall back to the second preferred replica. Here we
>   // do not care about overloading the second preferred replica as we do not expect rolling crashes.
>   val newLeader = liveBrokersInIsr.sortBy(x => assignedReplicas.indexOf(x)).head
>   ...
> }
>
> class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
>   ...
>   // ODKL Patch: Try to select a replica from the ISR not depending on the ISR join order. If the
>   // preferred replica is in the ISR, choose it; otherwise choose the last replica from the ISR -
>   // it is expected to be the youngest (most probably it has already survived the rolling update).
>   val newLeaderOpt = if (newIsr.contains(assignedReplicas.head)) assignedReplicas.headOption else newIsr.lastOption
>   ...
> }
> {code}
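> To illustrate the selection on a toy example (a standalone sketch, not part of the patch;
> the broker ids, assignment and ISR contents are made up):
> {code}
> object LeaderSelectionSketch extends App {
>   // Hypothetical partition: assigned replicas in preference order (preferred replica first).
>   val assignedReplicas = Seq(1, 2, 3)
>
>   // Offline partition case: broker 1 is dead; brokers joined the ISR in the order 3, then 2.
>   val liveBrokersInIsr = Seq(3, 2)
>   // The old behaviour takes the ISR head (3, the oldest member); the patch follows assignment order.
>   val offlineNewLeader = liveBrokersInIsr.sortBy(x => assignedReplicas.indexOf(x)).head
>   println("offline partition leader: " + offlineNewLeader) // 2 - the second preferred replica
>
>   // Controlled shutdown case: broker 3 is shutting down, the remaining ISR is 1, 2.
>   val newIsr = Seq(1, 2)
>   val shutdownNewLeaderOpt =
>     if (newIsr.contains(assignedReplicas.head)) assignedReplicas.headOption
>     else newIsr.lastOption
>   println("controlled shutdown leader: " + shutdownNewLeaderOpt) // Some(1) - the preferred replica
> }
> {code}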
> # Auto leader rebalance started simultaneously with a controlled shutdown
> competes with it for space in the queue and can slow down the process. If the
> queue size is large it can also cause significant data loss (for a few
> minutes there might be multiple brokers considering themselves the leader and
> accepting produce requests). *Proposed solution: add throttling to the auto
> rebalance*:
> {code}
>   private def checkAndTriggerPartitionRebalance(): Unit = {
>     ...
>     if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
>       info("Balancing broker " + leaderBroker + " with imbalance rate " + imbalanceRatio)
>       topicsNotInPreferredReplica.foreach {
>         case (topicPartition, replicas) => {
>           inLockIfRunning(controllerContext.controllerLock) {
>             // do this check only if the broker is live and there are no partitions being reassigned currently
>             // and preferred replica election is not in progress
>             if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
>                 controllerContext.partitionsBeingReassigned.size == 0 &&
>                 controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
>                 !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
>                 !deleteTopicManager.isTopicDeletionInProgress(topicPartition.topic) &&
>                 controllerContext.allTopics.contains(topicPartition.topic)) {
>               onPreferredReplicaElection(Set(topicPartition), true)
>             }
>           }
>           // ODKL patch: prevent too frequent preferred replica elections.
>           // TODO: Make configurable/use true throttling
>           Utils.swallow(Thread.sleep(2000))
>         }
>       }
>       info("Balancing broker " + leaderBroker + " done")
>     }
>     ...
>   }
> {code}
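> The fixed 2-second sleep only partially addresses the TODO above. A minimal sketch of
> making the pause configurable (the property name auto.leader.rebalance.throttle.ms is
> hypothetical and not part of the patch):
> {code}
> // In KafkaConfig: hypothetical pause between successive preferred replica elections,
> // defaulting to the 2 seconds hard-coded above.
> val autoLeaderRebalanceThrottleMs = props.getIntInRange(
>   "auto.leader.rebalance.throttle.ms", 2000, (0, Int.MaxValue))
>
> // Inside the foreach loop, replace the hard-coded sleep with:
> Utils.swallow(Thread.sleep(config.autoLeaderRebalanceThrottleMs))
> {code}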
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)