mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571173703
########## File path: core/src/main/scala/kafka/server/MetadataCache.scala ########## @@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol +trait MetadataCache { + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + def getTopicMetadata( + topics: collection.Set[String], + listenerName: ListenerName, + errorUnavailableEndpoints: Boolean = false, + errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic] + + def getAllTopics(): collection.Set[String] + + def getAllPartitions(): collection.Set[TopicPartition] + + def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + + def getAliveBroker(brokerId: Int): Option[MetadataBroker] + + def getAliveBrokers: collection.Seq[MetadataBroker] + + def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] + + def numPartitions(topic: String): Option[Int] + + // if the leader is not known, return None; + // if the leader is known and corresponding node is available, return Some(node) + // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE) + def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] + + def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] + + def getControllerId: Option[Int] + + def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster + + // This method returns the deleted TopicPartitions received from UpdateMetadataRequest + def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition] + + def contains(topic: String): Boolean + + def contains(tp: TopicPartition): Boolean +} + +object MetadataCache { + def zkMetadataCache(brokerId: Int): ZkMetadataCache = { + new ZkMetadataCache(brokerId) + } + + def raftMetadataCache(brokerId: Int): RaftMetadataCache = { + new RaftMetadataCache(brokerId) + } +} + /** * A cache for the state (e.g., current leader) of each partition. This cache is updated through * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously. */ -class MetadataCache(brokerId: Int) extends Logging { +class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging { Review comment: KAFKA-12299 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org