jolshan commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r569897756
########## File path: core/src/main/scala/kafka/server/MetadataCache.scala ########## @@ -157,102 +178,86 @@ class MetadataCache(brokerId: Int) extends Logging { * * @return None if broker is not alive or if the broker does not have a listener named `listenerName`. */ - private def getAliveEndpoint(snapshot: MetadataSnapshot, brokerId: Int, listenerName: ListenerName): Option[Node] = { - snapshot.aliveNodes.get(brokerId).flatMap(_.get(listenerName)) + private def getAliveEndpoint(image: MetadataImage, id: Int, listenerName: ListenerName): Option[Node] = { + image.brokers.aliveBroker(id).flatMap(_.endpoints.get(listenerName.value())) } // errorUnavailableEndpoints exists to support v0 MetadataResponses def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false, errorUnavailableListeners: Boolean = false): Seq[MetadataResponseTopic] = { - val snapshot = metadataSnapshot + val image = _currentImage topics.toSeq.flatMap { topic => - getPartitionMetadata(snapshot, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata => + getPartitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata => new MetadataResponseTopic() .setErrorCode(Errors.NONE.code) .setName(topic) - .setTopicId(snapshot.topicIds.getOrElse(topic, Uuid.ZERO_UUID)) Review comment: Let me know if I missed it somewhere, but we will need the ID included in the response. I think we might need a MetadataImage method like `topicIdToName` but instead `topicNameToId` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org