Repository: kafka Updated Branches: refs/heads/trunk 9790d5aba -> 1d24e10ae
MINOR: Remove unused `AdminUtils.fetchTopicMetadataFromZk` methods These are internal methods with no tests and since we now have an `AdminClient`, we should remove them. Author: Ismael Juma <[email protected]> Reviewers: Jason Gustafson <[email protected]> Closes #3299 from ijuma/remove-unused-admin-utils-methods Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d24e10a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d24e10a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d24e10a Branch: refs/heads/trunk Commit: 1d24e10aeab616eede416201336e928b9a8efa98 Parents: 9790d5a Author: Ismael Juma <[email protected]> Authored: Tue Jun 20 11:29:11 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Tue Jun 20 11:29:11 2017 -0700 ---------------------------------------------------------------------- .../src/main/scala/kafka/admin/AdminUtils.scala | 99 +------------------- .../unit/kafka/producer/ProducerTest.scala | 14 +-- 2 files changed, 4 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1d24e10a/core/src/main/scala/kafka/admin/AdminUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index e252b01..923ceb7 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -17,7 +17,6 @@ package kafka.admin -import kafka.cluster.Broker import kafka.log.LogConfig import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig} import kafka.utils._ @@ -26,15 +25,10 @@ import java.util.Random import java.util.Properties import kafka.common.TopicAlreadyMarkedForDeletionException -import org.apache.kafka.common.Node -import org.apache.kafka.common.errors.{InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, LeaderNotAvailableException, ReplicaNotAvailableException, TopicExistsException, UnknownTopicOrPartitionException} -import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} -import org.apache.kafka.common.requests.MetadataResponse +import org.apache.kafka.common.errors.{InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, TopicExistsException, UnknownTopicOrPartitionException} import scala.collection._ import scala.collection.JavaConverters._ -import mutable.ListBuffer import scala.collection.mutable import collection.Map import collection.Set @@ -667,98 +661,9 @@ object AdminUtils extends Logging with AdminUtilities { .map(entityPath => (entityPath, fetchEntityConfig(zkUtils, rootEntityType, entityPath))).toMap } - def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): MetadataResponse.TopicMetadata = - fetchTopicMetadataFromZk(topic, zkUtils, mutable.Map.empty[Int, Broker], - ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) - - def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils): Set[MetadataResponse.TopicMetadata] = - fetchTopicMetadataFromZk(topics, zkUtils, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) - - def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils, securityProtocol: SecurityProtocol): Set[MetadataResponse.TopicMetadata] = - fetchTopicMetadataFromZk(topics, zkUtils, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) - - def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils, listenerName: ListenerName): Set[MetadataResponse.TopicMetadata] = { - val cachedBrokerInfo = mutable.Map.empty[Int, Broker] - topics.map(topic => fetchTopicMetadataFromZk(topic, zkUtils, cachedBrokerInfo, listenerName)) - } - - private def fetchTopicMetadataFromZk(topic: String, - zkUtils: ZkUtils, - cachedBrokerInfo: mutable.Map[Int, Broker], - listenerName: ListenerName): MetadataResponse.TopicMetadata = { - if (zkUtils.pathExists(getTopicPath(topic))) { - val topicPartitionAssignment = zkUtils.getPartitionAssignmentForTopics(List(topic))(topic) - val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1) - val partitionMetadata = sortedPartitions.map { partitionMap => - val partition = partitionMap._1 - val replicas = partitionMap._2 - val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partition) - val leader = zkUtils.getLeaderForPartition(topic, partition) - debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader) - - var leaderInfo: Node = Node.noNode() - var replicaInfo: Seq[Node] = Nil - var isrInfo: Seq[Node] = Nil - try { - leaderInfo = leader match { - case Some(l) => - try { - getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, List(l)).head.getNode(listenerName) - } catch { - case e: Throwable => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e) - } - case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition) - } - try { - replicaInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, replicas).map(_.getNode(listenerName)) - isrInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, inSyncReplicas).map(_.getNode(listenerName)) - } catch { - case e: Throwable => throw new ReplicaNotAvailableException(e) - } - if (replicaInfo.size < replicas.size) - throw new ReplicaNotAvailableException("Replica information not available for following brokers: " + - replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(",")) - if (isrInfo.size < inSyncReplicas.size) - throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " + - inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(",")) - new MetadataResponse.PartitionMetadata(Errors.NONE, partition, leaderInfo, replicaInfo.asJava, isrInfo.asJava) - } catch { - case e: Throwable => - debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e) - new MetadataResponse.PartitionMetadata(Errors.forException(e), partition, leaderInfo, replicaInfo.asJava, isrInfo.asJava) - } - } - new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.asJava) - } else { - // topic doesn't exist, send appropriate error code - new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, Topic.isInternal(topic), java.util.Collections.emptyList()) - } - } - - private def getBrokerInfoFromCache(zkUtils: ZkUtils, - cachedBrokerInfo: scala.collection.mutable.Map[Int, Broker], - brokerIds: Seq[Int]): Seq[Broker] = { - var failedBrokerIds: ListBuffer[Int] = new ListBuffer() - val brokerMetadata = brokerIds.map { id => - val optionalBrokerInfo = cachedBrokerInfo.get(id) - optionalBrokerInfo match { - case Some(brokerInfo) => Some(brokerInfo) // return broker info from the cache - case None => // fetch it from zookeeper - zkUtils.getBrokerInfo(id) match { - case Some(brokerInfo) => - cachedBrokerInfo += (id -> brokerInfo) - Some(brokerInfo) - case None => - failedBrokerIds += id - None - } - } - } - brokerMetadata.filter(_.isDefined).map(_.get) - } - private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = { val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1) (firstReplicaIndex + shift) % nBrokers } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/1d24e10a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index b2b9806..86a0e86 100755 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -21,7 +21,6 @@ import java.nio.ByteBuffer import java.util import java.util.Properties -import kafka.admin.AdminUtils import kafka.api.FetchRequestBuilder import kafka.common.FailedToSendMessageException import kafka.consumer.SimpleConsumer @@ -30,7 +29,6 @@ import kafka.serializer.StringEncoder import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer} import kafka.utils._ import kafka.zk.ZooKeeperTestHarness -import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.utils.Time import org.apache.log4j.{Level, Logger} @@ -340,16 +338,8 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{ partitioner = classOf[StaticPartitioner].getName) try { - - // create topic - AdminUtils.createTopic(zkUtils, "new-topic", 2, 1) - TestUtils.waitUntilTrue(() => - AdminUtils.fetchTopicMetadataFromZk("new-topic", zkUtils).error != Errors.UNKNOWN_TOPIC_OR_PARTITION, - "Topic new-topic not created after timeout", - waitTime = zookeeper.tickTime) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "new-topic", 0) - - producer.send(new KeyedMessage[String, String]("new-topic", "key", null)) + TestUtils.createTopic(zkUtils, "new-topic", 2, 1, servers) + producer.send(new KeyedMessage("new-topic", "key", null)) } finally { producer.close() }
