Use the ZooKeeper preferred replica election path to trigger automatic leader rebalance
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/425af9b4 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/425af9b4 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/425af9b4 Branch: refs/heads/trunk Commit: 425af9b4a7afeb21191c33ba6bc3f20623a3f0b3 Parents: 55d77c6 Author: Sriram Subramanian <[email protected]> Authored: Fri Dec 20 11:11:20 2013 -0800 Committer: Sriram Subramanian <[email protected]> Committed: Fri Dec 20 11:11:20 2013 -0800 ---------------------------------------------------------------------- .../kafka/controller/KafkaController.scala | 48 ++++++++++++-------- 1 file changed, 29 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/425af9b4/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 522e6c7..74e2ea4 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -21,7 +21,7 @@ import collection.immutable.Set import com.yammer.metrics.core.Gauge import java.lang.{IllegalStateException, Object} import java.util.concurrent.TimeUnit -import kafka.admin.PreferredReplicaLeaderElectionCommand +import kafka.admin.{AdminOperationException, PreferredReplicaLeaderElectionCommand} import kafka.api._ import kafka.cluster.Broker import kafka.common._ @@ -945,29 +945,39 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg var imbalanceRatio: Double = 0 var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null controllerContext.controllerLock synchronized { - if (controllerContext.liveBrokerIds.contains(leaderBroker) && - 
controllerContext.partitionsBeingReassigned.size == 0) { - // do this check only if the broker is live and there are no partitions being reassigned currently - topicsNotInPreferredReplica = - topicAndPartitionsForBroker.filter { - case(topicPartition, replicas) => { - controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker - } + topicsNotInPreferredReplica = + topicAndPartitionsForBroker.filter { + case(topicPartition, replicas) => { + controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker } - debug("topics not in preferred replica " + topicsNotInPreferredReplica) - val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size - val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size - imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker - trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio)) - } + } + debug("topics not in preferred replica " + topicsNotInPreferredReplica) + val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size + val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size + imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker + trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio)) } // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions // that need to be on this broker if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) { - topicsNotInPreferredReplica.foreach { - case(topicPartition, replicas) => { - controllerContext.controllerLock synchronized { - onPreferredReplicaElection(Set(topicPartition), false) + controllerContext.controllerLock synchronized { + // do this check only if the broker is live and there are no partitions being reassigned currently + // and preferred replica election is not in 
progress + if (controllerContext.liveBrokerIds.contains(leaderBroker) && + controllerContext.partitionsBeingReassigned.size == 0 && + controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) { + val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath + val partitionsList = topicsNotInPreferredReplica.keys.map(e => Map("topic" -> e.topic, "partition" -> e.partition)) + val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList)) + try { + ZkUtils.createPersistentPath(zkClient, zkPath, jsonData) + info("Created preferred replica election path with %s".format(jsonData)) + } catch { + case e2: Throwable => + val partitionsUndergoingPreferredReplicaElection = + PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1) + error("Preferred replica leader election currently in progress for " + + "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection)); } } }
