[ 
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14371844#comment-14371844
 ] 

Dmitry Bugaychenko commented on KAFKA-2029:
-------------------------------------------

You're right about the timeout - it was a bad idea. There are cases when a broker fails 
to process its queue for a while. Basically, what we want is to:
# Not lock the controller while waiting for a broker response
# Minimize racing during normal procedures (controlled shutdown/preferred 
replica election)

Increasing the queue size helps with 1, but ruins 2. So, in order to avoid racing in 
normal procedures with a large queue size, we ended up adding request 
completion tracking to partition movement - the controller starts moving the next 
partition only after it is done with the current one (the wait is done outside 
the controller lock). It looks like this:

# ControllerChannelManager.scala
{code}
class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging {
...
  // ODKL Patch: Execute code while tracking the completion of the broker requests being sent.
  private val tracker = new ThreadLocal[RequestTracker]
  def tracked[T]()(fun: => T): T = {
    if (tracker.get() != null) {
      throw new IllegalStateException("Tracker already initialized.")
    }
    val registeredTracker: RequestTracker = new RequestTracker()
    tracker.set(registeredTracker)
    try {
      return fun
    } finally {
      tracker.remove()
      registeredTracker.await()
    }
  }

  def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
    brokerLock synchronized {
      val stateInfoOpt = brokerStateInfo.get(brokerId)
      stateInfoOpt match {
        case Some(stateInfo) =>
          // ODKL Patch: pass tracker to acknowledge request
          val requestsTracker: RequestTracker = tracker.get()
          stateInfo.messageQueue.put((request, callback, requestsTracker))
          if(requestsTracker != null) {
            requestsTracker.addRequest()
          }
        case None =>
          warn("Not sending request %s to broker %d, since it is 
offline.".format(request, brokerId))
      }
    }
  }
...
  private def removeExistingBroker(brokerId: Int) {
    try {
      // ODKL Patch: When removing broker, make sure to notify trackers that the requests won't ever complete.
      val info: ControllerBrokerStateInfo = brokerStateInfo.remove(brokerId).get
      info.channel.disconnect()
      info.requestSendThread.shutdown()
      val remaining = new util.ArrayList[(RequestOrResponse, (RequestOrResponse) => Unit, RequestTracker)](info.messageQueue.remainingCapacity())
      info.messageQueue.drainTo(remaining)
      JavaConversions.collectionAsScalaIterable(remaining).foreach(x => if (x._3 != null) x._3.releaseRequest())
    } catch {
      case e: Throwable => error("Error while removing broker by the controller", e)
    }
  }
...
}

class RequestSendThread(val controllerId: Int,
                        val controllerContext: ControllerContext,
                        val toBroker: Broker,
                        val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit, RequestTracker)],
                        val channel: BlockingChannel)
  extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBroker.id)) {
...
  override def doWork(): Unit = {
    val queueItem = queue.take()
    val request = queueItem._1
    val callback = queueItem._2
    val tracker = queueItem._3
    var receive: Receive = null
    try {
...
    } finally {
      if (tracker != null) {
        tracker.releaseRequest()
      }
    }
  }
...
}

// ODKL Patch: Used to track completion of the request.
class RequestTracker() {
  val MAX_REQUESTS = 100000
  val requests : Semaphore = new Semaphore(MAX_REQUESTS)

  def addRequest(): Unit = {
    if(!requests.tryAcquire()) {
      throw new IllegalStateException("Maximum amount of tracked requests " + 
MAX_REQUESTS + " exceeded")
    }
  }

  def releaseRequest(): Unit = {
    requests.release()
  }

  def await(): Unit = {
    requests.acquire(MAX_REQUESTS)
  }
}
{code}
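
The semaphore bookkeeping above is the core of the patch: each request put on a broker's queue takes one permit, the send thread (or removeExistingBroker) gives it back, and await() re-acquires all MAX_REQUESTS permits, so it only returns once every outstanding request has been released. A small standalone usage sketch of the RequestTracker class above, with a plain thread pool standing in for the send thread (the demo object and the pool are illustrative only, not part of the patch):
{code}
import java.util.concurrent.{Executors, TimeUnit}

object RequestTrackerDemo extends App {
  val tracker = new RequestTracker()                // the class from the patch above
  val sendThreads = Executors.newFixedThreadPool(4) // stand-in for RequestSendThread

  // "Enqueue" ten requests; each one is acknowledged after a simulated broker round trip.
  (1 to 10).foreach { _ =>
    tracker.addRequest()
    sendThreads.submit(new Runnable {
      override def run(): Unit = { Thread.sleep(50); tracker.releaseRequest() }
    })
  }

  tracker.await()                                   // blocks until all ten releaseRequest() calls happened
  println("all tracked requests completed")
  sendThreads.shutdown()
  sendThreads.awaitTermination(5, TimeUnit.SECONDS)
}
{code}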
# KafkaController.scala
{code}
...
  private def checkAndTriggerPartitionRebalance(): Unit = {
    if (isActive()) {
...
          if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
            info("Balancing broker " + leaderBroker + " with imbalance rate " + imbalanceRatio)
            topicsNotInPreferredReplica.foreach {
              case (topicPartition, replicas) => {
                // ODKL Patch: Make sure leadership moved before switching to the next one.
                controllerContext.controllerChannelManager.tracked() {
                  inLockIfRunning(controllerContext.controllerLock) {
                    // do this check only if the broker is live and there are no partitions being reassigned currently
                    // and preferred replica election is not in progress
                    if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
                      controllerContext.partitionsBeingReassigned.size == 0 &&
                      controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
                      !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
                      !deleteTopicManager.isTopicDeletionInProgress(topicPartition.topic) &&
                      controllerContext.allTopics.contains(topicPartition.topic)) {
                      onPreferredReplicaElection(Set(topicPartition), true)
                    }
                  }
                }
              }
            }
            info("Balancing broker " + leaderBroker + " done")
          }
        }
      }
    }
  }
...
  def shutdownBroker(id: Int) : Set[TopicAndPartition] = {
...
      allPartitionsAndReplicationFactorOnBroker.foreach {
        case(topicAndPartition, replicationFactor) =>
          // ODKL Patch: Make sure leadership moved before switching to the next one.
          controllerContext.controllerChannelManager.tracked() {
            // Move leadership serially to relinquish lock.
            inLock(controllerContext.controllerLock) {
              controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
                if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
                  // If the broker leads the topic partition, transition the leader and update isr. Updates zk and
                  // notifies all affected brokers
                  partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
                    controlledShutdownPartitionLeaderSelector)
                }
                else {
                  // Stop the replica first. The state change below initiates ZK changes which should take some time
                  // before which the stop replica request should be completed (in most cases)
                  brokerRequestBatch.newBatch()
                  brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
                    topicAndPartition.partition, deletePartition = false)
                  brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)

                  // If the broker is a follower, updates the isr in ZK and notifies the current leader
                  replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
                    topicAndPartition.partition, id)), OfflineReplica)
                }
              }
            }
          }
      }

...
    }
  }
{code}
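
Both call sites above follow the same shape: for each partition, enter a tracked scope, hold the controller lock only while enqueuing the state-change requests, release the lock, and only then block until the broker has acknowledged them before moving on to the next partition. A self-contained sketch of just that flow, with a latch standing in for the tracker and a single-threaded executor standing in for the send thread (all names here are illustrative, not the actual controller types):
{code}
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.locks.ReentrantLock

object SerialLeadershipMoveSketch extends App {
  val controllerLock = new ReentrantLock()
  val sendThread = Executors.newSingleThreadExecutor()

  def movePartition(partition: String): Unit = {
    val acked = new CountDownLatch(1) // plays the role of the per-scope RequestTracker
    controllerLock.lock()
    try {
      // Under the lock: only enqueue the request, do not wait here.
      sendThread.submit(new Runnable {
        override def run(): Unit = {
          Thread.sleep(50)    // simulated broker round trip
          acked.countDown()   // releaseRequest() in the real patch
        }
      })
    } finally {
      controllerLock.unlock() // release before waiting, so other controller work can proceed
    }
    acked.await()             // wait outside the lock, like tracked() { ... } does
    println("leadership moved for " + partition)
  }

  Seq("topicA-0", "topicA-1", "topicB-0").foreach(movePartition)
  sendThread.shutdown()
  sendThread.awaitTermination(5, TimeUnit.SECONDS)
}
{code}
Waiting inside the lock instead would reintroduce the original problem: a slow or dead broker would stall every other controller operation that needs the lock, including the broker failure handler.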

Furthermore, we found one more race condition - during initialization a broker 
might lose its highwatermark checkpoints. This happens because checkpointing is 
scheduled without an initial delay, so it starts immediately after the broker is made 
leader or follower for some partition (ReplicaManager.becomeLeaderOrFollower). 
When the checkpointing is initiated (shortly after), it writes highwatermarks only 
for the partitions the broker is aware of. Due to the race there might be partitions 
still pending processing (e.g. the new broker receives a stale become-follower 
request from a hanging controlled shutdown). After that, highwatermarks are lost for 
some partitions and the broker truncates them to zero. A simple solution might be to 
introduce an initial delay, but that is a trade-off between the risk of losing 
highwatermarks due to an early failure and losing them due to a lagging controller. 
Instead, we keep not only known but also unknown partitions in the checkpoint file 
for a while. ReplicaManager.scala:
{code}
  def checkpointHighWatermarks() {
    val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
    for((dir, reps) <- replicasByDir) {
      var hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap
      try {
        // ODKL Patch: If some partitions are not yet initialized, restore them from the old checkpoints.
        // This prevents broker from erasing checkpoints in case of racing during broker initialization
        // TODO: Make this configurable.
        if (java.lang.management.ManagementFactory.getRuntimeMXBean.getUptime < TimeUnit.HOURS.toMillis(1)) {
          highWatermarkCheckpoints(dir).read().foreach(entry => if (!hwms.contains(entry._1)) hwms += entry)
        }
        }
        highWatermarkCheckpoints(dir).write(hwms)
      } catch {
        case e: IOException =>
          fatal("Error writing to highwatermark file: ", e)
          Runtime.getRuntime().halt(1)
      }
    }
  }
{code}
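
The effect of the patch is that, during the first hour of uptime, partitions the broker has not (re)learned yet keep the highwatermark from the previous checkpoint file instead of silently disappearing from it. A toy illustration of just the merge step, with plain maps standing in for the checkpoint file and the in-memory replicas (names and values are made up):
{code}
object HighwatermarkMergeSketch extends App {
  // Highwatermarks for the partitions the broker currently knows about.
  var hwms = Map("topicA-0" -> 42L, "topicA-1" -> 17L)
  // Contents of the old checkpoint file on disk.
  val oldCheckpoint = Map("topicA-0" -> 40L, "topicB-0" -> 99L)

  // Same merge as in checkpointHighWatermarks: keep old entries for partitions
  // not yet known, never overwrite the ones the broker already knows.
  oldCheckpoint.foreach(entry => if (!hwms.contains(entry._1)) hwms += entry)

  // topicB-0 survives with its old value; topicA-0 keeps the fresh one.
  println(hwms) // Map(topicA-0 -> 42, topicA-1 -> 17, topicB-0 -> 99)
}
{code}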


> Improving controlled shutdown for rolling updates
> -------------------------------------------------
>
>                 Key: KAFKA-2029
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2029
>             Project: Kafka
>          Issue Type: Improvement
>          Components: controller
>    Affects Versions: 0.8.1.1
>            Reporter: Dmitry Bugaychenko
>            Assignee: Neha Narkhede
>            Priority: Critical
>
> Controlled shutdown as implemented currently can cause numerous problems: 
> deadlocks, local and global data loss, partitions without a leader, etc. In 
> some cases the only way to restore the cluster is to stop it completely using 
> kill -9 and start it again.
> Note 1: I'm aware of KAFKA-1305, but the proposed workaround of increasing the 
> queue size makes things much worse (see the discussion there).
> Note 2: The problems described here can occur in any setup, but they are 
> extremely painful in a setup with large brokers (36 disks, 1000+ assigned 
> partitions per broker in our case).
> Note 3: These improvements are actually workarounds, and it is worth 
> considering a global refactoring of the controller (make it single-threaded, or 
> even get rid of it in favour of ZK leader elections for partitions).
> The problems and improvements are:
> # Controlled shutdown takes a long time (10+ minutes): the broker sends multiple 
> shutdown requests, finally considers the shutdown failed and proceeds to an unclean 
> shutdown, while the controller gets stuck for a while (holding a lock while waiting 
> for free space in the controller-to-broker queue). After the broker starts back up 
> it receives become-follower requests and erases highwatermarks (with a message that 
> "replica does not exists" - the controller hadn't yet sent a request with the 
> replica assignment); then, when the controller starts replicas on the broker, it 
> deletes all local data (due to the missing highwatermarks). Furthermore, the 
> controller starts processing the pending shutdown request and stops replicas on the 
> broker, leaving it in a non-functional state. A solution might be to increase the 
> time the broker waits for the controller's response to the shutdown request, but 
> this timeout is taken from controller.socket.timeout.ms, which is global for all 
> broker-controller communication, and setting it to 30 minutes is dangerous. 
> *Proposed solution: introduce a dedicated config parameter for this timeout 
> with a high default*.
> # If a broker goes down during controlled shutdown and does not come back, the 
> controller gets stuck in a deadlock (one thread owns the lock and tries to add a 
> message to the dead broker's queue, the send thread is in an infinite loop trying 
> to retry a message to the dead broker, and the broker failure handler is waiting 
> for the lock). There are numerous partitions without a leader and the only way 
> out is to kill -9 the controller. *Proposed solution: add a timeout for adding 
> messages to the broker's queue*. ControllerChannelManager.sendRequest:
> {code}
>   def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
>     brokerLock synchronized {
>       val stateInfoOpt = brokerStateInfo.get(brokerId)
>       stateInfoOpt match {
>         case Some(stateInfo) =>
>           // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
>           // TODO: Move timeout to config
>           if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
>             error("Timed out trying to send message to broker " + brokerId.toString)
>             // Do not throw, as it brings controller into completely non-functional state
>             // "Controller to broker state change requests batch is not empty while creating a new one"
>             //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
>           }
>         case None =>
>           warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
>       }
>     }
>   }
> {code}
> # When the broker that is the controller starts shutting down while auto leader 
> rebalance is running, it deadlocks in the end (the shutdown thread owns the lock 
> and waits for the rebalance thread to exit, and the rebalance thread waits for the 
> lock). *Proposed solution: use a bounded wait in the rebalance thread*. KafkaController.scala:
> {code}
>   // ODKL Patch to prevent deadlocks in shutdown.
>   /**
>    * Execute the given function inside the lock
>    */
>   def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
>     if (isRunning || lock.isHeldByCurrentThread) {
>       // TODO: Configure timeout.
>       if (!lock.tryLock(10, TimeUnit.SECONDS)) {
>         throw new IllegalStateException("Failed to acquire controller lock in 
> 10 seconds.");
>       }
>       try {
>         return fun
>       } finally {
>         lock.unlock()
>       }
>     } else {
>       throw new IllegalStateException("Controller is not running, not allowed 
> to lock.")
>     }
>   }
>   private def checkAndTriggerPartitionRebalance(): Unit = {
>     // Use inLockIfRunning here instead of inLock
>   }
> {code}
> # Both OfflinePartitionLeaderSelector and ControlledShutdownLeaderSelector 
> prefer the oldest replica in the ISR (the one that joined the ISR first). In case 
> of a rolling update this means moving partitions to the tail, which increases the 
> overall amount of movement and finally significantly overloads the last broker 
> (with 4 brokers and RF 3 the last one gets 3/4 of the leadership). In case of 
> multiple failures this logic can cause a significant imbalance in the leadership. 
> *Proposed solution: move leadership to the preferred replica if possible, otherwise 
> to the youngest replica (in controlled shutdown) or the second preferred replica 
> (in offline partition)*:
> {code}
> class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
> ...
>             // ODKL Patch: Trying to select a replica from the ISR not depending on ISR join order, but following the
>             // assignment order. The preferred replica is the first one, thus if possible it'll be chosen, but most
>             // probably it is the dead one, thus we fall back to the second preferred replica. Here we do not care about
>             // overloading the second preferred replica as we do not expect rolling crashes.
>             val newLeader = liveBrokersInIsr.sortBy(x => assignedReplicas.indexOf(x)).head
> ...
> }
> class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
>         extends PartitionLeaderSelector
>         with Logging {
> ...
>     // ODKL Patch: Trying to select a replica from the ISR not depending on ISR join order. If the preferred replica is
>     // in the ISR, choose it; otherwise choose the last replica from the ISR - it is expected to be the youngest (and has
>     // most probably already survived the rolling update)
>     val newLeaderOpt = if (newIsr.contains(assignedReplicas.head)) assignedReplicas.headOption else newIsr.lastOption
> ...
> }
> {code}
> # An auto leader rebalance started simultaneously with a controlled shutdown 
> competes with it for space in the queue and can slow down the process. If the 
> queue size is large, it could also cause significant data loss (for a few 
> minutes there might be multiple brokers considering themselves the leader and 
> accepting produce requests). *Proposed solution: add throttling to the auto 
> rebalance*:
> {code}
> private def checkAndTriggerPartitionRebalance(): Unit = {
> ...
>           if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
>             info("Balancing broker " + leaderBroker + " with imbalance rate " + imbalanceRatio)
>             topicsNotInPreferredReplica.foreach {
>               case (topicPartition, replicas) => {
>                 inLockIfRunning(controllerContext.controllerLock) {
>                   // do this check only if the broker is live and there are no partitions being reassigned currently
>                   // and preferred replica election is not in progress
>                   if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
>                     controllerContext.partitionsBeingReassigned.size == 0 &&
>                     controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
>                     !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
>                     !deleteTopicManager.isTopicDeletionInProgress(topicPartition.topic) &&
>                     controllerContext.allTopics.contains(topicPartition.topic)) {
>                     onPreferredReplicaElection(Set(topicPartition), true)
>                   }
>                 }
>                 // ODKL patch: prevent too fast preferred replica elections.
>                 // TODO: Make configurable/use true throttling
>                 Utils.swallow(Thread.sleep(2000))
>               }
>             }
>             info("Balancing broker " + leaderBroker + " done")
>           }
> ...
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
