KAFKA-828 Preferred Replica Election does not delete the admin path on controller failover; reviewed by Neha Narkhede and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2fe3f9fe Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2fe3f9fe Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2fe3f9fe Branch: refs/heads/trunk Commit: 2fe3f9fef6e641ed64aca654b607c4d398e11d25 Parents: f570cce Author: Swapnil Ghike <[email protected]> Authored: Thu Mar 28 08:50:38 2013 -0700 Committer: Neha Narkhede <[email protected]> Committed: Thu Mar 28 08:51:00 2013 -0700 ---------------------------------------------------------------------- .../PreferredReplicaLeaderElectionCommand.scala | 7 ++--- .../kafka/controller/KafkaController.scala | 31 ++++++++++---------- .../controller/PartitionLeaderSelector.scala | 8 ++--- .../controller/PartitionStateMachine.scala | 3 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 17 +---------- .../test/scala/unit/kafka/admin/AdminTest.scala | 2 +- 6 files changed, 27 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2fe3f9fe/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index 7405c5a..d5de5f3 100644 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -54,7 +54,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { if (!options.has(jsonFileOpt)) ZkUtils.getAllPartitions(zkClient) else - parsePreferredReplicaJsonData(Utils.readFileAsString(options.valueOf(jsonFileOpt))) + parsePreferredReplicaElectionData(Utils.readFileAsString(options.valueOf(jsonFileOpt))) val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkClient, partitionsForPreferredReplicaElection) preferredReplicaElectionCommand.moveLeaderToPreferredReplica() @@ -69,7 +69,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { } } - def parsePreferredReplicaJsonData(jsonString: String): immutable.Set[TopicAndPartition] = { + def parsePreferredReplicaElectionData(jsonString: String): immutable.Set[TopicAndPartition] = { Json.parseFull(jsonString) match { case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match { @@ -101,8 +101,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { info("Created preferred replica election path with %s".format(jsonData)) } catch { case nee: ZkNodeExistsException => - val partitionsUndergoingPreferredReplicaElection = - PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(ZkUtils.readData(zkClient, zkPath)._1) + val partitionsUndergoingPreferredReplicaElection = parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1) throw new AdministrationException("Preferred replica leader election currently in progress for " + "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection)) case e2 => throw new AdministrationException(e2.toString) http://git-wip-us.apache.org/repos/asf/kafka/blob/2fe3f9fe/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 229239c..9d32901 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -386,8 +386,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) { info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(","))) - controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions - partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector) + try { + controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions + partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector) + } catch { + case e => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e) + } finally { + removePartitionsFromPreferredReplicaElection(partitions) + } } /** @@ -910,20 +916,15 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD */ @throws(classOf[Exception]) def handleDataChange(dataPath: String, data: Object) { - debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election" + - " %s".format(dataPath, data.toString)) - val partitionsForPreferredReplicaElection = - PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(data.toString) - val newPartitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection + debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s" + .format(dataPath, data.toString)) + val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString) + controllerContext.controllerLock synchronized { - try { - controller.onPreferredReplicaElection(newPartitions) - } catch { - case e => error("Error completing preferred replica leader election for partitions %s" - .format(partitionsForPreferredReplicaElection.mkString(",")), e) - } finally { - controller.removePartitionsFromPreferredReplicaElection(newPartitions) - } + info("These partitions are already undergoing preferred replica election: %s" + .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(","))) + val newPartitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection + controller.onPreferredReplicaElection(newPartitions) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/2fe3f9fe/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index d295781..7a06c24 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -18,7 +18,7 @@ package kafka.controller import kafka.api.LeaderAndIsr import kafka.utils.Logging -import kafka.common.{TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException} +import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException} trait PartitionLeaderSelector { @@ -125,9 +125,9 @@ with Logging { val preferredReplica = assignedReplicas.head // check if preferred replica is the current leader val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader - if(currentLeader == preferredReplica) { - throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition %s" - .format(preferredReplica, topicAndPartition)) + if (currentLeader == preferredReplica) { + throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s" + .format(preferredReplica, topicAndPartition)) } else { info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) + " Trigerring preferred replica leader election") http://git-wip-us.apache.org/repos/asf/kafka/blob/2fe3f9fe/core/src/main/scala/kafka/controller/PartitionStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 654fa2e..da47ac8 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -20,7 +20,7 @@ import collection._ import collection.JavaConversions._ import java.util.concurrent.atomic.AtomicBoolean import kafka.api.LeaderAndIsr -import kafka.common.{TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException} +import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException} import kafka.utils.{Logging, ZkUtils} import org.I0Itec.zkclient.IZkChildListener import org.I0Itec.zkclient.exception.ZkNodeExistsException @@ -315,6 +315,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition, newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size) } catch { + case lenne: LeaderElectionNotNeededException => // swallow case nroe: NoReplicaOnlineException => throw nroe case sce => val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage) http://git-wip-us.apache.org/repos/asf/kafka/blob/2fe3f9fe/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 9a0e250..ce1904b 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -633,22 +633,7 @@ object ZkUtils extends Logging { // read the partitions and their new replica list val jsonPartitionListOpt = readDataMaybeNull(zkClient, PreferredReplicaLeaderElectionPath)._1 jsonPartitionListOpt match { - case Some(jsonPartitionList) => parsePreferredReplicaElectionData(jsonPartitionList) - case None => Set.empty[TopicAndPartition] - } - } - - def parsePreferredReplicaElectionData(jsonData: String):Set[TopicAndPartition] = { - Json.parseFull(jsonData) match { - case Some(m) => - val topicAndPartitions = m.asInstanceOf[Array[Map[String, String]]] - val partitions = topicAndPartitions.map { p => - val topicPartitionMap = p - val topic = topicPartitionMap.get("topic").get - val partition = topicPartitionMap.get("partition").get.toInt - TopicAndPartition(topic, partition) - } - Set.empty[TopicAndPartition] ++ partitions + case Some(jsonPartitionList) => PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(jsonPartitionList) case None => Set.empty[TopicAndPartition] } } http://git-wip-us.apache.org/repos/asf/kafka/blob/2fe3f9fe/core/src/test/scala/unit/kafka/admin/AdminTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 6c80c4c..b0a0e09 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -332,7 +332,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val preferredReplicaElectionZkData = ZkUtils.readData(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)._1 val partitionsUndergoingPreferredReplicaElection = - PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(preferredReplicaElectionZkData) + PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(preferredReplicaElectionZkData) assertEquals("Preferred replica election ser-de failed", partitionsForPreferredReplicaElection, partitionsUndergoingPreferredReplicaElection) }
