mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571173703



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -34,16 +34,69 @@ import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, 
UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+                        topics: collection.Set[String],
+                        listenerName: ListenerName,
+                        errorUnavailableEndpoints: Boolean = false,
+                        errorUnavailableListeners: Boolean = false): 
collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): 
collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): 
Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return 
Some(node)
+  // if the leader is known but corresponding node with the listener name is 
not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, 
listenerName: ListenerName): Option[Node]
+
+  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: 
ListenerName): Map[Int, Node]
+
+  def getControllerId: Option[Int]
+
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): 
Cluster
+
+  // This method returns the deleted TopicPartitions received from 
UpdateMetadataRequest
+  def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): 
collection.Seq[TopicPartition]
+
+  def contains(topic: String): Boolean
+
+  def contains(tp: TopicPartition): Boolean
+}
+
+object MetadataCache {
+  def zkMetadataCache(brokerId: Int): ZkMetadataCache = {
+    new ZkMetadataCache(brokerId)
+  }
+
+  def raftMetadataCache(brokerId: Int): RaftMetadataCache = {
+    new RaftMetadataCache(brokerId)
+  }
+}
+
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache 
is updated through
  *  UpdateMetadataRequest from the controller. Every broker maintains the same 
cache, asynchronously.
  */
-class MetadataCache(brokerId: Int) extends Logging {
+class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {

Review comment:
       KAFKA-12299




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to