Updated Branches: refs/heads/trunk d401292ab -> 855340a2e
kafka-1202; optimize ZK access in KafkaController; also incorporating fixes in kafka-1020; patched by Jun Rao and Guozhang Wang; reviewed by Neha Narkhede and Joel Koshy Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/855340a2 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/855340a2 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/855340a2 Branch: refs/heads/trunk Commit: 855340a2e65ffbb79520c49d0b9a231b94acd538 Parents: d401292 Author: Jun Rao <[email protected]> Authored: Wed Jan 15 08:37:14 2014 -0800 Committer: Jun Rao <[email protected]> Committed: Wed Jan 15 08:37:14 2014 -0800 ---------------------------------------------------------------------- .../controller/ControllerChannelManager.scala | 4 - .../kafka/controller/KafkaController.scala | 132 ++++++++++++------- .../controller/PartitionLeaderSelector.scala | 40 +++--- .../controller/PartitionStateMachine.scala | 22 +++- .../kafka/controller/ReplicaStateMachine.scala | 35 ++--- core/src/main/scala/kafka/utils/ZkUtils.scala | 43 ------ 6 files changed, 146 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/855340a2/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 33a84fb..ea8485b 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -203,10 +203,6 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq if(stopAndDeleteReplicaRequestMap.size > 0) throw new IllegalStateException("Controller to broker state change requests batch is not empty while 
creating a " + "new one. Some StopReplica with delete state changes %s might be lost ".format(stopAndDeleteReplicaRequestMap.toString())) - leaderAndIsrRequestMap.clear() - stopReplicaRequestMap.clear() - updateMetadataRequestMap.clear() - stopAndDeleteReplicaRequestMap.clear() } def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, http://git-wip-us.apache.org/repos/asf/kafka/blob/855340a2/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 03ef9cf..a0267ae 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -69,6 +69,26 @@ class ControllerContext(val zkClient: ZkClient, def liveOrShuttingDownBrokerIds = liveBrokerIdsUnderlying def liveOrShuttingDownBrokers = liveBrokersUnderlying + + def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] = { + partitionReplicaAssignment + .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) } + .map { case(topicAndPartition, replicas) => topicAndPartition } + .toSet + } + + def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = { + brokerIds.map { brokerId => + partitionReplicaAssignment + .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) } + .map { case(topicAndPartition, replicas) => + new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId) } + }.flatten.toSet + } + + def allLiveReplicas(): Set[PartitionAndReplica] = { + replicasOnBrokers(liveBrokerIds) + } } trait KafkaControllerMBean { @@ -190,13 +210,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(",")) } - val 
allPartitionsAndReplicationFactorOnBroker = controllerContext.controllerLock synchronized { - getPartitionsAssignedToBroker(zkClient, controllerContext.allTopics.toSeq, id).map { - case(topic, partition) => - val topicAndPartition = TopicAndPartition(topic, partition) - (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size) + val allPartitionsAndReplicationFactorOnBroker: Set[(TopicAndPartition, Int)] = + controllerContext.controllerLock synchronized { + controllerContext.partitionsOnBroker(id) + .map(topicAndPartition => (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size)) } - } allPartitionsAndReplicationFactorOnBroker.foreach { case(topicAndPartition, replicationFactor) => @@ -328,7 +346,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg sendUpdateMetadataRequest(newBrokers) // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is // supposed to host. 
Based on that the broker starts the high watermark threads for the input list of partitions - replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, newBrokers), OnlineReplica) + replicaStateMachine.handleStateChanges(controllerContext.replicasOnBrokers(newBrokersSet), OnlineReplica) // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions // to see if these brokers can become leaders for some/all of those partitionStateMachine.triggerOnlinePartitionStateChange() @@ -366,12 +384,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg // trigger OnlinePartition state changes for offline or new partitions partitionStateMachine.triggerOnlinePartitionStateChange() // handle dead replicas - replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers), OfflineReplica) + replicaStateMachine.handleStateChanges(controllerContext.replicasOnBrokers(deadBrokersSet), OfflineReplica) } /** - * This callback is invoked by the partition state machine's topic change listener with the list of failed brokers - * as input. It does the following - + * This callback is invoked by the partition state machine's topic change listener with the list of new topics + * and partitions as input. It does the following - * 1. Registers partition change listener. This is not required until KAFKA-347 * 2. Invokes the new partition callback */ @@ -383,7 +401,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } /** - * This callback is invoked by the topic change callback with the list of failed brokers as input. + * This callback is invoked by the partition state machine's partition change listener with the list of new partitions. * It does the following - * 1. Move the newly created partitions to the NewPartition state * 2. 
Move the newly created partitions from NewPartition->OnlinePartition state @@ -399,60 +417,84 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg /** * This callback is invoked by the reassigned partitions listener. When an admin command initiates a partition * reassignment, it creates the /admin/reassign_partitions path that triggers the zookeeper listener. - * Reassigning replicas for a partition goes through a few stages - + * Reassigning replicas for a partition goes through a few steps listed in the code. * RAR = Reassigned replicas - * AR = Original list of replicas for partition - * 1. Write new AR = AR + RAR. At this time, update the leader epoch in zookeeper and send a LeaderAndIsr request with - * AR = AR + RAR to all replicas in (AR + RAR) - * 2. Start new replicas RAR - AR. - * 3. Wait until new replicas are in sync with the leader - * 4. If the leader is not in RAR, elect a new leader from RAR. If new leader needs to be elected from RAR, a LeaderAndIsr + * OAR = Original list of replicas for partition + * AR = current assigned replicas + * + * 1. Update AR in ZK with OAR + RAR. + * 2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR). We do this by forcing an update + * of the leader epoch in zookeeper. + * 3. Start new replicas RAR - OAR by moving replicas in RAR - OAR to NewReplica state. + * 4. Wait until all replicas in RAR are in sync with the leader. + * 5. Move all replicas in RAR to OnlineReplica state. + * 6. Set AR to RAR in memory. + * 7. If the leader is not in RAR, elect a new leader from RAR. If new leader needs to be elected from RAR, a LeaderAndIsr * will be sent. If not, then leader epoch will be incremented in zookeeper and a LeaderAndIsr request will be sent. * In any case, the LeaderAndIsr request will have AR = RAR. This will prevent the leader from adding any replica in - * RAR - AR back in the ISR - * 5. Stop old replicas AR - RAR.
As part of this, we make 2 state changes OfflineReplica and NonExistentReplica. As part - * of OfflineReplica state change, we shrink the ISR to remove RAR - AR in zookeeper and sent a LeaderAndIsr ONLY to - * the Leader to notify it of the shrunk ISR. After that, we send a StopReplica (delete = false) to the replicas in - * RAR - AR. As part of the NonExistentReplica state change, we delete replicas in RAR - AR. - * 6. Write new AR = RAR. As part of this, we finally change the AR in zookeeper to RAR. - * 7. Remove partition from the /admin/reassign_partitions path + * RAR - OAR back in the isr. + * 8. Move all replicas in OAR - RAR to OfflineReplica state. As part of OfflineReplica state change, we shrink the + * isr to remove OAR - RAR in zookeeper and send a LeaderAndIsr ONLY to the Leader to notify it of the shrunk isr. + * After that, we send a StopReplica (delete = false) to the replicas in OAR - RAR. + * 9. Move all replicas in OAR - RAR to NonExistentReplica state. This will send a StopReplica (delete = true) to + * the replicas in OAR - RAR to physically delete the replicas on disk. + * 10. Update AR in ZK with RAR. + * 11. Update the /admin/reassign_partitions path in ZK to remove this partition. + * 12. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker. + * + * For example, if OAR = {1, 2, 3} and RAR = {4,5,6}, the values in the assigned replica (AR) and leader/isr path in ZK + * may go through the following transition. + * AR leader/isr + * {1,2,3} 1/{1,2,3} (initial state) + * {1,2,3,4,5,6} 1/{1,2,3} (step 2) + * {1,2,3,4,5,6} 1/{1,2,3,4,5,6} (step 4) + * {1,2,3,4,5,6} 4/{1,2,3,4,5,6} (step 7) + * {1,2,3,4,5,6} 4/{4,5,6} (step 8) + * {4,5,6} 4/{4,5,6} (step 10) + * + * Note that we have to update AR in ZK with RAR last since it's the only place where we store OAR persistently. + * This way, if the controller crashes before that step, we can still recover.
*/ def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) { val reassignedReplicas = reassignedPartitionContext.newReplicas areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match { + case false => + info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) + + "reassigned not yet caught up with the leader") + val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet + val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet + //1. Update AR in ZK with OAR + RAR. + updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq) + //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR). + updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition), + newAndOldReplicas.toSeq) + //3. replicas in RAR - OAR -> NewReplica + startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList) + info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) + + "reassigned to catch up with the leader") case true => + //4. Wait until all replicas in RAR are in sync with the leader. val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet - // mark the new replicas as online + //5. replicas in RAR -> OnlineReplica reassignedReplicas.foreach { replica => replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, replica)), OnlineReplica) } - // check if current leader is in the new replicas list. 
If not, controller needs to trigger leader election + //6. Set AR to RAR in memory. + //7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and + // a new AR (using RAR) and same isr to every broker in RAR moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext) - // stop older replicas + //8. replicas in OAR - RAR -> Offline (force those replicas out of isr) + //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted) stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas) - // write the new list of replicas for this partition in zookeeper + //10. Update AR in ZK with RAR. updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas) - // update the /admin/reassign_partitions path to remove this partition + //11. Update the /admin/reassign_partitions path in ZK to remove this partition. removePartitionFromReassignedPartitions(topicAndPartition) info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition)) controllerContext.partitionsBeingReassigned.remove(topicAndPartition) - // after electing leader, the replicas and isr information changes, so resend the update metadata request + //12. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker. 
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition)) - case false => - info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) + - "reassigned not yet caught up with the leader") - val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet - val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet - // write the expanded list of replicas to zookeeper - updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq) - // update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest - updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition), - newAndOldReplicas.toSeq) - // start new replicas - startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList) - info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) + - "reassigned to catch up with the leader") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/855340a2/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index a47b142..fd9200f 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -26,19 +26,20 @@ trait PartitionLeaderSelector { * @param topicAndPartition The topic and partition whose leader needs to be elected * @param currentLeaderAndIsr The current leader and isr of input partition read from zookeeper * @throws 
NoReplicaOnlineException If no replica in the assigned replicas list is alive - * @return The leader and isr request, with the newly selected leader info, to send to the brokers - * Also, returns the list of replicas the returned leader and isr request should be sent to - * This API selects a new leader for the input partition + * @return The leader and isr request, with the newly selected leader and isr, and the set of replicas to receive + * the LeaderAndIsrRequest. */ def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) } /** - * This API selects a new leader for the input partition - - * 1. If at least one broker from the isr is alive, it picks a broker from the isr as the new leader - * 2. Else, it picks some alive broker from the assigned replica list as the new leader + * Select the new leader, new isr and receiving replicas (for the LeaderAndIsrRequest): + * 1. If at least one broker from the isr is alive, it picks a broker from the live isr as the new leader and the live + * isr as the new isr. + * 2. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr. * 3. If no broker in the assigned replica list is alive, it throws NoReplicaOnlineException + * Replicas to receive LeaderAndIsr request = live assigned replicas * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache */ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging { @@ -82,7 +83,9 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten } /** - * Picks one of the alive in-sync reassigned replicas as the new leader. 
+ * New leader = a live in-sync reassigned replica + * New isr = current isr + * Replicas to receive LeaderAndIsr request = reassigned replicas */ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging { this.logIdent = "[ReassignedPartitionLeaderSelector]: " @@ -94,7 +97,8 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion - val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) + val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) && + currentLeaderAndIsr.isr.contains(r)) val newLeaderOpt = aliveReassignedInSyncReplicas.headOption newLeaderOpt match { case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr, @@ -106,16 +110,16 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr)) case _ => throw new StateChangeFailedException("None of the reassigned replicas for partition " + - "%s are alive. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr)) + "%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr)) } } } } /** - * Picks the preferred replica as the new leader if - - * 1. It is already not the current leader - * 2. 
It is alive + * New leader = preferred (first assigned) replica (if in isr and alive); + * New isr = current isr; + * Replicas to receive LeaderAndIsr request = assigned replicas */ class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging { @@ -145,8 +149,9 @@ with Logging { } /** - * Picks one of the alive replicas (other than the current leader) in ISR as - * new leader, fails if there are no other replicas in ISR. + * New leader = replica in isr that's not being shutdown; + * New isr = current isr - shutdown replica; + * Replicas to receive LeaderAndIsr request = live assigned replicas */ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector @@ -164,8 +169,7 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r)) - val newIsr = currentLeaderAndIsr.isr.filter(brokerId => brokerId != currentLeader && - !controllerContext.shuttingDownBrokerIds.contains(brokerId)) + val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId)) val newLeaderOpt = newIsr.headOption newLeaderOpt match { case Some(newLeader) => @@ -174,8 +178,8 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas) case None => - throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides current leader %d and" + - " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, currentLeader, controllerContext.shuttingDownBrokerIds.mkString(","))) + throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" + + " 
shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(","))) } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/855340a2/core/src/main/scala/kafka/controller/PartitionStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 5859ce7..ac4262a 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -119,7 +119,23 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { /** * This API exercises the partition's state machine. It ensures that every state transition happens from a legal - * previous state to the target state. + * previous state to the target state. Valid state transitions are: + * NonExistentPartition -> NewPartition: + * --load assigned replicas from ZK to controller cache + * + * NewPartition -> OnlinePartition + * --assign first live replica as the leader and all live replicas as the isr; write leader and isr to ZK for this partition + * --send LeaderAndIsr request to every live replica and UpdateMetadata request to every live broker + * + * OnlinePartition,OfflinePartition -> OnlinePartition + * --select new leader and isr for this partition and a set of replicas to receive the LeaderAndIsr request, and write leader and isr to ZK + * --for this partition, send LeaderAndIsr request to every receiving replica and UpdateMetadata request to every live broker + * + * NewPartition,OnlinePartition -> OfflinePartition + * --nothing other than marking partition state as Offline + * + * OfflinePartition -> NonExistentPartition + * --nothing other than marking the partition state as NonExistentPartition * @param topic The topic of the partition for which the state transition 
is invoked * @param partition The partition for which the state transition is invoked * @param targetState The end state that the partition should be moved to @@ -273,8 +289,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } /** - * Invoked on the OfflinePartition->OnlinePartition state change. It invokes the leader election API to elect a leader - * for the input offline partition + * Invoked on the OfflinePartition,OnlinePartition->OnlinePartition state change. + * It invokes the leader election API to elect a leader for the input offline partition * @param topic The topic of the offline partition * @param partition The offline partition * @param leaderSelector Specific leader selector (e.g., offline/reassigned/etc.) http://git-wip-us.apache.org/repos/asf/kafka/blob/855340a2/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index ad4ee53..483559a 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -58,8 +58,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { initializeReplicaState() hasStarted.set(true) // move all Online replicas to Online - handleStateChanges(getAllReplicasOnBroker(controllerContext.allTopics.toSeq, - controllerContext.liveBrokerIds.toSeq), OnlineReplica) + handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica) info("Started replica state machine with initial state -> " + replicaState.toString()) } @@ -95,7 +94,23 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { /** * This API exercises the replica's state machine. It ensures that every state transition happens from a legal - * previous state to the target state. 
+ * previous state to the target state. Valid state transitions are: + * NonExistentReplica --> NewReplica + * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker + * + * NewReplica -> OnlineReplica + * --add the new replica to the assigned replica list if needed + * + * OnlineReplica,OfflineReplica -> OnlineReplica + * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker + * + * NewReplica,OnlineReplica -> OfflineReplica + * --send StopReplicaRequest to the replica (w/o deletion) + * --remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and UpdateMetadata request for the partition to every live broker. + * + * OfflineReplica -> NonExistentReplica + * --send StopReplicaRequest to the replica (with deletion) + * * @param topic The topic of the replica for which the state transition is invoked * @param partition The partition of the replica for which the state transition is invoked * @param replicaId The replica for which the state transition is invoked @@ -228,20 +243,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { } } - private def getAllReplicasOnBroker(topics: Seq[String], brokerIds: Seq[Int]): Set[PartitionAndReplica] = { - brokerIds.map { brokerId => - val partitionsAssignedToThisBroker = - controllerContext.partitionReplicaAssignment.filter(p => topics.contains(p._1.topic) && p._2.contains(brokerId)) - if(partitionsAssignedToThisBroker.size == 0) - info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(","))) - partitionsAssignedToThisBroker.map(p => new PartitionAndReplica(p._1.topic, p._1.partition, brokerId)) - }.flatten.toSet - } - - def getPartitionsAssignedToBroker(topics: Seq[String], brokerId: Int):Seq[TopicAndPartition] = { - 
controllerContext.partitionReplicaAssignment.filter(_._2.contains(brokerId)).keySet.toSeq - } - /** * This is the zookeeper listener that triggers all the state transitions for a replica */ http://git-wip-us.apache.org/repos/asf/kafka/blob/855340a2/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 73902b2..b42e52b 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -185,12 +185,6 @@ object ZkUtils extends Logging { } } - def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = { - val replicas = getReplicasForPartition(zkClient, topic, partition) - debug("The list of replicas for partition [%s,%d] is %s".format(topic, partition, replicas)) - replicas.contains(brokerId.toString) - } - def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id val timestamp = SystemTime.milliseconds.toString @@ -500,8 +494,6 @@ object ZkUtils extends Logging { client.exists(path) } - def getLastPart(path : String) : String = path.substring(path.lastIndexOf('/') + 1) - def getCluster(zkClient: ZkClient) : Cluster = { val cluster = new Cluster val nodes = getChildrenParentMayNotExist(zkClient, BrokerIdsPath) @@ -571,17 +563,6 @@ object ZkUtils extends Logging { ret } - def getReplicaAssignmentFromPartitionAssignment(topicPartitionAssignment: mutable.Map[String, collection.Map[Int, Seq[Int]]]): - mutable.Map[(String, Int), Seq[Int]] = { - val ret = new mutable.HashMap[(String, Int), Seq[Int]] - for((topic, partitionAssignment) <- topicPartitionAssignment){ - for((partition, replicaAssignment) <- partitionAssignment){ - ret.put((topic, partition), replicaAssignment) - } - } - ret - } - def 
getPartitionsForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[String, Seq[Int]] = { getPartitionAssignmentForTopics(zkClient, topics).map { topicAndPartitionMap => val topic = topicAndPartitionMap._1 @@ -591,19 +572,6 @@ object ZkUtils extends Logging { } } - def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Seq[(String, Int)] = { - val topicsAndPartitions = getPartitionAssignmentForTopics(zkClient, topics) - topicsAndPartitions.map { topicAndPartitionMap => - val topic = topicAndPartitionMap._1 - val partitionMap = topicAndPartitionMap._2 - val relevantPartitionsMap = partitionMap.filter( m => m._2.contains(brokerId) ) - val relevantPartitions = relevantPartitionsMap.map(_._1) - for(relevantPartition <- relevantPartitions) yield { - (topic, relevantPartition) - } - }.flatten[(String, Int)].toSeq - } - def getPartitionsBeingReassigned(zkClient: ZkClient): Map[TopicAndPartition, ReassignedPartitionsContext] = { // read the partitions and their new replica list val jsonPartitionMapOpt = readDataMaybeNull(zkClient, ReassignPartitionsPath)._1 @@ -677,17 +645,6 @@ object ZkUtils extends Logging { } } - def getAllReplicasOnBroker(zkClient: ZkClient, topics: Seq[String], brokerIds: Seq[Int]): Set[PartitionAndReplica] = { - Set.empty[PartitionAndReplica] ++ brokerIds.map { brokerId => - // read all the partitions and their assigned replicas into a map organized by - // { replica id -> partition 1, partition 2... 
- val partitionsAssignedToThisBroker = getPartitionsAssignedToBroker(zkClient, topics, brokerId) - if(partitionsAssignedToThisBroker.size == 0) - info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(","))) - partitionsAssignedToThisBroker.map(p => new PartitionAndReplica(p._1, p._2, brokerId)) - }.flatten - } - def getPartitionsUndergoingPreferredReplicaElection(zkClient: ZkClient): Set[TopicAndPartition] = { // read the partitions and their new replica list val jsonPartitionListOpt = readDataMaybeNull(zkClient, PreferredReplicaLeaderElectionPath)._1
