use zk for auto rebalance

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/425af9b4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/425af9b4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/425af9b4

Branch: refs/heads/trunk
Commit: 425af9b4a7afeb21191c33ba6bc3f20623a3f0b3
Parents: 55d77c6
Author: Sriram Subramanian <[email protected]>
Authored: Fri Dec 20 11:11:20 2013 -0800
Committer: Sriram Subramanian <[email protected]>
Committed: Fri Dec 20 11:11:20 2013 -0800

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 48 ++++++++++++--------
 1 file changed, 29 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/425af9b4/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 522e6c7..74e2ea4 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -21,7 +21,7 @@ import collection.immutable.Set
 import com.yammer.metrics.core.Gauge
 import java.lang.{IllegalStateException, Object}
 import java.util.concurrent.TimeUnit
-import kafka.admin.PreferredReplicaLeaderElectionCommand
+import kafka.admin.{AdminOperationException, 
PreferredReplicaLeaderElectionCommand}
 import kafka.api._
 import kafka.cluster.Broker
 import kafka.common._
@@ -945,29 +945,39 @@ class KafkaController(val config : KafkaConfig, zkClient: 
ZkClient) extends Logg
           var imbalanceRatio: Double = 0
           var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = 
null
           controllerContext.controllerLock synchronized {
-            if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
-                controllerContext.partitionsBeingReassigned.size == 0) {
-              // do this check only if the broker is live and there are no 
partitions being reassigned currently
-              topicsNotInPreferredReplica =
-                topicAndPartitionsForBroker.filter {
-                  case(topicPartition, replicas) => {
-                    
controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader 
!= leaderBroker
-                  }
+            topicsNotInPreferredReplica =
+              topicAndPartitionsForBroker.filter {
+                case(topicPartition, replicas) => {
+                  
controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader 
!= leaderBroker
                 }
-              debug("topics not in preferred replica " + 
topicsNotInPreferredReplica)
-              val totalTopicPartitionsForBroker = 
topicAndPartitionsForBroker.size
-              val totalTopicPartitionsNotLedByBroker = 
topicsNotInPreferredReplica.size
-              imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / 
totalTopicPartitionsForBroker
-              trace("leader imbalance ratio for broker %d is 
%f".format(leaderBroker, imbalanceRatio))
-            }
+              }
+            debug("topics not in preferred replica " + 
topicsNotInPreferredReplica)
+            val totalTopicPartitionsForBroker = 
topicAndPartitionsForBroker.size
+            val totalTopicPartitionsNotLedByBroker = 
topicsNotInPreferredReplica.size
+            imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / 
totalTopicPartitionsForBroker
+            trace("leader imbalance ratio for broker %d is 
%f".format(leaderBroker, imbalanceRatio))
           }
           // check ratio and if greater than desired ratio, trigger a 
rebalance for the topic partitions
           // that need to be on this broker
           if (imbalanceRatio > 
(config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
-            topicsNotInPreferredReplica.foreach {
-              case(topicPartition, replicas) => {
-                controllerContext.controllerLock synchronized {
-                  onPreferredReplicaElection(Set(topicPartition), false)
+            controllerContext.controllerLock synchronized {
+              // do this check only if the broker is live and there are no 
partitions being reassigned currently
+              // and preferred replica election is not in progress
+              if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+                  controllerContext.partitionsBeingReassigned.size == 0 &&
+                  
controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) {
+                val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
+                val partitionsList = topicsNotInPreferredReplica.keys.map(e => 
Map("topic" -> e.topic, "partition" -> e.partition))
+                val jsonData = Json.encode(Map("version" -> 1, "partitions" -> 
partitionsList))
+                try {
+                  ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
+                  info("Created preferred replica election path with 
%s".format(jsonData))
+                } catch {
+                  case e2: Throwable =>
+                    val partitionsUndergoingPreferredReplicaElection =
+                      
PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient,
 zkPath)._1)
+                    error("Preferred replica leader election currently in 
progress for " +
+                          "%s. Aborting 
operation".format(partitionsUndergoingPreferredReplicaElection));
                 }
               }
             }

Reply via email to