This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch kafka-8545-remove-legacy-zk-utils
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 1bc6f6ae0ec6da1b75f290d56460883cabbb8f09 Author: Ismael Juma <[email protected]> AuthorDate: Sun Jun 16 01:28:52 2019 -0700 KAFKA-8545: Remove legacy ZkUtils ZkUtils is not used by the broker, has been deprecated since 2.0.0 and it was never intended as a public API. We should remove it. --- core/src/main/scala/kafka/admin/AdminUtils.scala | 431 +--------- core/src/main/scala/kafka/utils/ZkUtils.scala | 890 --------------------- .../other/kafka/ReplicationQuotasTestRig.scala | 6 +- .../test/scala/unit/kafka/admin/AdminTest.scala | 213 ----- .../scala/unit/kafka/admin/TestAdminUtils.scala | 29 - .../test/scala/unit/kafka/utils/ZkUtilsTest.scala | 133 --- 6 files changed, 8 insertions(+), 1694 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index cd47969..c3748cf 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -17,54 +17,16 @@ package kafka.admin -import kafka.log.LogConfig -import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig} -import kafka.utils._ -import kafka.utils.ZkUtils._ import java.util.Random -import java.util.Properties -import kafka.common.TopicAlreadyMarkedForDeletionException -import org.apache.kafka.common.errors._ +import kafka.utils.Logging +import org.apache.kafka.common.errors.{InvalidPartitionsException, InvalidReplicationFactorException} -import collection.{Map, Set, mutable, _} -import scala.collection.JavaConverters._ -import org.I0Itec.zkclient.exception.ZkNodeExistsException -import org.apache.kafka.common.internals.Topic +import collection.{Map, mutable, _} -@deprecated("This class is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") -trait AdminUtilities { - def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) - def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties) - def 
changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties) - def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configs: Properties) - - def changeConfigs(zkUtils: ZkUtils, entityType: String, entityName: String, configs: Properties): Unit = { - - def parseBroker(broker: String): Int = { - try broker.toInt - catch { - case _: NumberFormatException => - throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value") - } - } - - entityType match { - case ConfigType.Topic => changeTopicConfig(zkUtils, entityName, configs) - case ConfigType.Client => changeClientIdConfig(zkUtils, entityName, configs) - case ConfigType.User => changeUserOrUserClientIdConfig(zkUtils, entityName, configs) - case ConfigType.Broker => changeBrokerConfig(zkUtils, Seq(parseBroker(entityName)), configs) - case _ => throw new IllegalArgumentException(s"$entityType is not a known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, ${ConfigType.Broker}") - } - } - - def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties -} - -object AdminUtils extends Logging with AdminUtilities { +object AdminUtils extends Logging { val rand = new Random val AdminClientId = "__admin_client" - val EntityConfigChangeZnodePrefix = "config_change_" /** * There are 3 goals of replica assignment: @@ -256,391 +218,6 @@ object AdminUtils extends Logging with AdminUtilities { .groupBy { case (rack, _) => rack } .map { case (rack, rackAndIdList) => (rack, rackAndIdList.map { case (_, id) => id }.sorted) } } - /** - * Add partitions to existing topic with optional replica assignment - * - * @param zkUtils Zookeeper utilities - * @param topic Topic for adding partitions to - * @param existingAssignment A map from partition id to its assigned replicas - * @param allBrokers All brokers in the cluster - * @param numPartitions Number of partitions to be set - * 
@param replicaAssignment Manual replica assignment, or none - * @param validateOnly If true, validate the parameters without actually adding the partitions - * @return the updated replica assignment - */ - @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") - def addPartitions(zkUtils: ZkUtils, - topic: String, - existingAssignment: Map[Int, Seq[Int]], - allBrokers: Seq[BrokerMetadata], - numPartitions: Int = 1, - replicaAssignment: Option[Map[Int, Seq[Int]]] = None, - validateOnly: Boolean = false): Map[Int, Seq[Int]] = { - val existingAssignmentPartition0 = existingAssignment.getOrElse(0, - throw new AdminOperationException( - s"Unexpected existing replica assignment for topic '$topic', partition id 0 is missing. " + - s"Assignment: $existingAssignment")) - - val partitionsToAdd = numPartitions - existingAssignment.size - if (partitionsToAdd <= 0) - throw new InvalidPartitionsException( - s"The number of partitions for a topic can only be increased. 
" + - s"Topic $topic currently has ${existingAssignment.size} partitions, " + - s"$numPartitions would not be an increase.") - - replicaAssignment.foreach { proposedReplicaAssignment => - validateReplicaAssignment(proposedReplicaAssignment, existingAssignmentPartition0, - allBrokers.map(_.id).toSet) - } - - val proposedAssignmentForNewPartitions = replicaAssignment.getOrElse { - val startIndex = math.max(0, allBrokers.indexWhere(_.id >= existingAssignmentPartition0.head)) - AdminUtils.assignReplicasToBrokers(allBrokers, partitionsToAdd, existingAssignmentPartition0.size, - startIndex, existingAssignment.size) - } - val proposedAssignment = existingAssignment ++ proposedAssignmentForNewPartitions - if (!validateOnly) { - info(s"Creating $partitionsToAdd partitions for '$topic' with the following replica assignment: " + - s"$proposedAssignmentForNewPartitions.") - // add the combined new list - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, proposedAssignment, update = true) - } - proposedAssignment - - } - - /** - * Parse a replica assignment string of the form: - * {{{ - * broker_id_for_part1_replica1:broker_id_for_part1_replica2, - * broker_id_for_part2_replica1:broker_id_for_part2_replica2, - * ... 
- * }}} - */ - def parseReplicaAssignment(replicaAssignmentsString: String, startPartitionId: Int): Map[Int, Seq[Int]] = { - val assignmentStrings = replicaAssignmentsString.split(",") - val assignmentMap = mutable.Map[Int, Seq[Int]]() - var partitionId = startPartitionId - for (assignmentString <- assignmentStrings) { - val brokerIds = assignmentString.split(":").map(_.trim.toInt).toSeq - assignmentMap.put(partitionId, brokerIds) - partitionId = partitionId + 1 - } - assignmentMap - } - - private def validateReplicaAssignment(replicaAssignment: Map[Int, Seq[Int]], - existingAssignmentPartition0: Seq[Int], - availableBrokerIds: Set[Int]): Unit = { - - replicaAssignment.foreach { case (partitionId, replicas) => - if (replicas.isEmpty) - throw new InvalidReplicaAssignmentException( - s"Cannot have replication factor of 0 for partition id $partitionId.") - if (replicas.size != replicas.toSet.size) - throw new InvalidReplicaAssignmentException( - s"Duplicate brokers not allowed in replica assignment: " + - s"${replicas.mkString(", ")} for partition id $partitionId.") - if (!replicas.toSet.subsetOf(availableBrokerIds)) - throw new BrokerNotAvailableException( - s"Some brokers specified for partition id $partitionId are not available. 
" + - s"Specified brokers: ${replicas.mkString(", ")}, " + - s"available brokers: ${availableBrokerIds.mkString(", ")}.") - partitionId -> replicas.size - } - val badRepFactors = replicaAssignment.collect { - case (partition, replicas) if replicas.size != existingAssignmentPartition0.size => partition -> replicas.size - } - if (badRepFactors.nonEmpty) { - val sortedBadRepFactors = badRepFactors.toSeq.sortBy { case (partitionId, _) => partitionId } - val partitions = sortedBadRepFactors.map { case (partitionId, _) => partitionId } - val repFactors = sortedBadRepFactors.map { case (_, rf) => rf } - throw new InvalidReplicaAssignmentException(s"Inconsistent replication factor between partitions, " + - s"partition 0 has ${existingAssignmentPartition0.size} while partitions [${partitions.mkString(", ")}] have " + - s"replication factors [${repFactors.mkString(", ")}], respectively.") - } - } - - @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") - def deleteTopic(zkUtils: ZkUtils, topic: String) { - if (topicExists(zkUtils, topic)) { - try { - zkUtils.createPersistentPath(getDeleteTopicPath(topic)) - } catch { - case _: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException( - "topic %s is already marked for deletion".format(topic)) - case e2: Throwable => throw new AdminOperationException(e2) - } - } else { - throw new UnknownTopicOrPartitionException(s"Topic `$topic` to delete does not exist") - } - } - - @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") - def topicExists(zkUtils: ZkUtils, topic: String): Boolean = - zkUtils.pathExists(getTopicPath(topic)) - - @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") - def getBrokerMetadatas(zkUtils: ZkUtils, rackAwareMode: RackAwareMode = RackAwareMode.Enforced, - brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = { - val allBrokers = 
zkUtils.getAllBrokersInCluster() - val brokers = brokerList.map(brokerIds => allBrokers.filter(b => brokerIds.contains(b.id))).getOrElse(allBrokers) - val brokersWithRack = brokers.filter(_.rack.nonEmpty) - if (rackAwareMode == RackAwareMode.Enforced && brokersWithRack.nonEmpty && brokersWithRack.size < brokers.size) { - throw new AdminOperationException("Not all brokers have rack information. Add --disable-rack-aware in command line" + - " to make replica assignment without rack information.") - } - val brokerMetadatas = rackAwareMode match { - case RackAwareMode.Disabled => brokers.map(broker => BrokerMetadata(broker.id, None)) - case RackAwareMode.Safe if brokersWithRack.size < brokers.size => - brokers.map(broker => BrokerMetadata(broker.id, None)) - case _ => brokers.map(broker => BrokerMetadata(broker.id, broker.rack)) - } - brokerMetadatas.sortBy(_.id) - } - - @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") - def createTopic(zkUtils: ZkUtils, - topic: String, - partitions: Int, - replicationFactor: Int, - topicConfig: Properties = new Properties, - rackAwareMode: RackAwareMode = RackAwareMode.Enforced) { - val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode) - val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig) - } - - @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") - def validateCreateOrUpdateTopic(zkUtils: ZkUtils, - topic: String, - partitionReplicaAssignment: Map[Int, Seq[Int]], - config: Properties, - update: Boolean): Unit = { - // validate arguments - Topic.validate(topic) - - if (!update) { - if (topicExists(zkUtils, topic)) - throw new TopicExistsException(s"Topic '$topic' already exists.") - else if (Topic.hasCollisionChars(topic)) { - val allTopics = zkUtils.getAllTopics() - // 
check again in case the topic was created in the meantime, otherwise the - // topic could potentially collide with itself - if (allTopics.contains(topic)) - throw new TopicExistsException(s"Topic '$topic' already exists.") - val collidingTopics = allTopics.filter(Topic.hasCollision(topic, _)) - if (collidingTopics.nonEmpty) { - throw new InvalidTopicException(s"Topic '$topic' collides with existing topics: ${collidingTopics.mkString(", ")}") - } - } - } - - if (partitionReplicaAssignment.values.map(_.size).toSet.size != 1) - throw new InvalidReplicaAssignmentException("All partitions should have the same number of replicas") - - partitionReplicaAssignment.values.foreach(reps => - if (reps.size != reps.toSet.size) - throw new InvalidReplicaAssignmentException("Duplicate replica assignment found: " + partitionReplicaAssignment) - ) - - - // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported - if (!update) - LogConfig.validate(config) - } - - @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") - def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils, - topic: String, - partitionReplicaAssignment: Map[Int, Seq[Int]], - config: Properties = new Properties, - update: Boolean = false) { - validateCreateOrUpdateTopic(zkUtils, topic, partitionReplicaAssignment, config, update) - - // Configs only matter if a topic is being created. 
Changing configs via AlterTopic is not supported - if (!update) { - // write out the config if there is any, this isn't transactional with the partition assignments - writeEntityConfig(zkUtils, getEntityConfigPath(ConfigType.Topic, topic), config) - } - - // create the partition assignment - writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update) - } - - @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") - private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) { - try { - val zkPath = getTopicPath(topic) - val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => e._1.toString -> e._2)) - - if (!update) { - info(s"Topic creation $jsonPartitionData") - zkUtils.createPersistentPath(zkPath, jsonPartitionData) - } else { - info(s"Topic update $jsonPartitionData") - zkUtils.updatePersistentPath(zkPath, jsonPartitionData) - } - debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData)) - } catch { - case _: ZkNodeExistsException => throw new TopicExistsException(s"Topic '$topic' already exists.") - case e2: Throwable => throw new AdminOperationException(e2.toString) - } - } - - /** - * Update the config for a client and create a change notification so the change will propagate to other brokers. - * If clientId is <default>, default clientId config is updated. ClientId configs are used only if <user, clientId> - * and <user> configs are not specified. - * - * @param zkUtils Zookeeper utilities used to write the config to ZK - * @param sanitizedClientId: The sanitized clientId for which configs are being changed - * @param configs: The final set of configs that will be applied to the topic. 
If any new configs need to be added or - * existing configs need to be deleted, it should be done prior to invoking this API - * - */ - @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") - def changeClientIdConfig(zkUtils: ZkUtils, sanitizedClientId: String, configs: Properties) { - DynamicConfig.Client.validate(configs) - changeEntityConfig(zkUtils, ConfigType.Client, sanitizedClientId, configs) - } - - /** - * Update the config for a <user> or <user, clientId> and create a change notification so the change will propagate to other brokers. - * User and/or clientId components of the path may be <default>, indicating that the configuration is the default - * value to be applied if a more specific override is not configured. - * - * @param zkUtils Zookeeper utilities used to write the config to ZK - * @param sanitizedEntityName: <sanitizedUserPrincipal> or <sanitizedUserPrincipal>/clients/<clientId> - * @param configs: The final set of configs that will be applied to the topic. 
If any new configs need to be added or - * existing configs need to be deleted, it should be done prior to invoking this API - * - */ - @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") - def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties) { - if (sanitizedEntityName == ConfigEntityName.Default || sanitizedEntityName.contains("/clients")) - DynamicConfig.Client.validate(configs) - else - DynamicConfig.User.validate(configs) - changeEntityConfig(zkUtils, ConfigType.User, sanitizedEntityName, configs) - } - - @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") - def validateTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties): Unit = { - Topic.validate(topic) - if (!topicExists(zkUtils, topic)) - throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic)) - // remove the topic overrides - LogConfig.validate(configs) - } - - /** - * Update the config for an existing topic and create a change notification so the change will propagate to other brokers - * - * @param zkUtils Zookeeper utilities used to write the config to ZK - * @param topic: The topic for which configs are being changed - * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or - * existing configs need to be deleted, it should be done prior to invoking this API - * - */ - @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") - def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) { - validateTopicConfig(zkUtils, topic, configs) - changeEntityConfig(zkUtils, ConfigType.Topic, topic, configs) - } - - /** - * Override the broker config on some set of brokers. 
These overrides will be persisted between sessions, and will - * override any defaults entered in the broker's config files - * - * @param zkUtils: Zookeeper utilities used to write the config to ZK - * @param brokers: The list of brokers to apply config changes to - * @param configs: The config to change, as properties - */ - @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") - def changeBrokerConfig(zkUtils: ZkUtils, brokers: Seq[Int], configs: Properties): Unit = { - DynamicConfig.Broker.validate(configs) - brokers.foreach { broker => - changeEntityConfig(zkUtils, ConfigType.Broker, broker.toString, configs) - } - } - - @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") - private def changeEntityConfig(zkUtils: ZkUtils, rootEntityType: String, fullSanitizedEntityName: String, configs: Properties) { - val sanitizedEntityPath = rootEntityType + '/' + fullSanitizedEntityName - val entityConfigPath = getEntityConfigPath(rootEntityType, fullSanitizedEntityName) - // write the new config--may not exist if there were previously no overrides - writeEntityConfig(zkUtils, entityConfigPath, configs) - - // create the change notification - val seqNode = ZkUtils.ConfigChangesPath + "/" + EntityConfigChangeZnodePrefix - val content = Json.legacyEncodeAsString(getConfigChangeZnodeData(sanitizedEntityPath)) - zkUtils.createSequentialPersistentPath(seqNode, content) - } - - def getConfigChangeZnodeData(sanitizedEntityPath: String): Map[String, Any] = { - Map("version" -> 2, "entity_path" -> sanitizedEntityPath) - } - - /** - * Write out the entity config to zk, if there is any - */ - @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") - private def writeEntityConfig(zkUtils: ZkUtils, entityPath: String, config: Properties) { - val map = Map("version" -> 1, "config" -> config.asScala) - zkUtils.updatePersistentPath(entityPath, 
Json.legacyEncodeAsString(map)) - } - - /** - * Read the entity (topic, broker, client, user or <user, client>) config (if any) from zk - * sanitizedEntityName is <topic>, <broker>, <client-id>, <user> or <user>/clients/<client-id>. - */ - @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") - def fetchEntityConfig(zkUtils: ZkUtils, rootEntityType: String, sanitizedEntityName: String): Properties = { - val entityConfigPath = getEntityConfigPath(rootEntityType, sanitizedEntityName) - // readDataMaybeNull returns Some(null) if the path exists, but there is no data - val str = zkUtils.readDataMaybeNull(entityConfigPath)._1.orNull - val props = new Properties() - if (str != null) { - Json.parseFull(str).foreach { jsValue => - val jsObject = jsValue.asJsonObjectOption.getOrElse { - throw new IllegalArgumentException(s"Unexpected value in config: $str, entity_config_path: $entityConfigPath") - } - require(jsObject("version").to[Int] == 1) - val config = jsObject.get("config").flatMap(_.asJsonObjectOption).getOrElse { - throw new IllegalArgumentException(s"Invalid $entityConfigPath config: $str") - } - config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) } - } - } - props - } - - @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") - def fetchAllTopicConfigs(zkUtils: ZkUtils): Map[String, Properties] = - zkUtils.getAllTopics().map(topic => (topic, fetchEntityConfig(zkUtils, ConfigType.Topic, topic))).toMap - - @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") - def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, Properties] = - zkUtils.getAllEntitiesWithConfig(entityType).map(entity => (entity, fetchEntityConfig(zkUtils, entityType, entity))).toMap - - @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0") - def fetchAllChildEntityConfigs(zkUtils: 
ZkUtils, rootEntityType: String, childEntityType: String): Map[String, Properties] = { - def entityPaths(zkUtils: ZkUtils, rootPath: Option[String]): Seq[String] = { - val root = rootPath match { - case Some(path) => rootEntityType + '/' + path - case None => rootEntityType - } - val entityNames = zkUtils.getAllEntitiesWithConfig(root) - rootPath match { - case Some(path) => entityNames.map(entityName => path + '/' + entityName) - case None => entityNames - } - } - entityPaths(zkUtils, None) - .flatMap(entity => entityPaths(zkUtils, Some(entity + '/' + childEntityType))) - .map(entityPath => (entityPath, fetchEntityConfig(zkUtils, rootEntityType, entityPath))).toMap - } private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = { val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala deleted file mode 100644 index dd85090..0000000 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ /dev/null @@ -1,890 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.utils - -import java.nio.charset.StandardCharsets - -import kafka.admin._ -import kafka.api.LeaderAndIsr -import kafka.cluster._ -import kafka.common.{KafkaException, NoEpochForPartitionException, TopicAndPartition} -import kafka.controller.{LeaderIsrAndControllerEpoch, ReassignedPartitionsContext} -import kafka.zk.{BrokerIdZNode, ReassignPartitionsZNode, ZkData} -import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException} -import org.I0Itec.zkclient.serialize.ZkSerializer -import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection} -import org.apache.kafka.common.config.ConfigException -import org.apache.zookeeper.data.{ACL, Stat} -import org.apache.zookeeper.ZooDefs - -import scala.collection._ -import scala.collection.JavaConverters._ -import org.apache.kafka.common.TopicPartition - -@deprecated("This is an internal class that is no longer used by Kafka and will be removed in a future release. 
Please " + - "use org.apache.kafka.clients.admin.AdminClient instead.", since = "2.0.0") -object ZkUtils { - - private val UseDefaultAcls = new java.util.ArrayList[ACL] - - // Important: it is necessary to add any new top level Zookeeper path here - val AdminPath = "/admin" - val BrokersPath = "/brokers" - val ClusterPath = "/cluster" - val ConfigPath = "/config" - val ControllerPath = "/controller" - val ControllerEpochPath = "/controller_epoch" - val IsrChangeNotificationPath = "/isr_change_notification" - val LogDirEventNotificationPath = "/log_dir_event_notification" - val KafkaAclPath = "/kafka-acl" - val KafkaAclChangesPath = "/kafka-acl-changes" - - val ConsumersPath = "/consumers" - val ClusterIdPath = s"$ClusterPath/id" - val BrokerIdsPath = s"$BrokersPath/ids" - val BrokerTopicsPath = s"$BrokersPath/topics" - val ReassignPartitionsPath = s"$AdminPath/reassign_partitions" - val DeleteTopicsPath = s"$AdminPath/delete_topics" - val PreferredReplicaLeaderElectionPath = s"$AdminPath/preferred_replica_election" - val BrokerSequenceIdPath = s"$BrokersPath/seqid" - val ConfigChangesPath = s"$ConfigPath/changes" - val ConfigUsersPath = s"$ConfigPath/users" - val ConfigBrokersPath = s"$ConfigPath/brokers" - val ProducerIdBlockPath = "/latest_producer_id_block" - - val SecureZkRootPaths = ZkData.SecureRootPaths - - val SensitiveZkRootPaths = ZkData.SensitiveRootPaths - - def apply(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean): ZkUtils = { - val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout) - new ZkUtils(zkClient, zkConnection, isZkSecurityEnabled) - } - - /* - * Used in tests - */ - def apply(zkClient: ZkClient, isZkSecurityEnabled: Boolean): ZkUtils = { - new ZkUtils(zkClient, null, isZkSecurityEnabled) - } - - def createZkClient(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): ZkClient = { - val zkClient = new ZkClient(zkUrl, sessionTimeout, 
connectionTimeout, ZKStringSerializer) - zkClient - } - - def createZkClientAndConnection(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): (ZkClient, ZkConnection) = { - val zkConnection = new ZkConnection(zkUrl, sessionTimeout) - val zkClient = new ZkClient(zkConnection, connectionTimeout, ZKStringSerializer) - (zkClient, zkConnection) - } - - def sensitivePath(path: String): Boolean = ZkData.sensitivePath(path) - - def defaultAcls(isSecure: Boolean, path: String): java.util.List[ACL] = ZkData.defaultAcls(isSecure, path).asJava - - def maybeDeletePath(zkUrl: String, dir: String) { - try { - val zk = createZkClient(zkUrl, 30*1000, 30*1000) - zk.deleteRecursive(dir) - zk.close() - } catch { - case _: Throwable => // swallow - } - } - - /* - * Get calls that only depend on static paths - */ - def getTopicPath(topic: String): String = { - ZkUtils.BrokerTopicsPath + "/" + topic - } - - def getTopicPartitionsPath(topic: String): String = { - getTopicPath(topic) + "/partitions" - } - - def getTopicPartitionPath(topic: String, partitionId: Int): String = - getTopicPartitionsPath(topic) + "/" + partitionId - - def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String = - getTopicPartitionPath(topic, partitionId) + "/" + "state" - - def getEntityConfigRootPath(entityType: String): String = - ZkUtils.ConfigPath + "/" + entityType - - def getEntityConfigPath(entityType: String, entity: String): String = - getEntityConfigRootPath(entityType) + "/" + entity - - def getEntityConfigPath(entityPath: String): String = - ZkUtils.ConfigPath + "/" + entityPath - - def getDeleteTopicPath(topic: String): String = - DeleteTopicsPath + "/" + topic - - def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = { - val utf8Bytes = jsonData.getBytes(StandardCharsets.UTF_8) - val assignments = ReassignPartitionsZNode.decode(utf8Bytes) match { - case Left(e) => throw e - case Right(result) => result - } - - assignments.map { case 
(tp, p) => (new TopicAndPartition(tp), p) } - } - - def controllerZkData(brokerId: Int, timestamp: Long): String = { - Json.legacyEncodeAsString(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString)) - } - - def preferredReplicaLeaderElectionZkData(partitions: scala.collection.Set[TopicAndPartition]): String = { - Json.legacyEncodeAsString(Map("version" -> 1, "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)))) - } - - def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = { - Json.legacyEncodeAsString(Map( - "version" -> 1, - "partitions" -> partitionsToBeReassigned.map { case (TopicAndPartition(topic, partition), replicas) => - Map( - "topic" -> topic, - "partition" -> partition, - "replicas" -> replicas - ) - } - )) - } - - def getReassignmentJson(partitionsToBeReassigned: Map[TopicPartition, Seq[Int]]): String = { - Json.encodeAsString(Map( - "version" -> 1, - "partitions" -> partitionsToBeReassigned.map { case (tp, replicas) => - Map( - "topic" -> tp.topic, - "partition" -> tp.partition, - "replicas" -> replicas.asJava - ).asJava - }.asJava - ).asJava) - } -} - -/** - * Legacy class for interacting with ZooKeeper. Whenever possible, ``KafkaZkClient`` should be used instead. - */ -@deprecated("This is an internal class that is no longer used by Kafka and will be removed in a future release. Please " + - "use org.apache.kafka.clients.admin.AdminClient instead.", since = "2.0.0") -class ZkUtils(val zkClient: ZkClient, - val zkConnection: ZkConnection, - val isSecure: Boolean) extends Logging { - import ZkUtils._ - - // These are persistent ZK paths that should exist on kafka broker startup. 
- val persistentZkPaths = ZkData.PersistentZkPaths - - // Visible for testing - val zkPath = new ZkPath(zkClient) - - def defaultAcls(path: String): java.util.List[ACL] = ZkUtils.defaultAcls(isSecure, path) - - def getController(): Int = { - readDataMaybeNull(ControllerPath)._1 match { - case Some(controller) => parseControllerId(controller) - case None => throw new KafkaException("Controller doesn't exist") - } - } - - def parseControllerId(controllerInfoString: String): Int = { - try { - Json.parseFull(controllerInfoString) match { - case Some(js) => js.asJsonObject("brokerid").to[Int] - case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString)) - } - } catch { - case _: Throwable => - // It may be due to an incompatible controller register version - warn("Failed to parse the controller info as json. " - + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString)) - try controllerInfoString.toInt - catch { - case t: Throwable => throw new KafkaException(s"Failed to parse the controller info: $controllerInfoString. This is neither the new or the old format.", t) - } - } - } - - /* Represents a cluster identifier. 
Stored in Zookeeper in JSON format: {"version" -> "1", "id" -> id } */ - object ClusterId { - - def toJson(id: String) = { - Json.legacyEncodeAsString(Map("version" -> "1", "id" -> id)) - } - - def fromJson(clusterIdJson: String): String = { - Json.parseFull(clusterIdJson).map(_.asJsonObject("id").to[String]).getOrElse { - throw new KafkaException(s"Failed to parse the cluster id json $clusterIdJson") - } - } - } - - def getClusterId: Option[String] = - readDataMaybeNull(ClusterIdPath)._1.map(ClusterId.fromJson) - - def createOrGetClusterId(proposedClusterId: String): String = { - try { - createPersistentPath(ClusterIdPath, ClusterId.toJson(proposedClusterId)) - proposedClusterId - } catch { - case _: ZkNodeExistsException => - getClusterId.getOrElse(throw new KafkaException("Failed to get cluster id from Zookeeper. This can only happen if /cluster/id is deleted from Zookeeper.")) - } - } - - def getSortedBrokerList(): Seq[Int] = - getChildren(BrokerIdsPath).map(_.toInt).sorted - - def getAllBrokersInCluster(): Seq[Broker] = { - val brokerIds = getChildrenParentMayNotExist(BrokerIdsPath).sorted - brokerIds.map(_.toInt).map(getBrokerInfo(_)).filter(_.isDefined).map(_.get) - } - - def getLeaderAndIsrForPartition(topic: String, partition: Int): Option[LeaderAndIsr] = { - val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition) - val (leaderAndIsrOpt, stat) = readDataMaybeNull(leaderAndIsrPath) - debug(s"Read leaderISR $leaderAndIsrOpt for $topic-$partition") - leaderAndIsrOpt.flatMap(leaderAndIsrStr => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat).map(_.leaderAndIsr)) - } - - private def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat): Option[LeaderIsrAndControllerEpoch] = { - Json.parseFull(leaderAndIsrStr).flatMap { js => - val leaderIsrAndEpochInfo = js.asJsonObject - val leader = leaderIsrAndEpochInfo("leader").to[Int] - val epoch = leaderIsrAndEpochInfo("leader_epoch").to[Int] - val isr = 
leaderIsrAndEpochInfo("isr").to[List[Int]] - val controllerEpoch = leaderIsrAndEpochInfo("controller_epoch").to[Int] - val zkPathVersion = stat.getVersion - trace(s"Leader $leader, Epoch $epoch, Isr $isr, Zk path version $zkPathVersion for leaderAndIsrPath $path") - Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))} - } - - def setupCommonPaths() { - for(path <- persistentZkPaths) - makeSurePersistentPathExists(path) - } - - def getLeaderForPartition(topic: String, partition: Int): Option[Int] = { - readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1.flatMap { leaderAndIsr => - Json.parseFull(leaderAndIsr).map(_.asJsonObject("leader").to[Int]) - } - } - - /** - * This API should read the epoch in the ISR path. It is sufficient to read the epoch in the ISR path, since if the - * leader fails after updating epoch in the leader path and before updating epoch in the ISR path, effectively some - * other broker will retry becoming leader with the same new epoch value. - */ - def getEpochForPartition(topic: String, partition: Int): Int = { - readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1 match { - case Some(leaderAndIsr) => - Json.parseFull(leaderAndIsr) match { - case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for partition [%s,%d] is invalid".format(topic, partition)) - case Some(js) => js.asJsonObject("leader_epoch").to[Int] - } - case None => throw new NoEpochForPartitionException("No epoch, ISR path for partition [%s,%d] is empty" - .format(topic, partition)) - } - } - - /** returns a sequence id generated by updating BrokerSequenceIdPath in Zk. - * users can provide brokerId in the config , inorder to avoid conflicts between zk generated - * seqId and config.brokerId we increment zk seqId by KafkaConfig.MaxReservedBrokerId. 
- */ - def getBrokerSequenceId(MaxReservedBrokerId: Int): Int = { - getSequenceId(BrokerSequenceIdPath) + MaxReservedBrokerId - } - - /** - * Gets the in-sync replicas (ISR) for a specific topic and partition - */ - def getInSyncReplicasForPartition(topic: String, partition: Int): Seq[Int] = { - val leaderAndIsrOpt = readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1 - leaderAndIsrOpt match { - case Some(leaderAndIsr) => - Json.parseFull(leaderAndIsr) match { - case Some(js) => js.asJsonObject("isr").to[Seq[Int]] - case None => Seq.empty[Int] - } - case None => Seq.empty[Int] - } - } - - /** - * Gets the assigned replicas (AR) for a specific topic and partition - */ - def getReplicasForPartition(topic: String, partition: Int): Seq[Int] = { - val seqOpt = for { - jsonPartitionMap <- readDataMaybeNull(getTopicPath(topic))._1 - js <- Json.parseFull(jsonPartitionMap) - replicaMap <- js.asJsonObject.get("partitions") - seq <- replicaMap.asJsonObject.get(partition.toString) - } yield seq.to[Seq[Int]] - seqOpt.getOrElse(Seq.empty) - } - - def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = { - Json.legacyEncodeAsString(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch, - "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr)) - } - - /** - * Get JSON partition to replica map from zookeeper. - */ - def replicaAssignmentZkData(map: Map[String, Seq[Int]]): String = { - Json.legacyEncodeAsString(Map("version" -> 1, "partitions" -> map)) - } - - /** - * make sure a persistent path exists in ZK. Create the path if not exist. - */ - def makeSurePersistentPathExists(path: String, acls: java.util.List[ACL] = UseDefaultAcls) { - //Consumer path is kept open as different consumers will write under this node. 
- val acl = if (path == null || path.isEmpty || path.equals(ConsumersPath)) { - ZooDefs.Ids.OPEN_ACL_UNSAFE - } else if (acls eq UseDefaultAcls) { - ZkUtils.defaultAcls(isSecure, path) - } else { - acls - } - - if (!zkClient.exists(path)) - zkPath.createPersistent(path, createParents = true, acl) //won't throw NoNodeException or NodeExistsException - } - - /** - * create the parent path - */ - private def createParentPath(path: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = { - val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls - val parentDir = path.substring(0, path.lastIndexOf('/')) - if (parentDir.length != 0) { - zkPath.createPersistent(parentDir, createParents = true, acl) - } - } - - /** - * Create an ephemeral node with the given path and data. Create parents if necessary. - */ - private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL]): Unit = { - val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls - try { - zkPath.createEphemeral(path, data, acl) - } catch { - case _: ZkNoNodeException => - createParentPath(path) - zkPath.createEphemeral(path, data, acl) - } - } - - /** - * Create an ephemeral node with the given path and data. - * Throw NodeExistException if node already exists. 
- */ - def createEphemeralPathExpectConflict(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = { - val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls - try { - createEphemeralPath(path, data, acl) - } catch { - case e: ZkNodeExistsException => - // this can happen when there is connection loss; make sure the data is what we intend to write - var storedData: String = null - try { - storedData = readData(path)._1 - } catch { - case _: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this - } - if (storedData == null || storedData != data) { - info(s"conflict in $path data: $data stored data: $storedData") - throw e - } else { - // otherwise, the creation succeeded, return normally - info(s"$path exists with value $data during connection loss; this is ok") - } - } - } - - /** - * Create a persistent node with the given path and data. Create parents if necessary. - */ - def createPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = UseDefaultAcls): Unit = { - val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls - try { - zkPath.createPersistent(path, data, acl) - } catch { - case _: ZkNoNodeException => - createParentPath(path) - zkPath.createPersistent(path, data, acl) - } - } - - def createSequentialPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = UseDefaultAcls): String = { - val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls - zkPath.createPersistentSequential(path, data, acl) - } - - /** - * Update the value of a persistent node with the given path and data. - * create parent directory if necessary. Never throw NodeExistException. 
- * Return the updated path zkVersion - */ - def updatePersistentPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls) = { - val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls - try { - zkClient.writeData(path, data) - } catch { - case _: ZkNoNodeException => - createParentPath(path) - try { - zkPath.createPersistent(path, data, acl) - } catch { - case _: ZkNodeExistsException => - zkClient.writeData(path, data) - } - } - } - - /** - * Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the path doesn't - * exist, the current version is not the expected version, etc.) return (false, -1) - * - * When there is a ConnectionLossException during the conditional update, zkClient will retry the update and may fail - * since the previous update may have succeeded (but the stored zkVersion no longer matches the expected one). - * In this case, we will run the optionalChecker to further check if the previous write did indeed succeeded. 
- */ - def conditionalUpdatePersistentPath(path: String, data: String, expectVersion: Int, - optionalChecker:Option[(ZkUtils, String, String) => (Boolean,Int)] = None): (Boolean, Int) = { - try { - val stat = zkClient.writeDataReturnStat(path, data, expectVersion) - debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d" - .format(path, data, expectVersion, stat.getVersion)) - (true, stat.getVersion) - } catch { - case e1: ZkBadVersionException => - optionalChecker match { - case Some(checker) => checker(this, path, data) - case _ => - debug("Checker method is not passed skipping zkData match") - debug("Conditional update of path %s with data %s and expected version %d failed due to %s" - .format(path, data,expectVersion, e1.getMessage)) - (false, -1) - } - case e2: Exception => - debug("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data, - expectVersion, e2.getMessage)) - (false, -1) - } - } - - /** - * Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the current - * version is not the expected version, etc.) return (false, -1). If path doesn't exist, throws ZkNoNodeException - */ - def conditionalUpdatePersistentPathIfExists(path: String, data: String, expectVersion: Int): (Boolean, Int) = { - try { - val stat = zkClient.writeDataReturnStat(path, data, expectVersion) - debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d" - .format(path, data, expectVersion, stat.getVersion)) - (true, stat.getVersion) - } catch { - case nne: ZkNoNodeException => throw nne - case e: Exception => - error("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data, - expectVersion, e.getMessage)) - (false, -1) - } - } - - /** - * Update the value of a ephemeral node with the given path and data. 
- * create parent directory if necessary. Never throw NodeExistException. - */ - def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = { - val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls - try { - zkClient.writeData(path, data) - } catch { - case _: ZkNoNodeException => - createParentPath(path) - zkPath.createEphemeral(path, data, acl) - } - } - - def deletePath(path: String): Boolean = { - zkClient.delete(path) - } - - /** - * Conditional delete the persistent path data, return true if it succeeds, - * false otherwise (the current version is not the expected version) - */ - def conditionalDeletePath(path: String, expectedVersion: Int): Boolean = { - try { - zkClient.delete(path, expectedVersion) - true - } catch { - case _: ZkBadVersionException => false - } - } - - def deletePathRecursive(path: String) { - zkClient.deleteRecursive(path) - } - - def subscribeDataChanges(path: String, listener: IZkDataListener): Unit = - zkClient.subscribeDataChanges(path, listener) - - def unsubscribeDataChanges(path: String, dataListener: IZkDataListener): Unit = - zkClient.unsubscribeDataChanges(path, dataListener) - - def subscribeStateChanges(listener: IZkStateListener): Unit = - zkClient.subscribeStateChanges(listener) - - def subscribeChildChanges(path: String, listener: IZkChildListener): Option[Seq[String]] = - Option(zkClient.subscribeChildChanges(path, listener)).map(_.asScala) - - def unsubscribeChildChanges(path: String, childListener: IZkChildListener): Unit = - zkClient.unsubscribeChildChanges(path, childListener) - - def unsubscribeAll(): Unit = - zkClient.unsubscribeAll() - - def readData(path: String): (String, Stat) = { - val stat: Stat = new Stat() - val dataStr: String = zkClient.readData[String](path, stat) - (dataStr, stat) - } - - def readDataMaybeNull(path: String): (Option[String], Stat) = { - val stat = new Stat() - val dataAndStat = try { - val dataStr = 
zkClient.readData[String](path, stat) - (Some(dataStr), stat) - } catch { - case _: ZkNoNodeException => - (None, stat) - } - dataAndStat - } - - def readDataAndVersionMaybeNull(path: String): (Option[String], Int) = { - val stat = new Stat() - try { - val data = zkClient.readData[String](path, stat) - (Option(data), stat.getVersion) - } catch { - case _: ZkNoNodeException => (None, stat.getVersion) - } - } - - def getChildren(path: String): Seq[String] = zkClient.getChildren(path).asScala - - def getChildrenParentMayNotExist(path: String): Seq[String] = { - try { - zkClient.getChildren(path).asScala - } catch { - case _: ZkNoNodeException => Nil - } - } - - /** - * Check if the given path exists - */ - def pathExists(path: String): Boolean = { - zkClient.exists(path) - } - - def isTopicMarkedForDeletion(topic: String): Boolean = { - pathExists(getDeleteTopicPath(topic)) - } - - def getCluster(): Cluster = { - val cluster = new Cluster - val nodes = getChildrenParentMayNotExist(BrokerIdsPath) - for (node <- nodes) { - val brokerZKString = readData(BrokerIdsPath + "/" + node)._1 - cluster.add(parseBrokerJson(node.toInt, brokerZKString)) - } - cluster - } - - private def parseBrokerJson(id: Int, jsonString: String): Broker = { - BrokerIdZNode.decode(id, jsonString.getBytes(StandardCharsets.UTF_8)).broker - } - - def getPartitionLeaderAndIsrForTopics(topicAndPartitions: Set[TopicAndPartition]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = { - val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch] - for(topicAndPartition <- topicAndPartitions) { - getLeaderIsrAndEpochForPartition(topicAndPartition.topic, topicAndPartition.partition).foreach { leaderIsrAndControllerEpoch => - ret.put(topicAndPartition, leaderIsrAndControllerEpoch) - } - } - ret - } - - private[utils] def getLeaderIsrAndEpochForPartition(topic: String, partition: Int): Option[LeaderIsrAndControllerEpoch] = { - val leaderAndIsrPath = 
getTopicPartitionLeaderAndIsrPath(topic, partition) - val (leaderAndIsrOpt, stat) = readDataMaybeNull(leaderAndIsrPath) - debug(s"Read leaderISR $leaderAndIsrOpt for $topic-$partition") - leaderAndIsrOpt.flatMap(leaderAndIsrStr => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat)) - } - - def getReplicaAssignmentForTopics(topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = { - val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]] - topics.foreach { topic => - readDataMaybeNull(getTopicPath(topic))._1.foreach { jsonPartitionMap => - Json.parseFull(jsonPartitionMap).foreach { js => - js.asJsonObject.get("partitions").foreach { partitionsJs => - partitionsJs.asJsonObject.iterator.foreach { case (partition, replicas) => - ret.put(TopicAndPartition(topic, partition.toInt), replicas.to[Seq[Int]]) - debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas)) - } - } - } - } - } - ret - } - - def getPartitionAssignmentForTopics(topics: Seq[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = { - val ret = new mutable.HashMap[String, Map[Int, Seq[Int]]]() - topics.foreach { topic => - val partitionMapOpt = for { - jsonPartitionMap <- readDataMaybeNull(getTopicPath(topic))._1 - js <- Json.parseFull(jsonPartitionMap) - replicaMap <- js.asJsonObject.get("partitions") - } yield replicaMap.asJsonObject.iterator.map { case (k, v) => (k.toInt, v.to[Seq[Int]]) }.toMap - val partitionMap = partitionMapOpt.getOrElse(Map.empty) - debug("Partition map for /brokers/topics/%s is %s".format(topic, partitionMap)) - ret += (topic -> partitionMap) - } - ret - } - - def getPartitionsForTopics(topics: Seq[String]): mutable.Map[String, Seq[Int]] = { - getPartitionAssignmentForTopics(topics).map { topicAndPartitionMap => - val topic = topicAndPartitionMap._1 - val partitionMap = topicAndPartitionMap._2 - debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap)) - topic -> 
partitionMap.keys.toSeq.sortWith((s, t) => s < t) - } - } - - def getTopicPartitionCount(topic: String): Option[Int] = { - val topicData = getPartitionAssignmentForTopics(Seq(topic)) - if (topicData(topic).nonEmpty) - Some(topicData(topic).size) - else - None - } - - def getPartitionsBeingReassigned(): Map[TopicAndPartition, ReassignedPartitionsContext] = { - // read the partitions and their new replica list - val jsonPartitionMapOpt = readDataMaybeNull(ReassignPartitionsPath)._1 - jsonPartitionMapOpt match { - case Some(jsonPartitionMap) => - val reassignedPartitions = parsePartitionReassignmentData(jsonPartitionMap) - reassignedPartitions.map { case (tp, newReplicas) => - tp -> new ReassignedPartitionsContext(newReplicas, null) - } - case None => Map.empty[TopicAndPartition, ReassignedPartitionsContext] - } - } - - def updatePartitionReassignmentData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) { - val zkPath = ZkUtils.ReassignPartitionsPath - partitionsToBeReassigned.size match { - case 0 => // need to delete the /admin/reassign_partitions path - deletePath(zkPath) - info("No more partitions need to be reassigned. 
Deleting zk path %s".format(zkPath)) - case _ => - val jsonData = formatAsReassignmentJson(partitionsToBeReassigned) - try { - updatePersistentPath(zkPath, jsonData) - debug("Updated partition reassignment path with %s".format(jsonData)) - } catch { - case _: ZkNoNodeException => - createPersistentPath(zkPath, jsonData) - debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData)) - case e2: Throwable => throw new AdminOperationException(e2.toString) - } - } - } - - def getPartitionsUndergoingPreferredReplicaElection(): Set[TopicAndPartition] = { - // read the partitions and their new replica list - val jsonPartitionListOpt = readDataMaybeNull(PreferredReplicaLeaderElectionPath)._1 - jsonPartitionListOpt match { - case Some(jsonPartitionList) => PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(jsonPartitionList).map(tp => new TopicAndPartition(tp)) - case None => Set.empty[TopicAndPartition] - } - } - - def deletePartition(brokerId: Int, topic: String) { - val brokerIdPath = BrokerIdsPath + "/" + brokerId - zkClient.delete(brokerIdPath) - val brokerPartTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic + "/" + brokerId - zkClient.delete(brokerPartTopicPath) - } - - /** - * This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker - * or throws an exception if the broker dies before the query to zookeeper finishes - * - * @param brokerId The broker id - * @return An optional Broker object encapsulating the broker metadata - */ - def getBrokerInfo(brokerId: Int): Option[Broker] = { - readDataMaybeNull(BrokerIdsPath + "/" + brokerId)._1 match { - case Some(brokerInfo) => Some(parseBrokerJson(brokerId, brokerInfo)) - case None => None - } - } - - /** - * This API produces a sequence number by creating / updating given path in zookeeper - * It uses the stat returned by the zookeeper and return the version. 
Every time - * client updates the path stat.version gets incremented. Starting value of sequence number is 1. - */ - def getSequenceId(path: String, acls: java.util.List[ACL] = UseDefaultAcls): Int = { - val acl = if (acls == UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls - def writeToZk: Int = zkClient.writeDataReturnStat(path, "", -1).getVersion - try { - writeToZk - } catch { - case _: ZkNoNodeException => - makeSurePersistentPathExists(path, acl) - writeToZk - } - } - - def getAllTopics(): Seq[String] = { - val topics = getChildrenParentMayNotExist(BrokerTopicsPath) - if(topics == null) - Seq.empty[String] - else - topics - } - - /** - * Returns all the entities whose configs have been overridden. - */ - def getAllEntitiesWithConfig(entityType: String): Seq[String] = { - val entities = getChildrenParentMayNotExist(getEntityConfigRootPath(entityType)) - if(entities == null) - Seq.empty[String] - else - entities - } - - def getAllPartitions(): Set[TopicAndPartition] = { - val topics = getChildrenParentMayNotExist(BrokerTopicsPath) - if (topics == null) Set.empty[TopicAndPartition] - else { - topics.flatMap { topic => - // The partitions path may not exist if the topic is in the process of being deleted - getChildrenParentMayNotExist(getTopicPartitionsPath(topic)).map(_.toInt).map(TopicAndPartition(topic, _)) - }.toSet - } - } - - def close() { - zkClient.close() - } -} - -private object ZKStringSerializer extends ZkSerializer { - - @throws(classOf[ZkMarshallingError]) - def serialize(data : Object): Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8") - - @throws(classOf[ZkMarshallingError]) - def deserialize(bytes : Array[Byte]): Object = { - if (bytes == null) - null - else - new String(bytes, "UTF-8") - } -} - -object ZKConfig { - val ZkConnectProp = "zookeeper.connect" - val ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms" - val ZkConnectionTimeoutMsProp = "zookeeper.connection.timeout.ms" - val ZkSyncTimeMsProp = 
"zookeeper.sync.time.ms" -} - -class ZKConfig(props: VerifiableProperties) { - import ZKConfig._ - - /** ZK host string */ - val zkConnect = props.getString(ZkConnectProp) - - /** zookeeper session timeout */ - val zkSessionTimeoutMs = props.getInt(ZkSessionTimeoutMsProp, 6000) - - /** the max time that the client waits to establish a connection to zookeeper */ - val zkConnectionTimeoutMs = props.getInt(ZkConnectionTimeoutMsProp, zkSessionTimeoutMs) - - /** how far a ZK follower can be behind a ZK leader */ - val zkSyncTimeMs = props.getInt(ZkSyncTimeMsProp, 2000) -} - -class ZkPath(zkClient: ZkClient) { - - @volatile private var isNamespacePresent: Boolean = false - - def checkNamespace() { - if (isNamespacePresent) - return - - if (!zkClient.exists("/")) { - throw new ConfigException("Zookeeper namespace does not exist") - } - isNamespacePresent = true - } - - def resetNamespaceCheckedState() { - isNamespacePresent = false - } - - def createPersistent(path: String, data: Object, acls: java.util.List[ACL]) { - checkNamespace() - zkClient.createPersistent(path, data, acls) - } - - def createPersistent(path: String, createParents: Boolean, acls: java.util.List[ACL]) { - checkNamespace() - zkClient.createPersistent(path, createParents, acls) - } - - def createEphemeral(path: String, data: Object, acls: java.util.List[ACL]) { - checkNamespace() - zkClient.createEphemeral(path, data, acls) - } - - def createPersistentSequential(path: String, data: Object, acls: java.util.List[ACL]): String = { - checkNamespace() - zkClient.createPersistentSequential(path, data, acls) - } -} diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala index 20c28e7..4ec8665 100644 --- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala +++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala @@ -18,6 +18,7 @@ package kafka import java.io.{File, PrintWriter} +import 
java.nio.charset.StandardCharsets import java.nio.file.{Files, StandardOpenOption} import javax.imageio.ImageIO @@ -25,7 +26,7 @@ import kafka.admin.ReassignPartitionsCommand import kafka.admin.ReassignPartitionsCommand.Throttle import kafka.server.{KafkaConfig, KafkaServer, QuotaType} import kafka.utils.TestUtils._ -import kafka.utils.{Exit, Logging, TestUtils, ZkUtils} +import kafka.utils.{Exit, Logging, TestUtils} import kafka.zk.{ReassignPartitionsZNode, ZooKeeperTestHarness} import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition @@ -138,7 +139,8 @@ object ReplicationQuotasTestRig { val newAssignment = ReassignPartitionsCommand.generateAssignment(zkClient, brokers, json(topicName), true)._1 val start = System.currentTimeMillis() - ReassignPartitionsCommand.executeAssignment(zkClient, None, ZkUtils.getReassignmentJson(newAssignment), Throttle(config.throttle)) + ReassignPartitionsCommand.executeAssignment(zkClient, None, + new String(ReassignPartitionsZNode.encode(newAssignment), StandardCharsets.UTF_8), Throttle(config.throttle)) //Await completion waitForReassignmentToComplete() diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala deleted file mode 100755 index fa4206b..0000000 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ /dev/null @@ -1,213 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.admin - -import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException} -import org.apache.kafka.common.metrics.Quota -import org.easymock.EasyMock -import org.junit.Assert._ -import org.junit.{After, Before, Test} -import java.util.Properties - -import kafka.utils._ -import kafka.zk.{ConfigEntityZNode, ZooKeeperTestHarness} -import kafka.utils.{Logging, TestUtils, ZkUtils} -import kafka.server.{ConfigType, KafkaConfig, KafkaServer} - -import scala.collection.{Map, immutable} -import org.apache.kafka.common.security.JaasUtils -import org.scalatest.Assertions.intercept - -import scala.collection.JavaConverters._ - -@deprecated("This test has been deprecated and will be removed in a future release.", "1.1.0") -class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { - - var servers: Seq[KafkaServer] = Seq() - var zkUtils: ZkUtils = null - - @Before - override def setUp() { - super.setUp() - zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled)) - } - - @After - override def tearDown() { - if (zkUtils != null) - CoreUtils.swallow(zkUtils.close(), this) - TestUtils.shutdownServers(servers) - super.tearDown() - } - - @Test - def testManualReplicaAssignment() { - val brokers = List(0, 1, 2, 3, 4) - TestUtils.createBrokersInZk(zkClient, brokers) - - // duplicate brokers - intercept[InvalidReplicaAssignmentException] { - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", 
Map(0->Seq(0,0))) - } - - // inconsistent replication factor - intercept[InvalidReplicaAssignmentException] { - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", Map(0->Seq(0,1), 1->Seq(0))) - } - - // good assignment - val assignment = Map(0 -> List(0, 1, 2), - 1 -> List(1, 2, 3)) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", assignment) - val found = zkUtils.getPartitionAssignmentForTopics(Seq("test")) - assertEquals(assignment, found("test")) - } - - @Test - def testTopicCreationInZK() { - val expectedReplicaAssignment = Map( - 0 -> List(0, 1, 2), - 1 -> List(1, 2, 3), - 2 -> List(2, 3, 4), - 3 -> List(3, 4, 0), - 4 -> List(4, 0, 1), - 5 -> List(0, 2, 3), - 6 -> List(1, 3, 4), - 7 -> List(2, 4, 0), - 8 -> List(3, 0, 1), - 9 -> List(4, 1, 2), - 10 -> List(1, 2, 3), - 11 -> List(1, 3, 4) - ) - val leaderForPartitionMap = immutable.Map( - 0 -> 0, - 1 -> 1, - 2 -> 2, - 3 -> 3, - 4 -> 4, - 5 -> 0, - 6 -> 1, - 7 -> 2, - 8 -> 3, - 9 -> 4, - 10 -> 1, - 11 -> 1 - ) - val topic = "test" - TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4)) - // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) - // create leaders for all partitions - TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) - val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => p -> zkUtils.getReplicasForPartition(topic, p)).toMap - assertEquals(expectedReplicaAssignment.size, actualReplicaList.size) - for(i <- 0 until actualReplicaList.size) - assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i)) - - intercept[TopicExistsException] { - // shouldn't be able to create a topic that already exists - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) - } - } - - @Test - def testTopicCreationWithCollision() { - val topic = "test.topic" - val collidingTopic = "test_topic" - 
TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4)) - // create the topic - AdminUtils.createTopic(zkUtils, topic, 3, 1) - - intercept[InvalidTopicException] { - // shouldn't be able to create a topic that collides - AdminUtils.createTopic(zkUtils, collidingTopic, 3, 1) - } - } - - @Test - def testConcurrentTopicCreation() { - val topic = "test.topic" - - // simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes - val zkMock: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils]) - EasyMock.expect(zkMock.pathExists(s"/brokers/topics/$topic")).andReturn(false) - EasyMock.expect(zkMock.getAllTopics).andReturn(Seq("some.topic", topic, "some.other.topic")) - EasyMock.replay(zkMock) - - intercept[TopicExistsException] { - AdminUtils.validateCreateOrUpdateTopic(zkMock, topic, Map.empty, new Properties, update = false) - } - } - - /** - * This test simulates a client config change in ZK whose notification has been purged. - * Basically, it asserts that notifications are bootstrapped from ZK - */ - @Test - def testBootstrapClientIdConfig() { - val clientId = "my-client" - val props = new Properties() - props.setProperty("producer_byte_rate", "1000") - props.setProperty("consumer_byte_rate", "2000") - - // Write config without notification to ZK. 
- val configMap = Map[String, String] ("producer_byte_rate" -> "1000", "consumer_byte_rate" -> "2000") - val map = Map("version" -> 1, "config" -> configMap.asJava) - zkUtils.updatePersistentPath(ConfigEntityZNode.path(ConfigType.Client, clientId), Json.encodeAsString(map.asJava)) - - val configInZk: Map[String, Properties] = AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client) - assertEquals("Must have 1 overridden client config", 1, configInZk.size) - assertEquals(props, configInZk(clientId)) - - // Test that the existing clientId overrides are read - val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) - servers = Seq(server) - assertEquals(new Quota(1000, true), server.dataPlaneRequestProcessor.quotas.produce.quota("ANONYMOUS", clientId)) - assertEquals(new Quota(2000, true), server.dataPlaneRequestProcessor.quotas.fetch.quota("ANONYMOUS", clientId)) - } - - @Test - def testGetBrokerMetadatas() { - // broker 4 has no rack information - val brokerList = 0 to 5 - val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 5 -> "rack3") - val brokerMetadatas = toBrokerMetadata(rackInfo, brokersWithoutRack = brokerList.filterNot(rackInfo.keySet)) - TestUtils.createBrokersInZk(brokerMetadatas, zkClient) - - val processedMetadatas1 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Disabled) - assertEquals(brokerList, processedMetadatas1.map(_.id)) - assertEquals(List.fill(brokerList.size)(None), processedMetadatas1.map(_.rack)) - - val processedMetadatas2 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Safe) - assertEquals(brokerList, processedMetadatas2.map(_.id)) - assertEquals(List.fill(brokerList.size)(None), processedMetadatas2.map(_.rack)) - - intercept[AdminOperationException] { - AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced) - } - - val partialList = List(0, 1, 2, 3, 5) - val processedMetadatas3 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced, 
Some(partialList)) - assertEquals(partialList, processedMetadatas3.map(_.id)) - assertEquals(partialList.map(rackInfo), processedMetadatas3.flatMap(_.rack)) - - val numPartitions = 3 - AdminUtils.createTopic(zkUtils, "foo", numPartitions, 2, rackAwareMode = RackAwareMode.Safe) - val assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")) - assertEquals(numPartitions, assignment.size) - } -} diff --git a/core/src/test/scala/unit/kafka/admin/TestAdminUtils.scala b/core/src/test/scala/unit/kafka/admin/TestAdminUtils.scala deleted file mode 100644 index 7cbad05..0000000 --- a/core/src/test/scala/unit/kafka/admin/TestAdminUtils.scala +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package kafka.admin - -import java.util.Properties -import kafka.utils.ZkUtils - -@deprecated("This class is deprecated since AdminUtilities will be replaced by kafka.zk.AdminZkClient.", "1.1.0") -class TestAdminUtils extends AdminUtilities { - override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configs: Properties): Unit = {} - override def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties = {new Properties} - override def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties): Unit = {} - override def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties): Unit = {} - override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties): Unit = {} -} diff --git a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala deleted file mode 100755 index c0c3c6b..0000000 --- a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.utils - -import kafka.api.LeaderAndIsr -import kafka.common.TopicAndPartition -import kafka.controller.LeaderIsrAndControllerEpoch -import kafka.zk.ZooKeeperTestHarness -import org.apache.kafka.common.security.JaasUtils -import org.junit.Assert._ -import org.junit.{After, Before, Test} - -@deprecated("Deprecated given that ZkUtils is deprecated", since = "2.0.0") -class ZkUtilsTest extends ZooKeeperTestHarness { - - val path = "/path" - var zkUtils: ZkUtils = _ - - @Before - override def setUp() { - super.setUp - zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled)) - } - - @After - override def tearDown() { - if (zkUtils != null) - CoreUtils.swallow(zkUtils.close(), this) - super.tearDown - } - - @Test - def testSuccessfulConditionalDeletePath() { - // Given an existing path - zkUtils.createPersistentPath(path) - val (_, statAfterCreation) = zkUtils.readData(path) - - // Deletion is successful when the version number matches - assertTrue("Deletion should be successful", zkUtils.conditionalDeletePath(path, statAfterCreation.getVersion)) - val (optionalData, _) = zkUtils.readDataMaybeNull(path) - assertTrue("Node should be deleted", optionalData.isEmpty) - - // Deletion is successful when the node does not exist too - assertTrue("Deletion should be successful", zkUtils.conditionalDeletePath(path, 0)) - } - - // Verify behaviour of ZkUtils.createSequentialPersistentPath since PIDManager relies on it - @Test - def testPersistentSequentialPath() { - // Given an existing path - zkUtils.createPersistentPath(path) - - var result = zkUtils.createSequentialPersistentPath(path + "/sequence_") - - assertEquals("/path/sequence_0000000000", result) - - result = zkUtils.createSequentialPersistentPath(path + "/sequence_") - - assertEquals("/path/sequence_0000000001", result) - } - - @Test - def testAbortedConditionalDeletePath() { - // Given an existing path that gets updated - 
zkUtils.createPersistentPath(path) - val (_, statAfterCreation) = zkUtils.readData(path) - zkUtils.updatePersistentPath(path, "data") - - // Deletion is aborted when the version number does not match - assertFalse("Deletion should be aborted", zkUtils.conditionalDeletePath(path, statAfterCreation.getVersion)) - val (optionalData, _) = zkUtils.readDataMaybeNull(path) - assertTrue("Node should still be there", optionalData.isDefined) - } - - @Test - def testClusterIdentifierJsonParsing() { - val clusterId = "test" - assertEquals(zkUtils.ClusterId.fromJson(zkUtils.ClusterId.toJson(clusterId)), clusterId) - } - - @Test - def testGetAllPartitionsTopicWithoutPartitions() { - val topic = "testtopic" - // Create a regular topic and a topic without any partitions - zkUtils.createPersistentPath(ZkUtils.getTopicPartitionPath(topic, 0)) - zkUtils.createPersistentPath(ZkUtils.getTopicPath("nopartitions")) - - assertEquals(Set(TopicAndPartition(topic, 0)), zkUtils.getAllPartitions()) - } - - @Test - def testGetLeaderIsrAndEpochForPartition() { - val topic = "my-topic-test" - val partition = 0 - val leader = 1 - val leaderEpoch = 1 - val controllerEpoch = 1 - val isr = List(1, 2) - val topicPath = s"/brokers/topics/$topic/partitions/$partition/state" - val topicData = Json.legacyEncodeAsString(Map("controller_epoch" -> controllerEpoch, "leader" -> leader, - "versions" -> 1, "leader_epoch" -> leaderEpoch, "isr" -> isr)) - zkUtils.createPersistentPath(topicPath, topicData) - - val leaderIsrAndControllerEpoch = zkUtils.getLeaderIsrAndEpochForPartition(topic, partition) - val topicDataLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr, 0), - controllerEpoch) - assertEquals(topicDataLeaderIsrAndControllerEpoch, leaderIsrAndControllerEpoch.get) - assertEquals(None, zkUtils.getLeaderIsrAndEpochForPartition(topic, partition + 1)) - } - - @Test - def testGetSequenceIdMethod() { - val path = "/test/seqid" - (1 to 10).foreach { seqid => - 
assertEquals(seqid, zkUtils.getSequenceId(path)) - } - } -}
