MINOR: Rename and change package of async ZooKeeper classes

- kafka.controller.ZookeeperClient -> kafka.zookeeper.ZooKeeperClient
- kafka.controller.KafkaControllerZkUtils -> kafka.zk.KafkaZkClient
- kafka.controller.ZkData -> kafka.zk.ZkData
- Renamed various fields to match the new names and for consistency
- A few clean-ups in ZkData
- Document intent
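For anyone rebasing work on trunk, the visible effect of the renames on calling code is roughly the following (a minimal sketch assembled from the moves listed above and the import changes in the diff below; SomeCaller is a hypothetical class, not part of this patch):

    // Before this commit:
    import kafka.controller.KafkaControllerZkUtils
    import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
    import kafka.controller.ZookeeperClient

    // fields referencing the utils class were typically named zkUtils
    class SomeCaller(zkUtils: KafkaControllerZkUtils)

    // After this commit:
    import kafka.zk.KafkaZkClient
    import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
    import kafka.zookeeper.ZooKeeperClient

    // fields are renamed zkClient to match the new class name
    class SomeCaller(zkClient: KafkaZkClient)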
Author: Ismael Juma <[email protected]> Reviewers: Onur Karaman <[email protected]>, Manikumar Reddy <[email protected]>, Jun Rao <[email protected]> Closes #4112 from ijuma/rename-zookeeper-client-and-move-to-zookeper-package Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ab6f848b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ab6f848b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ab6f848b Branch: refs/heads/trunk Commit: ab6f848ba6cafaed3d75b54005c954733f0d1735 Parents: f7f8e11 Author: Ismael Juma <[email protected]> Authored: Wed Oct 25 21:11:16 2017 -0700 Committer: Jun Rao <[email protected]> Committed: Wed Oct 25 21:11:16 2017 -0700 ---------------------------------------------------------------------- .../ZkNodeChangeNotificationListener.scala | 4 +- .../kafka/controller/KafkaController.scala | 144 ++-- .../controller/KafkaControllerZkUtils.scala | 716 ------------------ .../controller/PartitionStateMachine.scala | 13 +- .../kafka/controller/ReplicaStateMachine.scala | 9 +- .../kafka/controller/TopicDeletionManager.scala | 11 +- .../main/scala/kafka/controller/ZkData.scala | 248 ------- .../kafka/controller/ZookeeperClient.scala | 374 ---------- core/src/main/scala/kafka/log/LogManager.scala | 9 +- .../main/scala/kafka/server/KafkaServer.scala | 18 +- core/src/main/scala/kafka/utils/Json.scala | 19 + core/src/main/scala/kafka/utils/ZkUtils.scala | 3 + .../src/main/scala/kafka/zk/KafkaZkClient.scala | 726 +++++++++++++++++++ core/src/main/scala/kafka/zk/ZkData.scala | 244 +++++++ .../scala/kafka/zookeeper/ZooKeeperClient.scala | 374 ++++++++++ .../api/SaslPlainPlaintextConsumerTest.scala | 2 +- .../unit/kafka/admin/DeleteTopicTest.scala | 2 +- .../ZkNodeChangeNotificationListenerTest.scala | 2 +- .../unit/kafka/consumer/TopicFilterTest.scala | 2 +- .../controller/PartitionStateMachineTest.scala | 70 +- .../controller/ReplicaStateMachineTest.scala | 34 +- .../kafka/controller/ZookeeperClientTest.scala | 339 --------- .../kafka/integration/AutoOffsetResetTest.scala | 2 +- .../security/auth/SimpleAclAuthorizerTest.scala | 4 +- .../kafka/server/ClientQuotaManagerTest.scala | 6 +- .../scala/unit/kafka/server/KafkaApisTest.scala | 3 +- .../scala/unit/kafka/server/LogOffsetTest.scala | 10 +- .../unit/kafka/server/LogRecoveryTest.scala | 4 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 8 +- .../scala/unit/kafka/zk/EmbeddedZookeeper.scala | 10 +- .../test/scala/unit/kafka/zk/ZKPathTest.scala | 8 +- .../unit/kafka/zk/ZooKeeperTestHarness.scala | 4 +- .../kafka/zookeeper/ZooKeeperClientTest.scala | 339 +++++++++ .../integration/utils/EmbeddedKafkaCluster.java | 6 +- 34 files changed, 1902 insertions(+), 1865 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala index b4ee1fd..0e34c5a 100644 --- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala +++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala @@ -153,11 +153,11 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils, } override def handleSessionEstablishmentError(error: Throwable) { - fatal("Could 
not establish session with zookeeper", error) + fatal("Could not establish session with ZooKeeper", error) } override def handleStateChanged(state: KeeperState) { - debug(s"New zookeeper state: ${state}") + debug(s"New ZooKeeper state: ${state}") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 1df40b3..d3e6998 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -22,10 +22,12 @@ import com.yammer.metrics.core.Gauge import kafka.admin.AdminOperationException import kafka.api._ import kafka.common._ -import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.server._ import kafka.utils._ +import kafka.zk._ +import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult +import kafka.zookeeper.{ZNodeChangeHandler, ZNodeChildChangeHandler} import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -41,7 +43,7 @@ object KafkaController extends Logging { val InitialControllerEpochZkVersion = 1 } -class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup { +class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup { this.logIdent = s"[Controller id=${config.brokerId}] " private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None) @@ -55,10 +57,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, private[controller] val eventManager = new ControllerEventManager(controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics()) - val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkUtils) + val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkClient) private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger) - val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkUtils, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger)) - val partitionStateMachine = new PartitionStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkUtils, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger)) + val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger)) + val partitionStateMachine = new PartitionStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger)) private val controllerChangeHandler = new ControllerChangeHandler(this, eventManager) private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager) @@ -155,23 +157,23 @@ class 
KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, * This ensures another controller election will be triggered and there will always be an actively serving controller */ def onControllerFailover() { - info("Reading controller epoch from zookeeper") - readControllerEpochFromZookeeper() - info("Incrementing controller epoch in zookeeper") + info("Reading controller epoch from ZooKeeper") + readControllerEpochFromZooKeeper() + info("Incrementing controller epoch in ZooKeeper") incrementControllerEpoch() info("Registering handlers") // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler, isrChangeNotificationHandler) - childChangeHandlers.foreach(zkUtils.registerZNodeChildChangeHandler) + childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler) val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler) - nodeChangeHandlers.foreach(zkUtils.registerZNodeChangeHandlerAndCheckExistence) + nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence) info("Deleting log dir event notifications") - zkUtils.deleteLogDirEventNotifications() + zkClient.deleteLogDirEventNotifications() info("Deleting isr change notifications") - zkUtils.deleteIsrChangeNotifications() + zkClient.deleteIsrChangeNotifications() info("Initializing controller context") initializeControllerContext() info("Fetching topic deletions in progress") @@ -213,10 +215,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, def onControllerResignation() { debug("Resigning") // de-register listeners - zkUtils.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path) - zkUtils.unregisterZNodeChangeHandler(partitionReassignmentHandler.path) - zkUtils.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path) - zkUtils.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path) + zkClient.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path) + zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path) + zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path) + zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path) // reset topic deletion manager topicDeletionManager.reset() @@ -232,12 +234,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, unregisterPartitionReassignmentIsrChangeHandlers() // shutdown partition state machine partitionStateMachine.shutdown() - zkUtils.unregisterZNodeChildChangeHandler(topicChangeHandler.path) + zkClient.unregisterZNodeChildChangeHandler(topicChangeHandler.path) unregisterPartitionModificationsHandlers(partitionModificationsHandlers.keys.toSeq) - zkUtils.unregisterZNodeChildChangeHandler(topicDeletionHandler.path) + zkClient.unregisterZNodeChildChangeHandler(topicDeletionHandler.path) // shutdown replica state machine replicaStateMachine.shutdown() - zkUtils.unregisterZNodeChildChangeHandler(brokerChangeHandler.path) + zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path) resetControllerContext() @@ -465,7 +467,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, val partitionReassignmentIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(this, eventManager, partition) reassignedPartitionContext.partitionReassignmentIsrChangeHandler 
= partitionReassignmentIsrChangeHandler // register listener on the leader and isr path to wait until they catch up with the current leader - zkUtils.registerZNodeChangeHandler(partitionReassignmentIsrChangeHandler) + zkClient.registerZNodeChangeHandler(partitionReassignmentIsrChangeHandler) } def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition, @@ -536,7 +538,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, def incrementControllerEpoch(): Unit = { val newControllerEpoch = controllerContext.epoch + 1 - val setDataResponse = zkUtils.setControllerEpochRaw(newControllerEpoch, controllerContext.epochZkVersion) + val setDataResponse = zkClient.setControllerEpochRaw(newControllerEpoch, controllerContext.epochZkVersion) setDataResponse.resultCode match { case Code.OK => controllerContext.epochZkVersion = setDataResponse.stat.getVersion @@ -545,7 +547,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, // if path doesn't exist, this is the first controller whose epoch should be 1 // the following call can still fail if another controller gets elected between checking if the path exists and // trying to create the controller epoch path - val createResponse = zkUtils.createControllerEpochRaw(KafkaController.InitialControllerEpoch) + val createResponse = zkClient.createControllerEpochRaw(KafkaController.InitialControllerEpoch) createResponse.resultCode match { case Code.OK => controllerContext.epoch = KafkaController.InitialControllerEpoch @@ -565,10 +567,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, private def initializeControllerContext() { // update controller cache with delete topic information - controllerContext.liveBrokers = zkUtils.getAllBrokersInCluster.toSet - controllerContext.allTopics = zkUtils.getAllTopicsInCluster.toSet + controllerContext.liveBrokers = zkClient.getAllBrokersInCluster.toSet + controllerContext.allTopics = zkClient.getAllTopicsInCluster.toSet registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq) - controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet) + controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet) controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch] controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int] // update the leader and isr cache for all existing partitions from Zookeeper @@ -582,7 +584,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, } private def fetchPendingPreferredReplicaElections(): Set[TopicAndPartition] = { - val partitionsUndergoingPreferredReplicaElection = zkUtils.getPreferredReplicaElection + val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection // check if they are already completed or topic was deleted val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition => val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition) @@ -618,7 +620,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, private def initializePartitionReassignment() { // read the partitions being reassigned from zookeeper path /admin/reassign_partitions - val partitionsBeingReassigned = 
zkUtils.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas)) + val partitionsBeingReassigned = zkClient.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas)) // check if they are already completed or topic was deleted val reassignedPartitions = partitionsBeingReassigned.filter { partition => val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition._1) @@ -637,7 +639,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, } private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = { - val topicsToBeDeleted = zkUtils.getTopicDeletions.toSet + val topicsToBeDeleted = zkClient.getTopicDeletions.toSet val topicsWithOfflineReplicas = controllerContext.partitionReplicaAssignment.filter { case (partition, replicas) => replicas.exists(r => !controllerContext.isReplicaOnline(r, partition)) }.keySet.map(_.topic) @@ -661,14 +663,14 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, } def updateLeaderAndIsrCache(partitions: Seq[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keys.toSeq) { - val leaderIsrAndControllerEpochs = zkUtils.getTopicPartitionStates(partitions) + val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions) leaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) => controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch) } } private def areReplicasInIsr(partition: TopicAndPartition, replicas: Seq[Int]): Boolean = { - zkUtils.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch => + zkClient.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch => replicas.forall(leaderIsrAndControllerEpoch.leaderAndIsr.isr.contains) } } @@ -720,7 +722,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, replicas: Seq[Int]) { val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic == partition.topic) partitionsAndReplicasForThisTopic.put(partition, replicas) - val setDataResponse = zkUtils.setTopicAssignmentRaw(partition.topic, partitionsAndReplicasForThisTopic.toMap) + val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic, partitionsAndReplicasForThisTopic.toMap) if (setDataResponse.resultCode == Code.OK) { info("Updated assigned replicas for partition %s being reassigned to %s ".format(partition, replicas.mkString(","))) // update the assigned replica list after a successful zookeeper write @@ -769,13 +771,13 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, val partitionModificationsHandler = new PartitionModificationsHandler(this, eventManager, topic) partitionModificationsHandlers.put(topic, partitionModificationsHandler) } - partitionModificationsHandlers.values.foreach(zkUtils.registerZNodeChangeHandler) + partitionModificationsHandlers.values.foreach(zkClient.registerZNodeChangeHandler) } def unregisterPartitionModificationsHandlers(topics: Seq[String]) = { topics.foreach { topic => partitionModificationsHandlers.remove(topic) - .foreach(handler => zkUtils.unregisterZNodeChangeHandler(handler.path)) + .foreach(handler => zkClient.unregisterZNodeChangeHandler(handler.path)) } } @@ -784,13 +786,13 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, case (topicAndPartition, reassignedPartitionsContext) => val 
partitionReassignmentIsrChangeHandler = reassignedPartitionsContext.partitionReassignmentIsrChangeHandler - zkUtils.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path) + zkClient.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path) } } - private def readControllerEpochFromZookeeper() { + private def readControllerEpochFromZooKeeper() { // initialize the controller epoch and zk version by reading from zookeeper - val epochAndStatOpt = zkUtils.getControllerEpoch + val epochAndStatOpt = zkClient.getControllerEpoch epochAndStatOpt.foreach { case (epoch, stat) => controllerContext.epoch = epoch controllerContext.epochZkVersion = stat.getVersion @@ -803,21 +805,21 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, // stop watching the ISR changes for this partition val partitionReassignmentIsrChangeHandler = controllerContext.partitionsBeingReassigned(topicAndPartition).partitionReassignmentIsrChangeHandler - zkUtils.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path) + zkClient.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path) } // read the current list of reassigned partitions from zookeeper - val partitionsBeingReassigned = zkUtils.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas)) + val partitionsBeingReassigned = zkClient.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas)) // remove this partition from that list val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition // write the new list to zookeeper val reassignment = updatedPartitionsBeingReassigned.mapValues(_.newReplicas) if (reassignment.isEmpty) { info("No more partitions need to be reassigned. 
Deleting zk path %s".format(ReassignPartitionsZNode.path)) - zkUtils.deletePartitionReassignment() + zkClient.deletePartitionReassignment() } else { - val setDataResponse = zkUtils.setPartitionReassignmentRaw(reassignment) + val setDataResponse = zkClient.setPartitionReassignmentRaw(reassignment) if (setDataResponse.resultCode == Code.NONODE) { - val createDataResponse = zkUtils.createPartitionReassignment(reassignment) + val createDataResponse = zkClient.createPartitionReassignment(reassignment) createDataResponse.resultException.foreach(e => throw new AdminOperationException(e)) } else { setDataResponse.resultException.foreach(e => throw new AdminOperationException(e)) @@ -840,7 +842,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, } } if (!isTriggeredByAutoRebalance) - zkUtils.deletePreferredReplicaElection() + zkClient.deletePreferredReplicaElection() } /** @@ -872,7 +874,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, var zkWriteCompleteOrUnnecessary = false while (!zkWriteCompleteOrUnnecessary) { // refresh leader and isr from zookeeper again - zkWriteCompleteOrUnnecessary = zkUtils.getTopicPartitionStates(Seq(partition)).get(partition) match { + zkWriteCompleteOrUnnecessary = zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match { case Some(leaderIsrAndControllerEpoch) => val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch @@ -885,7 +887,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion // update the new leadership decision in zookeeper or retry val UpdateLeaderAndIsrResult(successfulUpdates, _, failedUpdates) = - zkUtils.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch) + zkClient.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch) if (successfulUpdates.contains(partition)) { val finalLeaderAndIsr = successfulUpdates(partition) finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(finalLeaderAndIsr, epoch)) @@ -1065,7 +1067,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, def state = ControllerState.ControllerChange override def process(): Unit = { - zkUtils.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler) + zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler) elect() } @@ -1111,7 +1113,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, private def triggerControllerMove(): Unit = { onControllerResignation() activeControllerId = -1 - zkUtils.deleteController() + zkClient.deleteController() } def expire(): Unit = { @@ -1126,7 +1128,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, def elect(): Unit = { val timestamp = time.milliseconds - activeControllerId = zkUtils.getControllerId.getOrElse(-1) + activeControllerId = zkClient.getControllerId.getOrElse(-1) /* * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition, * it's possible that the controller has already been elected when we get here. 
This check will prevent the following @@ -1138,14 +1140,14 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, } try { - zkUtils.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp)) + zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp)) info(config.brokerId + " successfully elected as the controller") activeControllerId = config.brokerId onControllerFailover() } catch { case _: NodeExistsException => // If someone else has written the path, then - activeControllerId = zkUtils.getControllerId.getOrElse(-1) + activeControllerId = zkClient.getControllerId.getOrElse(-1) if (activeControllerId != -1) debug("Broker %d was elected as controller instead of broker %d".format(activeControllerId, config.brokerId)) @@ -1163,7 +1165,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, override def process(): Unit = { if (!isActive) return - val curBrokers = zkUtils.getAllBrokersInCluster.toSet + val curBrokers = zkClient.getAllBrokersInCluster.toSet val curBrokerIds = curBrokers.map(_.id) val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds @@ -1189,13 +1191,13 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, override def process(): Unit = { if (!isActive) return - val topics = zkUtils.getAllTopicsInCluster.toSet + val topics = zkClient.getAllTopicsInCluster.toSet val newTopics = topics -- controllerContext.allTopics val deletedTopics = controllerContext.allTopics -- topics controllerContext.allTopics = topics registerPartitionModificationsHandlers(newTopics.toSeq) - val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics) + val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(newTopics) controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p => !deletedTopics.contains(p._1.topic)) controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment @@ -1211,13 +1213,13 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, override def process(): Unit = { if (!isActive) return - val sequenceNumbers = zkUtils.getAllLogDirEventNotifications + val sequenceNumbers = zkClient.getAllLogDirEventNotifications try { - val brokerIds = zkUtils.getBrokerIdsFromLogDirEvents(sequenceNumbers) + val brokerIds = zkClient.getBrokerIdsFromLogDirEvents(sequenceNumbers) onBrokerLogDirFailure(brokerIds) } finally { // delete processed children - zkUtils.deleteLogDirEventNotifications(sequenceNumbers) + zkClient.deleteLogDirEventNotifications(sequenceNumbers) } } } @@ -1227,7 +1229,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, override def process(): Unit = { if (!isActive) return - val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(immutable.Set(topic)) + val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic)) val partitionsToBeAdded = partitionReplicaAssignment.filter(p => !controllerContext.partitionReplicaAssignment.contains(p._1)) if(topicDeletionManager.isTopicQueuedUpForDeletion(topic)) @@ -1248,12 +1250,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, override def process(): Unit = { if (!isActive) return - var topicsToBeDeleted = zkUtils.getTopicDeletions.toSet + var 
topicsToBeDeleted = zkClient.getTopicDeletions.toSet debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted") val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics if (nonExistentTopics.nonEmpty) { warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}") - zkUtils.deleteTopicDeletions(nonExistentTopics.toSeq) + zkClient.deleteTopicDeletions(nonExistentTopics.toSeq) } topicsToBeDeleted --= nonExistentTopics if (config.deleteTopicEnable) { @@ -1272,7 +1274,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, } else { // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics info(s"Removing $topicsToBeDeleted since delete topic is disabled") - zkUtils.deleteTopicDeletions(topicsToBeDeleted.toSeq) + zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq) } } } @@ -1282,8 +1284,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, override def process(): Unit = { if (!isActive) return - zkUtils.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler) - val partitionReassignment = zkUtils.getPartitionReassignment + zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler) + val partitionReassignment = zkClient.getPartitionReassignment val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1)) partitionsToBeReassigned.foreach { partitionToBeReassigned => if(topicDeletionManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) { @@ -1306,7 +1308,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, // check if this partition is still being reassigned or not controllerContext.partitionsBeingReassigned.get(partition).foreach { reassignedPartitionContext => val reassignedReplicas = reassignedPartitionContext.newReplicas.toSet - zkUtils.getTopicPartitionStates(Seq(partition)).get(partition) match { + zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match { case Some(leaderIsrAndControllerEpoch) => // check if new replicas have joined ISR val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet @@ -1334,16 +1336,16 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, override def process(): Unit = { if (!isActive) return - val sequenceNumbers = zkUtils.getAllIsrChangeNotifications + val sequenceNumbers = zkClient.getAllIsrChangeNotifications try { - val partitions = zkUtils.getPartitionsFromIsrChangeNotifications(sequenceNumbers) + val partitions = zkClient.getPartitionsFromIsrChangeNotifications(sequenceNumbers) if (partitions.nonEmpty) { updateLeaderAndIsrCache(partitions) processUpdateNotifications(partitions) } } finally { // delete the notifications - zkUtils.deleteIsrChangeNotifications(sequenceNumbers) + zkClient.deleteIsrChangeNotifications(sequenceNumbers) } } @@ -1359,8 +1361,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, override def process(): Unit = { if (!isActive) return - zkUtils.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler) - val partitions = zkUtils.getPreferredReplicaElection + zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler) + val partitions = zkClient.getPreferredReplicaElection val partitionsForTopicsToBeDeleted = 
partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic)) if (partitionsForTopicsToBeDeleted.nonEmpty) { error("Skipping preferred replica election for partitions %s since the respective topics are being deleted" @@ -1375,8 +1377,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, override def process(): Unit = { val wasActiveBeforeChange = isActive - zkUtils.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler) - activeControllerId = zkUtils.getControllerId.getOrElse(-1) + zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler) + activeControllerId = zkClient.getControllerId.getOrElse(-1) if (wasActiveBeforeChange && !isActive) { onControllerResignation() } @@ -1388,8 +1390,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, override def process(): Unit = { val wasActiveBeforeChange = isActive - zkUtils.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler) - activeControllerId = zkUtils.getControllerId.getOrElse(-1) + zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler) + activeControllerId = zkClient.getControllerId.getOrElse(-1) if (wasActiveBeforeChange && !isActive) { onControllerResignation() } http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala b/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala deleted file mode 100644 index bdd8b57..0000000 --- a/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala +++ /dev/null @@ -1,716 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package kafka.controller - -import java.util.Properties - -import kafka.api.LeaderAndIsr -import kafka.cluster.Broker -import kafka.common.TopicAndPartition -import kafka.log.LogConfig -import kafka.server.ConfigType -import kafka.utils.{Logging, ZkUtils} -import org.apache.zookeeper.KeeperException.Code -import org.apache.zookeeper.data.Stat -import org.apache.zookeeper.{CreateMode, KeeperException} - -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer - -class KafkaControllerZkUtils(zookeeperClient: ZookeeperClient, isSecure: Boolean) extends Logging { - import KafkaControllerZkUtils._ - - /** - * Gets topic partition states for the given partitions. - * @param partitions the partitions for which we want ot get states. - * @return sequence of GetDataResponses whose contexts are the partitions they are associated with. 
- */ - def getTopicPartitionStatesRaw(partitions: Seq[TopicAndPartition]): Seq[GetDataResponse] = { - val getDataRequests = partitions.map { partition => - GetDataRequest(TopicPartitionStateZNode.path(partition), ctx = Some(partition)) - } - retryRequestsUntilConnected(getDataRequests) - } - - /** - * Sets topic partition states for the given partitions. - * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set. - * @return sequence of SetDataResponse whose contexts are the partitions they are associated with. - */ - def setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicAndPartition, LeaderIsrAndControllerEpoch]): Seq[SetDataResponse] = { - val setDataRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) => - val path = TopicPartitionStateZNode.path(partition) - val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch) - SetDataRequest(path, data, leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion, Some(partition)) - } - retryRequestsUntilConnected(setDataRequests.toSeq) - } - - /** - * Creates topic partition state znodes for the given partitions. - * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set. - * @return sequence of CreateResponse whose contexts are the partitions they are associated with. - */ - def createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicAndPartition, LeaderIsrAndControllerEpoch]): Seq[CreateResponse] = { - createTopicPartitions(leaderIsrAndControllerEpochs.keys.map(_.topic).toSet.toSeq) - createTopicPartition(leaderIsrAndControllerEpochs.keys.toSeq) - val createRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) => - val path = TopicPartitionStateZNode.path(partition) - val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch) - CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(partition)) - } - retryRequestsUntilConnected(createRequests.toSeq) - } - - /** - * Sets the controller epoch conditioned on the given epochZkVersion. - * @param epoch the epoch to set - * @param epochZkVersion the expected version number of the epoch znode. - * @return SetDataResponse - */ - def setControllerEpochRaw(epoch: Int, epochZkVersion: Int): SetDataResponse = { - val setDataRequest = SetDataRequest(ControllerEpochZNode.path, ControllerEpochZNode.encode(epoch), epochZkVersion) - retryRequestUntilConnected(setDataRequest) - } - - /** - * Creates the controller epoch znode. - * @param epoch the epoch to set - * @return CreateResponse - */ - def createControllerEpochRaw(epoch: Int): CreateResponse = { - val createRequest = CreateRequest(ControllerEpochZNode.path, ControllerEpochZNode.encode(epoch), - acls(ControllerEpochZNode.path), CreateMode.PERSISTENT) - retryRequestUntilConnected(createRequest) - } - - /** - * Try to update the partition states of multiple partitions in zookeeper. - * @param leaderAndIsrs The partition states to update. - * @param controllerEpoch The current controller epoch. - * @return UpdateLeaderAndIsrResult instance containing per partition results. 
- */ - def updateLeaderAndIsr(leaderAndIsrs: Map[TopicAndPartition, LeaderAndIsr], controllerEpoch: Int): UpdateLeaderAndIsrResult = { - val successfulUpdates = mutable.Map.empty[TopicAndPartition, LeaderAndIsr] - val updatesToRetry = mutable.Buffer.empty[TopicAndPartition] - val failed = mutable.Map.empty[TopicAndPartition, Exception] - val leaderIsrAndControllerEpochs = leaderAndIsrs.map { case (partition, leaderAndIsr) => partition -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch) } - val setDataResponses = try { - setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs) - } catch { - case e: Exception => - leaderAndIsrs.keys.foreach(partition => failed.put(partition, e)) - return UpdateLeaderAndIsrResult(successfulUpdates.toMap, updatesToRetry, failed.toMap) - } - setDataResponses.foreach { setDataResponse => - val partition = setDataResponse.ctx.get.asInstanceOf[TopicAndPartition] - if (setDataResponse.resultCode == Code.OK) { - val updatedLeaderAndIsr = leaderAndIsrs(partition).withZkVersion(setDataResponse.stat.getVersion) - successfulUpdates.put(partition, updatedLeaderAndIsr) - } else if (setDataResponse.resultCode == Code.BADVERSION) { - updatesToRetry += partition - } else { - failed.put(partition, setDataResponse.resultException.get) - } - } - UpdateLeaderAndIsrResult(successfulUpdates.toMap, updatesToRetry, failed.toMap) - } - - /** - * Get log configs that merge local configs with topic-level configs in zookeeper. - * @param topics The topics to get log configs for. - * @param config The local configs. - * @return A tuple of two values: - * 1. The successfully gathered log configs - * 2. Exceptions corresponding to failed log config lookups. - */ - def getLogConfigs(topics: Seq[String], config: java.util.Map[String, AnyRef]): - (Map[String, LogConfig], Map[String, Exception]) = { - val logConfigs = mutable.Map.empty[String, LogConfig] - val failed = mutable.Map.empty[String, Exception] - val configResponses = try { - getTopicConfigs(topics) - } catch { - case e: Exception => - topics.foreach(topic => failed.put(topic, e)) - return (logConfigs.toMap, failed.toMap) - } - configResponses.foreach { configResponse => - val topic = configResponse.ctx.get.asInstanceOf[String] - if (configResponse.resultCode == Code.OK) { - val overrides = ConfigEntityZNode.decode(configResponse.data) - val logConfig = LogConfig.fromProps(config, overrides.getOrElse(new Properties)) - logConfigs.put(topic, logConfig) - } else if (configResponse.resultCode == Code.NONODE) { - val logConfig = LogConfig.fromProps(config, new Properties) - logConfigs.put(topic, logConfig) - } else { - failed.put(topic, configResponse.resultException.get) - } - } - (logConfigs.toMap, failed.toMap) - } - - /** - * Gets all brokers in the cluster. - * @return sequence of brokers in the cluster. 
- */ - def getAllBrokersInCluster: Seq[Broker] = { - val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(BrokerIdsZNode.path)) - if (getChildrenResponse.resultCode == Code.OK) { - val brokerIds = getChildrenResponse.children.map(_.toInt) - val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId))) - val getDataResponses = retryRequestsUntilConnected(getDataRequests) - getDataResponses.flatMap { getDataResponse => - val brokerId = getDataResponse.ctx.get.asInstanceOf[Int] - if (getDataResponse.resultCode == Code.OK) { - Option(BrokerIdZNode.decode(brokerId, getDataResponse.data)) - } else if (getDataResponse.resultCode == Code.NONODE) { - None - } else { - throw getDataResponse.resultException.get - } - } - } else if (getChildrenResponse.resultCode == Code.NONODE) { - Seq.empty - } else { - throw getChildrenResponse.resultException.get - } - } - - /** - * Gets all topics in the cluster. - * @return sequence of topics in the cluster. - */ - def getAllTopicsInCluster: Seq[String] = { - val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(TopicsZNode.path)) - if (getChildrenResponse.resultCode == Code.OK) { - getChildrenResponse.children - } else if (getChildrenResponse.resultCode == Code.NONODE) { - Seq.empty - } else { - throw getChildrenResponse.resultException.get - } - } - - /** - * Sets the topic znode with the given assignment. - * @param topic the topic whose assignment is being set. - * @param assignment the partition to replica mapping to set for the given topic - * @return SetDataResponse - */ - def setTopicAssignmentRaw(topic: String, assignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = { - val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(assignment), -1) - retryRequestUntilConnected(setDataRequest) - } - - /** - * Gets the log dir event notifications as strings. These strings are the znode names and not the absolute znode path. - * @return sequence of znode names and not the absolute znode path. - */ - def getAllLogDirEventNotifications: Seq[String] = { - val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path)) - if (getChildrenResponse.resultCode == Code.OK) { - getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber) - } else if (getChildrenResponse.resultCode == Code.NONODE) { - Seq.empty - } else { - throw getChildrenResponse.resultException.get - } - } - - /** - * Reads each of the log dir event notifications associated with the given sequence numbers and extracts the broker ids. - * @param sequenceNumbers the sequence numbers associated with the log dir event notifications. - * @return broker ids associated with the given log dir event notifications. - */ - def getBrokerIdsFromLogDirEvents(sequenceNumbers: Seq[String]): Seq[Int] = { - val getDataRequests = sequenceNumbers.map { sequenceNumber => - GetDataRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber)) - } - val getDataResponses = retryRequestsUntilConnected(getDataRequests) - getDataResponses.flatMap { getDataResponse => - if (getDataResponse.resultCode == Code.OK) { - LogDirEventNotificationSequenceZNode.decode(getDataResponse.data) - } else if (getDataResponse.resultCode == Code.NONODE) { - None - } else { - throw getDataResponse.resultException.get - } - } - } - - /** - * Deletes all log dir event notifications. 
- */ - def deleteLogDirEventNotifications(): Unit = { - val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path)) - if (getChildrenResponse.resultCode == Code.OK) { - deleteLogDirEventNotifications(getChildrenResponse.children) - } else if (getChildrenResponse.resultCode != Code.NONODE) { - throw getChildrenResponse.resultException.get - } - } - - /** - * Deletes the log dir event notifications associated with the given sequence numbers. - * @param sequenceNumbers the sequence numbers associated with the log dir event notifications to be deleted. - */ - def deleteLogDirEventNotifications(sequenceNumbers: Seq[String]): Unit = { - val deleteRequests = sequenceNumbers.map { sequenceNumber => - DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber), -1) - } - retryRequestsUntilConnected(deleteRequests) - } - - /** - * Gets the assignments for the given topics. - * @param topics the topics whose partitions we wish to get the assignments for. - * @return the replica assignment for each partition from the given topics. - */ - def getReplicaAssignmentForTopics(topics: Set[String]): Map[TopicAndPartition, Seq[Int]] = { - val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic))) - val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq) - getDataResponses.flatMap { getDataResponse => - val topic = getDataResponse.ctx.get.asInstanceOf[String] - if (getDataResponse.resultCode == Code.OK) { - TopicZNode.decode(topic, getDataResponse.data) - } else if (getDataResponse.resultCode == Code.NONODE) { - Map.empty[TopicAndPartition, Seq[Int]] - } else { - throw getDataResponse.resultException.get - } - }.toMap - } - - /** - * Get all topics marked for deletion. - * @return sequence of topics marked for deletion. - */ - def getTopicDeletions: Seq[String] = { - val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(DeleteTopicsZNode.path)) - if (getChildrenResponse.resultCode == Code.OK) { - getChildrenResponse.children - } else if (getChildrenResponse.resultCode == Code.NONODE) { - Seq.empty - } else { - throw getChildrenResponse.resultException.get - } - } - - /** - * Remove the given topics from the topics marked for deletion. - * @param topics the topics to remove. - */ - def deleteTopicDeletions(topics: Seq[String]): Unit = { - val deleteRequests = topics.map(topic => DeleteRequest(DeleteTopicsTopicZNode.path(topic), -1)) - retryRequestsUntilConnected(deleteRequests) - } - - /** - * Returns all reassignments. - * @return the reassignments for each partition. - */ - def getPartitionReassignment: Map[TopicAndPartition, Seq[Int]] = { - val getDataRequest = GetDataRequest(ReassignPartitionsZNode.path) - val getDataResponse = retryRequestUntilConnected(getDataRequest) - if (getDataResponse.resultCode == Code.OK) { - ReassignPartitionsZNode.decode(getDataResponse.data) - } else if (getDataResponse.resultCode == Code.NONODE) { - Map.empty[TopicAndPartition, Seq[Int]] - } else { - throw getDataResponse.resultException.get - } - } - - /** - * Sets the partition reassignment znode with the given reassignment. - * @param reassignment the reassignment to set on the reassignment znode. 
- * @return SetDataResponse - */ - def setPartitionReassignmentRaw(reassignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = { - val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment), -1) - retryRequestUntilConnected(setDataRequest) - } - - /** - * Creates the partition reassignment znode with the given reassignment. - * @param reassignment the reassignment to set on the reassignment znode. - * @return CreateResponse - */ - def createPartitionReassignment(reassignment: Map[TopicAndPartition, Seq[Int]]): CreateResponse = { - val createRequest = CreateRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment), - acls(ReassignPartitionsZNode.path), CreateMode.PERSISTENT) - retryRequestUntilConnected(createRequest) - } - - /** - * Deletes the partition reassignment znode. - */ - def deletePartitionReassignment(): Unit = { - val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, -1) - retryRequestUntilConnected(deleteRequest) - } - - /** - * Gets topic partition states for the given partitions. - * @param partitions the partitions for which we want ot get states. - * @return map containing LeaderIsrAndControllerEpoch of each partition for we were able to lookup the partition state. - */ - def getTopicPartitionStates(partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = { - val getDataResponses = getTopicPartitionStatesRaw(partitions) - getDataResponses.flatMap { getDataResponse => - val partition = getDataResponse.ctx.get.asInstanceOf[TopicAndPartition] - if (getDataResponse.resultCode == Code.OK) { - TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat).map(partition -> _) - } else if (getDataResponse.resultCode == Code.NONODE) { - None - } else { - throw getDataResponse.resultException.get - } - }.toMap - } - - /** - * Gets the isr change notifications as strings. These strings are the znode names and not the absolute znode path. - * @return sequence of znode names and not the absolute znode path. - */ - def getAllIsrChangeNotifications: Seq[String] = { - val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(IsrChangeNotificationZNode.path)) - if (getChildrenResponse.resultCode == Code.OK) { - getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber) - } else if (getChildrenResponse.resultCode == Code.NONODE) { - Seq.empty - } else { - throw getChildrenResponse.resultException.get - } - } - - /** - * Reads each of the isr change notifications associated with the given sequence numbers and extracts the partitions. - * @param sequenceNumbers the sequence numbers associated with the isr change notifications. - * @return partitions associated with the given isr change notifications. - */ - def getPartitionsFromIsrChangeNotifications(sequenceNumbers: Seq[String]): Seq[TopicAndPartition] = { - val getDataRequests = sequenceNumbers.map { sequenceNumber => - GetDataRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber)) - } - val getDataResponses = retryRequestsUntilConnected(getDataRequests) - getDataResponses.flatMap { getDataResponse => - if (getDataResponse.resultCode == Code.OK) { - IsrChangeNotificationSequenceZNode.decode(getDataResponse.data) - } else if (getDataResponse.resultCode == Code.NONODE) { - None - } else { - throw getDataResponse.resultException.get - } - } - } - - /** - * Deletes all isr change notifications. 
- */ - def deleteIsrChangeNotifications(): Unit = { - val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(IsrChangeNotificationZNode.path)) - if (getChildrenResponse.resultCode == Code.OK) { - deleteIsrChangeNotifications(getChildrenResponse.children) - } else if (getChildrenResponse.resultCode != Code.NONODE) { - throw getChildrenResponse.resultException.get - } - } - - /** - * Deletes the isr change notifications associated with the given sequence numbers. - * @param sequenceNumbers the sequence numbers associated with the isr change notifications to be deleted. - */ - def deleteIsrChangeNotifications(sequenceNumbers: Seq[String]): Unit = { - val deleteRequests = sequenceNumbers.map { sequenceNumber => - DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber), -1) - } - retryRequestsUntilConnected(deleteRequests) - } - - /** - * Gets the partitions marked for preferred replica election. - * @return sequence of partitions. - */ - def getPreferredReplicaElection: Set[TopicAndPartition] = { - val getDataRequest = GetDataRequest(PreferredReplicaElectionZNode.path) - val getDataResponse = retryRequestUntilConnected(getDataRequest) - if (getDataResponse.resultCode == Code.OK) { - PreferredReplicaElectionZNode.decode(getDataResponse.data) - } else if (getDataResponse.resultCode == Code.NONODE) { - Set.empty[TopicAndPartition] - } else { - throw getDataResponse.resultException.get - } - } - - /** - * Deletes the preferred replica election znode. - */ - def deletePreferredReplicaElection(): Unit = { - val deleteRequest = DeleteRequest(PreferredReplicaElectionZNode.path, -1) - retryRequestUntilConnected(deleteRequest) - } - - /** - * Gets the controller id. - * @return optional integer that is Some if the controller znode exists and can be parsed and None otherwise. - */ - def getControllerId: Option[Int] = { - val getDataRequest = GetDataRequest(ControllerZNode.path) - val getDataResponse = retryRequestUntilConnected(getDataRequest) - if (getDataResponse.resultCode == Code.OK) { - ControllerZNode.decode(getDataResponse.data) - } else if (getDataResponse.resultCode == Code.NONODE) { - None - } else { - throw getDataResponse.resultException.get - } - } - - /** - * Deletes the controller znode. - */ - def deleteController(): Unit = { - val deleteRequest = DeleteRequest(ControllerZNode.path, -1) - retryRequestUntilConnected(deleteRequest) - } - - /** - * Gets the controller epoch. - * @return optional (Int, Stat) that is Some if the controller epoch path exists and None otherwise. - */ - def getControllerEpoch: Option[(Int, Stat)] = { - val getDataRequest = GetDataRequest(ControllerEpochZNode.path) - val getDataResponse = retryRequestUntilConnected(getDataRequest) - if (getDataResponse.resultCode == Code.OK) { - val epoch = ControllerEpochZNode.decode(getDataResponse.data) - Option(epoch, getDataResponse.stat) - } else if (getDataResponse.resultCode == Code.NONODE) { - None - } else { - throw getDataResponse.resultException.get - } - } - - /** - * Recursively deletes the topic znode. - * @param topic the topic whose topic znode we wish to delete. - */ - def deleteTopicZNode(topic: String): Unit = { - deleteRecursive(TopicZNode.path(topic)) - } - - /** - * Deletes the topic configs for the given topics. - * @param topics the topics whose configs we wish to delete. 
- */ - def deleteTopicConfigs(topics: Seq[String]): Unit = { - val deleteRequests = topics.map(topic => DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), -1)) - retryRequestsUntilConnected(deleteRequests) - } - - /** - * This registers a ZNodeChangeHandler and attempts to register a watcher with an ExistsRequest, which allows data watcher - * registrations on paths which might not even exist. - * - * @param zNodeChangeHandler - */ - def registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler: ZNodeChangeHandler): Unit = { - zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler) - val existsResponse = retryRequestUntilConnected(ExistsRequest(zNodeChangeHandler.path)) - if (existsResponse.resultCode != Code.OK && existsResponse.resultCode != Code.NONODE) { - throw existsResponse.resultException.get - } - } - - /** - * See ZookeeperClient.registerZNodeChangeHandler - * @param zNodeChangeHandler - */ - def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit = { - zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler) - } - - /** - * See ZookeeperClient.unregisterZNodeChangeHandler - * @param path - */ - def unregisterZNodeChangeHandler(path: String): Unit = { - zookeeperClient.unregisterZNodeChangeHandler(path) - } - - /** - * See ZookeeperClient.registerZNodeChildChangeHandler - * @param zNodeChildChangeHandler - */ - def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit = { - zookeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler) - } - - /** - * See ZookeeperClient.unregisterZNodeChildChangeHandler - * @param path - */ - def unregisterZNodeChildChangeHandler(path: String): Unit = { - zookeeperClient.unregisterZNodeChildChangeHandler(path) - } - - /** - * Close the underlying ZookeeperClient. 
- */ - def close(): Unit = { - zookeeperClient.close() - } - - private def deleteRecursive(path: String): Unit = { - val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path)) - if (getChildrenResponse.resultCode == Code.OK) { - getChildrenResponse.children.foreach(child => deleteRecursive(s"$path/$child")) - val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, -1)) - if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) { - throw deleteResponse.resultException.get - } - } else if (getChildrenResponse.resultCode != Code.NONODE) { - throw getChildrenResponse.resultException.get - } - } - private def createTopicPartition(partitions: Seq[TopicAndPartition]) = { - val createRequests = partitions.map { partition => - val path = TopicPartitionZNode.path(partition) - val data = TopicPartitionZNode.encode - CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(partition)) - } - retryRequestsUntilConnected(createRequests) - } - - private def createTopicPartitions(topics: Seq[String]) = { - val createRequests = topics.map { topic => - val path = TopicPartitionsZNode.path(topic) - val data = TopicPartitionsZNode.encode - CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(topic)) - } - retryRequestsUntilConnected(createRequests) - } - - private def getTopicConfigs(topics: Seq[String]) = { - val getDataRequests = topics.map { topic => - GetDataRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), ctx = Some(topic)) - } - retryRequestsUntilConnected(getDataRequests) - } - - private def acls(path: String) = { - import scala.collection.JavaConverters._ - ZkUtils.defaultAcls(isSecure, path).asScala - } - - private def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = { - retryRequestsUntilConnected(Seq(request)).head - } - - private def retryRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = { - val remainingRequests = ArrayBuffer(requests: _*) - val responses = new ArrayBuffer[Req#Response] - while (remainingRequests.nonEmpty) { - val batchResponses = zookeeperClient.handleRequests(remainingRequests) - - // Only execute slow path if we find a response with CONNECTIONLOSS - if (batchResponses.exists(_.resultCode == Code.CONNECTIONLOSS)) { - val requestResponsePairs = remainingRequests.zip(batchResponses) - - remainingRequests.clear() - requestResponsePairs.foreach { case (request, response) => - if (response.resultCode == Code.CONNECTIONLOSS) - remainingRequests += request - else - responses += response - } - - if (remainingRequests.nonEmpty) - zookeeperClient.waitUntilConnected() - } else { - remainingRequests.clear() - responses ++= batchResponses - } - } - responses - } - - def checkedEphemeralCreate(path: String, data: Array[Byte]): Unit = { - val checkedEphemeral = new CheckedEphemeral(path, data) - info(s"Creating $path (is it secure? 
-  def checkedEphemeralCreate(path: String, data: Array[Byte]): Unit = {
-    val checkedEphemeral = new CheckedEphemeral(path, data)
-    info(s"Creating $path (is it secure? $isSecure)")
-    val code = checkedEphemeral.create()
-    info(s"Result of znode creation at $path is: $code")
-    code match {
-      case Code.OK =>
-      case _ => throw KeeperException.create(code)
-    }
-  }
-
-  private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging {
-    def create(): Code = {
-      val createRequest = CreateRequest(path, data, acls(path), CreateMode.EPHEMERAL)
-      val createResponse = retryRequestUntilConnected(createRequest)
-      val code = createResponse.resultCode
-      if (code == Code.OK) {
-        code
-      } else if (code == Code.NODEEXISTS) {
-        get()
-      } else {
-        error(s"Error while creating ephemeral at $path with return code: $code")
-        code
-      }
-    }
-
-    private def get(): Code = {
-      val getDataRequest = GetDataRequest(path)
-      val getDataResponse = retryRequestUntilConnected(getDataRequest)
-      val code = getDataResponse.resultCode
-      if (code == Code.OK) {
-        if (getDataResponse.stat.getEphemeralOwner != zookeeperClient.sessionId) {
-          error(s"Error while creating ephemeral at $path with return code: $code")
-          Code.NODEEXISTS
-        } else {
-          code
-        }
-      } else if (code == Code.NONODE) {
-        info(s"The ephemeral node at $path went away while reading it")
-        create()
-      } else {
-        error(s"Error while creating ephemeral at $path with return code: $code")
-        code
-      }
-    }
-  }
-}
-
-object KafkaControllerZkUtils {
-
-  /**
-   * @param successfulPartitions The successfully updated partition states with adjusted znode versions.
-   * @param partitionsToRetry The partitions that we should retry due to a zookeeper BADVERSION conflict. Version conflicts
-   *                          can occur if the partition leader updated partition state while the controller attempted to
-   *                          update partition state.
-   * @param failedPartitions Exceptions corresponding to failed partition state updates.
-   */
-  case class UpdateLeaderAndIsrResult(successfulPartitions: Map[TopicAndPartition, LeaderAndIsr],
-                                      partitionsToRetry: Seq[TopicAndPartition],
-                                      failedPartitions: Map[TopicAndPartition, Exception])
-}
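The state machines patched below consume this result by routing each bucket
differently. A schematic of that handling (the handleUpdateResult helper is
hypothetical; the import matches the new location introduced by this commit):

    import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult

    def handleUpdateResult(result: UpdateLeaderAndIsrResult): Unit = {
      val UpdateLeaderAndIsrResult(successful, toRetry, failed) = result
      successful.foreach { case (partition, leaderAndIsr) =>
        println(s"propagate $leaderAndIsr for $partition to the brokers")
      }
      if (toRetry.nonEmpty)
        println(s"BADVERSION conflicts, re-read state and retry: $toRetry")
      failed.foreach { case (partition, e) =>
        println(s"abandon $partition: $e")
      }
    }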

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 1c87b5e..1dee71d 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -18,9 +18,10 @@ package kafka.controller

 import kafka.api.LeaderAndIsr
 import kafka.common.{StateChangeFailedException, TopicAndPartition}
-import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
+import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.Code

@@ -43,7 +44,7 @@ class PartitionStateMachine(config: KafkaConfig,
                             stateChangeLogger: StateChangeLogger,
                             controllerContext: ControllerContext,
                             topicDeletionManager: TopicDeletionManager,
-                            zkUtils: KafkaControllerZkUtils,
+                            zkClient: KafkaZkClient,
                             partitionState: mutable.Map[TopicAndPartition, PartitionState],
                             controllerBrokerRequestBatch: ControllerBrokerRequestBatch) extends Logging {
   private val controllerId = config.brokerId
@@ -217,7 +218,7 @@ class PartitionStateMachine(config: KafkaConfig,
       partition -> leaderIsrAndControllerEpoch
     }.toMap
     val createResponses = try {
-      zkUtils.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
+      zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
     } catch {
       case e: Exception =>
         partitionsWithLiveReplicas.foreach { case (partition,_) => logFailedStateChange(partition, partitionState(partition), NewPartition, e) }
@@ -278,7 +279,7 @@ class PartitionStateMachine(config: KafkaConfig,
   private def doElectLeaderForPartitions(partitions: Seq[TopicAndPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy):
   (Seq[TopicAndPartition], Seq[TopicAndPartition], Map[TopicAndPartition, Exception]) = {
     val getDataResponses = try {
-      zkUtils.getTopicPartitionStatesRaw(partitions)
+      zkClient.getTopicPartitionStatesRaw(partitions)
     } catch {
       case e: Exception =>
         return (Seq.empty, Seq.empty, partitions.map(_ -> e).toMap)
@@ -331,7 +332,7 @@ class PartitionStateMachine(config: KafkaConfig,
     }
     val recipientsPerPartition = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> recipients }.toMap
     val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> leaderAndIsrOpt.get }.toMap
-    val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkUtils.updateLeaderAndIsr(
+    val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
       adjustedLeaderAndIsrs, controllerContext.epoch)
     successfulUpdates.foreach { case (partition, leaderAndIsr) =>
       val replicas = controllerContext.partitionReplicaAssignment(partition)
@@ -349,7 +350,7 @@ class PartitionStateMachine(config: KafkaConfig,
       val liveInSyncReplicas = leaderIsrAndControllerEpoch.leaderAndIsr.isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
       liveInSyncReplicas.isEmpty
     }
-    val (logConfigs, failed) = zkUtils.getLogConfigs(partitionsWithNoLiveInSyncReplicas.map { case (partition, _) => partition.topic }, config.originals())
+    val (logConfigs, failed) = zkClient.getLogConfigs(partitionsWithNoLiveInSyncReplicas.map { case (partition, _) => partition.topic }, config.originals())
     val partitionsWithUncleanLeaderElectionState = partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
       if (failed.contains(partition.topic)) {
         logFailedStateChange(partition, partitionState(partition), OnlinePartition, failed(partition.topic))

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 4da1c7b..e41007b 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -19,9 +19,10 @@ package kafka.controller
 import kafka.api.LeaderAndIsr
 import kafka.common.{StateChangeFailedException, TopicAndPartition}
 import kafka.controller.Callbacks.CallbackBuilder
-import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
+import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import org.apache.zookeeper.KeeperException.Code

 import scala.collection.mutable
@@ -48,7 +49,7 @@ class ReplicaStateMachine(config: KafkaConfig,
                           stateChangeLogger: StateChangeLogger,
                           controllerContext: ControllerContext,
                           topicDeletionManager: TopicDeletionManager,
-                          zkUtils: KafkaControllerZkUtils,
+                          zkClient: KafkaZkClient,
                           replicaState: mutable.Map[PartitionAndReplica, ReplicaState],
                           controllerBrokerRequestBatch: ControllerBrokerRequestBatch) extends Logging {
   private val controllerId = config.brokerId
@@ -292,7 +293,7 @@ class ReplicaStateMachine(config: KafkaConfig,
       val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId)
       leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
     }
-    val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkUtils.updateLeaderAndIsr(
+    val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
      adjustedLeaderAndIsrs, controllerContext.epoch)
     val exceptionsForPartitionsWithNoLeaderAndIsrInZk = partitionsWithNoLeaderAndIsrInZk.flatMap { partition =>
       if (!topicDeletionManager.isPartitionToBeDeleted(partition)) {
@@ -325,7 +326,7 @@ class ReplicaStateMachine(config: KafkaConfig,
     val partitionsWithNoLeaderAndIsrInZk = mutable.Buffer.empty[TopicAndPartition]
     val failed = mutable.Map.empty[TopicAndPartition, Exception]
     val getDataResponses = try {
-      zkUtils.getTopicPartitionStatesRaw(partitions)
+      zkClient.getTopicPartitionStatesRaw(partitions)
     } catch {
       case e: Exception =>
         partitions.foreach(partition => failed.put(partition, e))
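One behavioral detail in the @@ -292 hunk above, independent of the rename:
when a replica is removed from the ISR, a singleton ISR is left intact, so a
partition never ends up with an empty ISR. The same logic in isolation:

    // Removing replica 2 from the ISR; the last remaining replica is kept.
    def adjustIsr(isr: List[Int], replicaId: Int): List[Int] =
      if (isr.size == 1) isr else isr.filter(_ != replicaId)

    adjustIsr(List(2), 2)       // List(2)
    adjustIsr(List(1, 2, 3), 2) // List(1, 3)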

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 52302a3..2e93f9d 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -19,6 +19,7 @@ package kafka.controller

 import kafka.common.TopicAndPartition
 import kafka.utils.Logging
+import kafka.zk.KafkaZkClient

 import scala.collection.{Set, mutable}

@@ -57,7 +58,7 @@ import scala.collection.{Set, mutable}
  */
 class TopicDeletionManager(controller: KafkaController,
                            eventManager: ControllerEventManager,
-                           kafkaControllerZkUtils: KafkaControllerZkUtils) extends Logging {
+                           zkClient: KafkaZkClient) extends Logging {
   this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], "
   val controllerContext = controller.controllerContext
   val isDeleteTopicEnabled = controller.config.deleteTopicEnable
@@ -73,7 +74,7 @@ class TopicDeletionManager(controller: KafkaController,
     } else {
       // if delete topic is disabled clean the topic entries under /admin/delete_topics
       info("Removing " + initialTopicsToBeDeleted + " since delete topic is disabled")
-      kafkaControllerZkUtils.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq)
+      zkClient.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq)
     }
   }

@@ -239,9 +240,9 @@ class TopicDeletionManager(controller: KafkaController,
     controller.partitionStateMachine.handleStateChanges(partitionsForDeletedTopic.toSeq, NonExistentPartition)
     topicsToBeDeleted -= topic
     partitionsToBeDeleted.retain(_.topic != topic)
-    kafkaControllerZkUtils.deleteTopicZNode(topic)
-    kafkaControllerZkUtils.deleteTopicConfigs(Seq(topic))
-    kafkaControllerZkUtils.deleteTopicDeletions(Seq(topic))
+    zkClient.deleteTopicZNode(topic)
+    zkClient.deleteTopicConfigs(Seq(topic))
+    zkClient.deleteTopicDeletions(Seq(topic))
     controllerContext.removeTopic(topic)
   }
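The @@ -239 hunk above shows the cleanup order when a topic deletion
completes: all znode state is removed before the in-memory controller state
is dropped. Annotated with the paths defined by the ZkData objects in this
commit (the recursive removal of the topic znode's children is an assumption
based on the deleteRecursive helper shown earlier):

    zkClient.deleteTopicZNode(topic)          // /brokers/topics/<topic> (and children)
    zkClient.deleteTopicConfigs(Seq(topic))   // /config/topics/<topic>
    zkClient.deleteTopicDeletions(Seq(topic)) // /admin/delete_topics/<topic>
    controllerContext.removeTopic(topic)      // finally, the cached metadata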

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ZkData.scala b/core/src/main/scala/kafka/controller/ZkData.scala
deleted file mode 100644
index 2240b6a..0000000
--- a/core/src/main/scala/kafka/controller/ZkData.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package kafka.controller
-
-import java.util.Properties
-
-import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
-import kafka.cluster.{Broker, EndPoint}
-import kafka.common.TopicAndPartition
-import kafka.utils.Json
-import org.apache.zookeeper.data.Stat
-
-import scala.collection.Seq
-
-object ControllerZNode {
-  def path = "/controller"
-  def encode(brokerId: Int, timestamp: Long): Array[Byte] =
-    Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString)).getBytes("UTF-8")
-  def decode(bytes: Array[Byte]): Option[Int] = Json.parseFull(new String(bytes, "UTF-8")).map { js =>
-    js.asJsonObject("brokerid").to[Int]
-  }
-}
-
-object ControllerEpochZNode {
-  def path = "/controller_epoch"
-  def encode(epoch: Int): Array[Byte] = epoch.toString.getBytes("UTF-8")
-  def decode(bytes: Array[Byte]) : Int = new String(bytes, "UTF-8").toInt
-}
-
-object ConfigZNode {
-  def path = "/config"
-  def encode: Array[Byte] = null
-}
-
-object BrokersZNode {
-  def path = "/brokers"
-  def encode: Array[Byte] = null
-}
-
-object BrokerIdsZNode {
-  def path = s"${BrokersZNode.path}/ids"
-  def encode: Array[Byte] = null
-}
-
-object BrokerIdZNode {
-  def path(id: Int) = s"${BrokerIdsZNode.path}/$id"
-  def encode(id: Int,
-             host: String,
-             port: Int,
-             advertisedEndpoints: Seq[EndPoint],
-             jmxPort: Int,
-             rack: Option[String],
-             apiVersion: ApiVersion): Array[Byte] = {
-    val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
-    Broker.toJson(version, id, host, port, advertisedEndpoints, jmxPort, rack).getBytes("UTF-8")
-  }
-
-  def decode(id: Int, bytes: Array[Byte]): Broker = {
-    Broker.createBroker(id, new String(bytes, "UTF-8"))
-  }
-}
-
-object TopicsZNode {
-  def path = s"${BrokersZNode.path}/topics"
-  def encode: Array[Byte] = null
-}
-
-object TopicZNode {
-  def path(topic: String) = s"${TopicsZNode.path}/$topic"
-  def encode(assignment: Map[TopicAndPartition, Seq[Int]]): Array[Byte] = {
-    val assignmentJson = assignment.map { case (partition, replicas) => partition.partition.toString -> replicas }
-    Json.encode(Map("version" -> 1, "partitions" -> assignmentJson)).getBytes("UTF-8")
-  }
-  def decode(topic: String, bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = {
-    Json.parseFull(new String(bytes, "UTF-8")).flatMap { js =>
-      val assignmentJson = js.asJsonObject
-      val partitionsJsonOpt = assignmentJson.get("partitions").map(_.asJsonObject)
-      partitionsJsonOpt.map { partitionsJson =>
-        partitionsJson.iterator.map { case (partition, replicas) =>
-          TopicAndPartition(topic, partition.toInt) -> replicas.to[Seq[Int]]
-        }
-      }
-    }.map(_.toMap).getOrElse(Map.empty)
-  }
-}
-
-object TopicPartitionsZNode {
-  def path(topic: String) = s"${TopicZNode.path(topic)}/partitions"
-  def encode: Array[Byte] = null
-}
-
-object TopicPartitionZNode {
-  def path(partition: TopicAndPartition) = s"${TopicPartitionsZNode.path(partition.topic)}/${partition.partition}"
-  def encode: Array[Byte] = null
-}
-
-object TopicPartitionStateZNode {
-  def path(partition: TopicAndPartition) = s"${TopicPartitionZNode.path(partition)}/state"
-  def encode(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Array[Byte] = {
-    val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
-    val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
-    Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
-      "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr)).getBytes("UTF-8")
-  }
-  def decode(bytes: Array[Byte], stat: Stat): Option[LeaderIsrAndControllerEpoch] = {
-    Json.parseFull(new String(bytes, "UTF-8")).map { js =>
-      val leaderIsrAndEpochInfo = js.asJsonObject
-      val leader = leaderIsrAndEpochInfo("leader").to[Int]
-      val epoch = leaderIsrAndEpochInfo("leader_epoch").to[Int]
-      val isr = leaderIsrAndEpochInfo("isr").to[List[Int]]
-      val controllerEpoch = leaderIsrAndEpochInfo("controller_epoch").to[Int]
-      val zkPathVersion = stat.getVersion
-      LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch)
-    }
-  }
-}
-
-object ConfigEntityTypeZNode {
-  def path(entityType: String) = s"${ConfigZNode.path}/$entityType"
-  def encode: Array[Byte] = null
-}
-
-object ConfigEntityZNode {
-  def path(entityType: String, entityName: String) = s"${ConfigEntityTypeZNode.path(entityType)}/$entityName"
-  def encode(config: Properties): Array[Byte] = {
-    import scala.collection.JavaConverters._
-    Json.encode(Map("version" -> 1, "config" -> config.asScala)).getBytes("UTF-8")
-  }
-  def decode(bytes: Array[Byte]): Option[Properties] = {
-    Json.parseFull(new String(bytes, "UTF-8")).map { js =>
-      val configOpt = js.asJsonObjectOption.flatMap(_.get("config").flatMap(_.asJsonObjectOption))
-      val props = new Properties()
-      configOpt.foreach(config => config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) })
-      props
-    }
-  }
-}
-
-object IsrChangeNotificationZNode {
-  def path = "/isr_change_notification"
-  def encode: Array[Byte] = null
-}
-
-object IsrChangeNotificationSequenceZNode {
-  val SequenceNumberPrefix = "isr_change_"
-  def path(sequenceNumber: String) = s"${IsrChangeNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber"
-  def encode(partitions: Set[TopicAndPartition]): Array[Byte] = {
-    val partitionsJson = partitions.map(partition => Map("topic" -> partition.topic, "partition" -> partition.partition))
-    Json.encode(Map("version" -> IsrChangeNotificationListener.version, "partitions" -> partitionsJson)).getBytes("UTF-8")
-  }
-
-  def decode(bytes: Array[Byte]): Set[TopicAndPartition] = {
-    Json.parseFull(new String(bytes, "UTF-8")).map { js =>
-      val partitionsJson = js.asJsonObject("partitions").asJsonArray
-      partitionsJson.iterator.map { partitionsJson =>
-        val partitionJson = partitionsJson.asJsonObject
-        val topic = partitionJson("topic").to[String]
-        val partition = partitionJson("partition").to[Int]
-        TopicAndPartition(topic, partition)
-      }
-    }
-  }.map(_.toSet).getOrElse(Set.empty)
-  def sequenceNumber(path: String) = path.substring(path.lastIndexOf(SequenceNumberPrefix) + SequenceNumberPrefix.length)
-}
-
-object LogDirEventNotificationZNode {
-  def path = "/log_dir_event_notification"
-  def encode: Array[Byte] = null
-}
-
-object LogDirEventNotificationSequenceZNode {
-  val SequenceNumberPrefix = "log_dir_event_"
-  val LogDirFailureEvent = 1
-  def path(sequenceNumber: String) = s"${LogDirEventNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber"
-  def encode(brokerId: Int) =
-    Json.encode(Map("version" -> 1, "broker" -> brokerId, "event" -> LogDirFailureEvent)).getBytes("UTF-8")
-  def decode(bytes: Array[Byte]): Option[Int] = Json.parseFull(new String(bytes, "UTF-8")).map { js =>
-    js.asJsonObject("broker").to[Int]
-  }
-  def sequenceNumber(path: String) = path.substring(path.lastIndexOf(SequenceNumberPrefix) + SequenceNumberPrefix.length)
-}
-
-object AdminZNode {
-  def path = "/admin"
-  def encode: Array[Byte] = null
-}
-
-object DeleteTopicsZNode {
-  def path = s"${AdminZNode.path}/delete_topics"
-  def encode: Array[Byte] = null
-}
-
-object DeleteTopicsTopicZNode {
-  def path(topic: String) = s"${DeleteTopicsZNode.path}/$topic"
-  def encode: Array[Byte] = null
-}
-
-object ReassignPartitionsZNode {
-  def path = s"${AdminZNode.path}/reassign_partitions"
-  def encode(reassignment: Map[TopicAndPartition, Seq[Int]]): Array[Byte] = {
-    val reassignmentJson = reassignment.map { case (TopicAndPartition(topic, partition), replicas) =>
-      Map("topic" -> topic, "partition" -> partition, "replicas" -> replicas)
-    }
-    Json.encode(Map("version" -> 1, "partitions" -> reassignmentJson)).getBytes("UTF-8")
-  }
-  def decode(bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = Json.parseFull(new String(bytes, "UTF-8")).flatMap { js =>
-    val reassignmentJson = js.asJsonObject
-    val partitionsJsonOpt = reassignmentJson.get("partitions")
-    partitionsJsonOpt.map { partitionsJson =>
-      partitionsJson.asJsonArray.iterator.map { partitionFieldsJs =>
-        val partitionFields = partitionFieldsJs.asJsonObject
-        val topic = partitionFields("topic").to[String]
-        val partition = partitionFields("partition").to[Int]
-        val replicas = partitionFields("replicas").to[Seq[Int]]
-        TopicAndPartition(topic, partition) -> replicas
-      }
-    }
-  }.map(_.toMap).getOrElse(Map.empty)
-}
-
-object PreferredReplicaElectionZNode {
-  def path = s"${AdminZNode.path}/preferred_replica_election"
-  def encode(partitions: Set[TopicAndPartition]): Array[Byte] =
-    Json.encode(Map("version" -> 1, "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)))).getBytes("UTF-8")
-  def decode(bytes: Array[Byte]): Set[TopicAndPartition] = Json.parseFull(new String(bytes, "UTF-8")).map { js =>
-    val partitionsJson = js.asJsonObject("partitions").asJsonArray
-    partitionsJson.iterator.map { partitionsJson =>
-      val partitionJson = partitionsJson.asJsonObject
-      val topic = partitionJson("topic").to[String]
-      val partition = partitionJson("partition").to[Int]
-      TopicAndPartition(topic, partition)
-    }
-  }.map(_.toSet).getOrElse(Set.empty)
-}
\ No newline at end of file
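Each ZNode object above pairs a fixed path with JSON encode/decode helpers.
A minimal round trip through ControllerZNode for illustration (after this
commit the object lives in kafka.zk rather than kafka.controller):

    import kafka.zk.ControllerZNode

    // encode produces {"version":1,"brokerid":1,"timestamp":"..."}; decode
    // recovers only the broker id from those bytes.
    val bytes = ControllerZNode.encode(brokerId = 1, timestamp = System.currentTimeMillis())
    assert(ControllerZNode.decode(bytes) == Some(1))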
