Repository: kafka
Updated Branches:
  refs/heads/trunk 9790d5aba -> 1d24e10ae


MINOR: Remove unused `AdminUtils.fetchTopicMetadataFromZk` methods

These are internal methods with no tests and since we now have an `AdminClient`,
we should remove them.

Author: Ismael Juma <[email protected]>

Reviewers: Jason Gustafson <[email protected]>

Closes #3299 from ijuma/remove-unused-admin-utils-methods


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d24e10a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d24e10a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d24e10a

Branch: refs/heads/trunk
Commit: 1d24e10aeab616eede416201336e928b9a8efa98
Parents: 9790d5a
Author: Ismael Juma <[email protected]>
Authored: Tue Jun 20 11:29:11 2017 -0700
Committer: Jason Gustafson <[email protected]>
Committed: Tue Jun 20 11:29:11 2017 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala | 99 +-------------------
 .../unit/kafka/producer/ProducerTest.scala      | 14 +--
 2 files changed, 4 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1d24e10a/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala 
b/core/src/main/scala/kafka/admin/AdminUtils.scala
index e252b01..923ceb7 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -17,7 +17,6 @@
 
 package kafka.admin
 
-import kafka.cluster.Broker
 import kafka.log.LogConfig
 import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
 import kafka.utils._
@@ -26,15 +25,10 @@ import java.util.Random
 import java.util.Properties
 
 import kafka.common.TopicAlreadyMarkedForDeletionException
-import org.apache.kafka.common.Node
-import org.apache.kafka.common.errors.{InvalidPartitionsException, 
InvalidReplicaAssignmentException, InvalidReplicationFactorException, 
InvalidTopicException, LeaderNotAvailableException, 
ReplicaNotAvailableException, TopicExistsException, 
UnknownTopicOrPartitionException}
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
-import org.apache.kafka.common.requests.MetadataResponse
+import org.apache.kafka.common.errors.{InvalidPartitionsException, 
InvalidReplicaAssignmentException, InvalidReplicationFactorException, 
InvalidTopicException, TopicExistsException, UnknownTopicOrPartitionException}
 
 import scala.collection._
 import scala.collection.JavaConverters._
-import mutable.ListBuffer
 import scala.collection.mutable
 import collection.Map
 import collection.Set
@@ -667,98 +661,9 @@ object AdminUtils extends Logging with AdminUtilities {
       .map(entityPath => (entityPath, fetchEntityConfig(zkUtils, 
rootEntityType, entityPath))).toMap
   }
 
-  def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): 
MetadataResponse.TopicMetadata =
-    fetchTopicMetadataFromZk(topic, zkUtils, mutable.Map.empty[Int, Broker],
-      ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
-
-  def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils): 
Set[MetadataResponse.TopicMetadata] =
-    fetchTopicMetadataFromZk(topics, zkUtils, 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
-
-  def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils, 
securityProtocol: SecurityProtocol): Set[MetadataResponse.TopicMetadata] =
-    fetchTopicMetadataFromZk(topics, zkUtils, 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
-
-  def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils, 
listenerName: ListenerName): Set[MetadataResponse.TopicMetadata] = {
-    val cachedBrokerInfo = mutable.Map.empty[Int, Broker]
-    topics.map(topic => fetchTopicMetadataFromZk(topic, zkUtils, 
cachedBrokerInfo, listenerName))
-  }
-
-  private def fetchTopicMetadataFromZk(topic: String,
-                                       zkUtils: ZkUtils,
-                                       cachedBrokerInfo: mutable.Map[Int, 
Broker],
-                                       listenerName: ListenerName): 
MetadataResponse.TopicMetadata = {
-    if (zkUtils.pathExists(getTopicPath(topic))) {
-      val topicPartitionAssignment = 
zkUtils.getPartitionAssignmentForTopics(List(topic))(topic)
-      val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) 
=> m1._1 < m2._1)
-      val partitionMetadata = sortedPartitions.map { partitionMap =>
-        val partition = partitionMap._1
-        val replicas = partitionMap._2
-        val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, 
partition)
-        val leader = zkUtils.getLeaderForPartition(topic, partition)
-        debug("replicas = " + replicas + ", in sync replicas = " + 
inSyncReplicas + ", leader = " + leader)
-
-        var leaderInfo: Node = Node.noNode()
-        var replicaInfo: Seq[Node] = Nil
-        var isrInfo: Seq[Node] = Nil
-        try {
-          leaderInfo = leader match {
-            case Some(l) =>
-              try {
-                getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, 
List(l)).head.getNode(listenerName)
-              } catch {
-                case e: Throwable => throw new 
LeaderNotAvailableException("Leader not available for partition 
[%s,%d]".format(topic, partition), e)
-              }
-            case None => throw new LeaderNotAvailableException("No leader 
exists for partition " + partition)
-          }
-          try {
-            replicaInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, 
replicas).map(_.getNode(listenerName))
-            isrInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, 
inSyncReplicas).map(_.getNode(listenerName))
-          } catch {
-            case e: Throwable => throw new ReplicaNotAvailableException(e)
-          }
-          if (replicaInfo.size < replicas.size)
-            throw new ReplicaNotAvailableException("Replica information not 
available for following brokers: " +
-              
replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
-          if (isrInfo.size < inSyncReplicas.size)
-            throw new ReplicaNotAvailableException("In Sync Replica 
information not available for following brokers: " +
-              
inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
-          new MetadataResponse.PartitionMetadata(Errors.NONE, partition, 
leaderInfo, replicaInfo.asJava, isrInfo.asJava)
-        } catch {
-          case e: Throwable =>
-            debug("Error while fetching metadata for partition 
[%s,%d]".format(topic, partition), e)
-            new MetadataResponse.PartitionMetadata(Errors.forException(e), 
partition, leaderInfo, replicaInfo.asJava, isrInfo.asJava)
-        }
-      }
-      new MetadataResponse.TopicMetadata(Errors.NONE, topic, 
Topic.isInternal(topic), partitionMetadata.asJava)
-    } else {
-      // topic doesn't exist, send appropriate error code
-      new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
topic, Topic.isInternal(topic), java.util.Collections.emptyList())
-    }
-  }
-
-  private def getBrokerInfoFromCache(zkUtils: ZkUtils,
-                                     cachedBrokerInfo: 
scala.collection.mutable.Map[Int, Broker],
-                                     brokerIds: Seq[Int]): Seq[Broker] = {
-    var failedBrokerIds: ListBuffer[Int] = new ListBuffer()
-    val brokerMetadata = brokerIds.map { id =>
-      val optionalBrokerInfo = cachedBrokerInfo.get(id)
-      optionalBrokerInfo match {
-        case Some(brokerInfo) => Some(brokerInfo) // return broker info from 
the cache
-        case None => // fetch it from zookeeper
-          zkUtils.getBrokerInfo(id) match {
-            case Some(brokerInfo) =>
-              cachedBrokerInfo += (id -> brokerInfo)
-              Some(brokerInfo)
-            case None =>
-              failedBrokerIds += id
-              None
-          }
-      }
-    }
-    brokerMetadata.filter(_.isDefined).map(_.get)
-  }
-
   private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, 
replicaIndex: Int, nBrokers: Int): Int = {
     val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
     (firstReplicaIndex + shift) % nBrokers
   }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d24e10a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala 
b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index b2b9806..86a0e86 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -21,7 +21,6 @@ import java.nio.ByteBuffer
 import java.util
 import java.util.Properties
 
-import kafka.admin.AdminUtils
 import kafka.api.FetchRequestBuilder
 import kafka.common.FailedToSendMessageException
 import kafka.consumer.SimpleConsumer
@@ -30,7 +29,6 @@ import kafka.serializer.StringEncoder
 import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
 import kafka.utils._
 import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.utils.Time
 import org.apache.log4j.{Level, Logger}
@@ -340,16 +338,8 @@ class ProducerTest extends ZooKeeperTestHarness with 
Logging{
       partitioner = classOf[StaticPartitioner].getName)
 
     try {
-
-      // create topic
-      AdminUtils.createTopic(zkUtils, "new-topic", 2, 1)
-      TestUtils.waitUntilTrue(() =>
-        AdminUtils.fetchTopicMetadataFromZk("new-topic", zkUtils).error != 
Errors.UNKNOWN_TOPIC_OR_PARTITION,
-        "Topic new-topic not created after timeout",
-        waitTime = zookeeper.tickTime)
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "new-topic", 0)
-
-      producer.send(new KeyedMessage[String, String]("new-topic", "key", null))
+      TestUtils.createTopic(zkUtils, "new-topic", 2, 1, servers)
+      producer.send(new KeyedMessage("new-topic", "key", null))
     } finally {
       producer.close()
     }

Reply via email to