Updated Branches: refs/heads/0.8 e4f287db6 -> bbb161aa2
KAFKA-923 Improve controller failover latency. Remove unnecessary zookeeper reads; reviewed by Jun Rao

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bbb161aa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bbb161aa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bbb161aa

Branch: refs/heads/0.8
Commit: bbb161aa250ce3071b03e01d9fa82091f23e1b30
Parents: e4f287d
Author: Neha Narkhede <[email protected]>
Authored: Wed May 29 09:52:23 2013 -0700
Committer: Neha Narkhede <[email protected]>
Committed: Wed May 29 10:05:48 2013 -0700

----------------------------------------------------------------------
 .../scala/kafka/controller/KafkaController.scala |  2 +-
 .../kafka/controller/PartitionStateMachine.scala |  8 +++-----
 .../kafka/controller/ReplicaStateMachine.scala   | 12 +++++++++++-
 core/src/main/scala/kafka/utils/ZkUtils.scala    | 14 ++++++--------
 4 files changed, 21 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb161aa/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index f334685..a4e96cc 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -531,7 +531,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   private def updateLeaderAndIsrCache() {
-    val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, controllerContext.allTopics.toSeq)
+    val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, controllerContext.partitionReplicaAssignment.keySet)
     for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
       controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb161aa/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index e3af0c3..0f5ebde 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -189,13 +189,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    */
   private def initializePartitionState() {
     for((topicPartition, replicaAssignment) <- controllerContext.partitionReplicaAssignment) {
-      val topic = topicPartition.topic
-      val partition = topicPartition.partition
       // check if leader and isr path exists for partition. If not, then it is in NEW state
-      ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
-        case Some(currentLeaderAndIsr) =>
+      controllerContext.partitionLeadershipInfo.get(topicPartition) match {
+        case Some(currentLeaderIsrAndEpoch) =>
           // else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state
-          controllerContext.liveBrokerIds.contains(currentLeaderAndIsr.leader) match {
+          controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader) match {
             case true => // leader is alive
               partitionState.put(topicPartition, OnlinePartition)
             case false =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb161aa/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index e237805..5283fcd 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -59,7 +59,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     initializeReplicaState()
     hasStarted.set(true)
     // move all Online replicas to Online
-    handleStateChanges(ZkUtils.getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq,
+    handleStateChanges(getAllReplicasOnBroker(controllerContext.allTopics.toSeq,
       controllerContext.liveBrokerIds.toSeq), OnlineReplica)
     info("Started replica state machine with initial state -> " + replicaState.toString())
   }
@@ -229,6 +229,16 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     }
   }
 
+  private def getAllReplicasOnBroker(topics: Seq[String], brokerIds: Seq[Int]): Set[PartitionAndReplica] = {
+    brokerIds.map { brokerId =>
+      val partitionsAssignedToThisBroker =
+        controllerContext.partitionReplicaAssignment.filter(p => topics.contains(p._1.topic) && p._2.contains(brokerId))
+      if(partitionsAssignedToThisBroker.size == 0)
+        info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(",")))
+      partitionsAssignedToThisBroker.map(p => new PartitionAndReplica(p._1.topic, p._1.partition, brokerId))
+    }.flatten.toSet
+  }
+
   def getPartitionsAssignedToBroker(topics: Seq[String], brokerId: Int):Seq[TopicAndPartition] = {
     controllerContext.partitionReplicaAssignment.filter(_._2.contains(brokerId)).keySet.toSeq
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb161aa/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 3775eb4..2f5dff6 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -461,15 +461,13 @@ object ZkUtils extends Logging {
     cluster
   }
 
-  def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
+  def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topicAndPartitions: Set[TopicAndPartition])
+  : mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
     val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
-    val partitionsForTopics = getPartitionsForTopics(zkClient, topics)
-    for((topic, partitions) <- partitionsForTopics) {
-      for(partition <- partitions) {
-        ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition.toInt) match {
-          case Some(leaderIsrAndControllerEpoch) => ret.put(TopicAndPartition(topic, partition.toInt), leaderIsrAndControllerEpoch)
-          case None =>
-        }
+    for(topicAndPartition <- topicAndPartitions) {
+      ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition) match {
+        case Some(leaderIsrAndControllerEpoch) => ret.put(topicAndPartition, leaderIsrAndControllerEpoch)
+        case None =>
       }
     }
     ret
