chia7712 commented on code in PR #19232:
URL: https://github.com/apache/kafka/pull/19232#discussion_r2160432259


##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -1,479 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server.metadata
-
-import kafka.utils.Logging
-import org.apache.kafka.common._
-import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.InvalidTopicException
-import org.apache.kafka.common.internals.Topic
-import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.{Cursor, 
DescribeTopicPartitionsResponsePartition, DescribeTopicPartitionsResponseTopic}
-import 
org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition,
 MetadataResponseTopic}
-import org.apache.kafka.common.message._
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.MetadataResponse
-import org.apache.kafka.image.MetadataImage
-import org.apache.kafka.metadata.{BrokerRegistration, LeaderAndIsr, 
MetadataCache, PartitionRegistration, Replicas}
-import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, 
MetadataVersion}
-
-import java.util
-import java.util.concurrent.ThreadLocalRandom
-import java.util.function.{Predicate, Supplier}
-import java.util.stream.Collectors
-import java.util.Properties
-import scala.collection.mutable.ListBuffer
-import scala.jdk.CollectionConverters._
-import scala.jdk.OptionConverters.RichOptional
-import scala.util.control.Breaks._
-
-
-class KRaftMetadataCache(
-  val brokerId: Int,
-  val kraftVersionSupplier: Supplier[KRaftVersion]
-) extends MetadataCache with Logging {
-  this.logIdent = s"[MetadataCache brokerId=$brokerId] "
-
-  // This is the cache state. Every MetadataImage instance is immutable, and 
updates
-  // replace this value with a completely new one. This means reads (which are 
not under
-  // any lock) need to grab the value of this variable once, and retain that 
read copy for
-  // the duration of their operation. Multiple reads of this value risk 
getting different
-  // image values.
-  @volatile private var _currentImage: MetadataImage = MetadataImage.EMPTY
-
-  // This method is the main hotspot when it comes to the performance of 
metadata requests,
-  // we should be careful about adding additional logic here.
-  // filterUnavailableEndpoints exists to support v0 MetadataResponses
-  private def maybeFilterAliveReplicas(image: MetadataImage,
-                                       brokers: Array[Int],
-                                       listenerName: ListenerName,
-                                       filterUnavailableEndpoints: Boolean): 
java.util.List[Integer] = {
-    if (!filterUnavailableEndpoints) {
-      Replicas.toList(brokers)
-    } else {
-      val res = new util.ArrayList[Integer](brokers.length)
-      for (brokerId <- brokers) {
-        Option(image.cluster().broker(brokerId)).foreach { b =>
-          if (!b.fenced() && b.listeners().containsKey(listenerName.value())) {
-            res.add(brokerId)
-          }
-        }
-      }
-      res
-    }
-  }
-
-  def currentImage(): MetadataImage = _currentImage
-
-  // errorUnavailableEndpoints exists to support v0 MetadataResponses
-  // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener 
is missing on the broker.
-  // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing 
listener (Metadata response v5 and below).
-  private def getPartitionMetadata(image: MetadataImage, topicName: String, 
listenerName: ListenerName, errorUnavailableEndpoints: Boolean,
-                                   errorUnavailableListeners: Boolean): 
Option[Iterator[MetadataResponsePartition]] = {
-    Option(image.topics().getTopic(topicName)) match {
-      case None => None
-      case Some(topic) => Some(topic.partitions().entrySet().asScala.map { 
entry =>
-        val partitionId = entry.getKey
-        val partition = entry.getValue
-        val filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas,
-          listenerName, errorUnavailableEndpoints)
-        val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, 
listenerName,
-          errorUnavailableEndpoints)
-        val offlineReplicas = getOfflineReplicas(image, partition, 
listenerName)
-        val maybeLeader = getAliveEndpoint(image, partition.leader, 
listenerName)
-        maybeLeader match {
-          case None =>
-            val error = if 
(!image.cluster().brokers.containsKey(partition.leader)) {
-              debug(s"Error while fetching metadata for 
$topicName-$partitionId: leader not available")
-              Errors.LEADER_NOT_AVAILABLE
-            } else {
-              debug(s"Error while fetching metadata for 
$topicName-$partitionId: listener $listenerName " +
-                s"not found on leader ${partition.leader}")
-              if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else 
Errors.LEADER_NOT_AVAILABLE
-            }
-            new MetadataResponsePartition()
-              .setErrorCode(error.code)
-              .setPartitionIndex(partitionId)
-              .setLeaderId(MetadataResponse.NO_LEADER_ID)
-              .setLeaderEpoch(partition.leaderEpoch)
-              .setReplicaNodes(filteredReplicas)
-              .setIsrNodes(filteredIsr)
-              .setOfflineReplicas(offlineReplicas)
-          case Some(leader) =>
-            val error = if (filteredReplicas.size < partition.replicas.length) 
{
-              debug(s"Error while fetching metadata for 
$topicName-$partitionId: replica information not available for " +
-                s"following brokers 
${partition.replicas.filterNot(filteredReplicas.contains).mkString(",")}")
-              Errors.REPLICA_NOT_AVAILABLE
-            } else if (filteredIsr.size < partition.isr.length) {
-              debug(s"Error while fetching metadata for 
$topicName-$partitionId: in sync replica information not available for " +
-                s"following brokers 
${partition.isr.filterNot(filteredIsr.contains).mkString(",")}")
-              Errors.REPLICA_NOT_AVAILABLE
-            } else {
-              Errors.NONE
-            }
-
-            new MetadataResponsePartition()
-              .setErrorCode(error.code)
-              .setPartitionIndex(partitionId)
-              .setLeaderId(leader.id())
-              .setLeaderEpoch(partition.leaderEpoch)
-              .setReplicaNodes(filteredReplicas)
-              .setIsrNodes(filteredIsr)
-              .setOfflineReplicas(offlineReplicas)
-        }
-      }.iterator)
-    }
-  }
-
-  /**
-   * Return topic partition metadata for the given topic, listener and index 
range. Also, return the next partition
-   * index that is not included in the result.
-   *
-   * @param image                       The metadata image
-   * @param topicName                   The name of the topic.
-   * @param listenerName                The listener name.
-   * @param startIndex                  The smallest index of the partitions 
to be included in the result.
-   *                                    
-   * @return                            A collection of topic partition 
metadata and next partition index (-1 means
-   *                                    no next partition).
-   */
-  private def getPartitionMetadataForDescribeTopicResponse(
-    image: MetadataImage,
-    topicName: String,
-    listenerName: ListenerName,
-    startIndex: Int,
-    maxCount: Int
-  ): (Option[List[DescribeTopicPartitionsResponsePartition]], Int) = {
-    Option(image.topics().getTopic(topicName)) match {
-      case None => (None, -1)
-      case Some(topic) => {
-        val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]()
-        val partitions = topic.partitions().keySet()
-        val upperIndex = topic.partitions().size().min(startIndex + maxCount)
-        val nextIndex = if (upperIndex < partitions.size()) upperIndex else -1
-        for (partitionId <- startIndex until upperIndex) {
-          topic.partitions().get(partitionId) match {
-            case partition : PartitionRegistration => {
-              val filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas,
-                listenerName, filterUnavailableEndpoints = false)
-              val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, 
listenerName, filterUnavailableEndpoints = false)
-              val offlineReplicas = getOfflineReplicas(image, partition, 
listenerName)
-              val maybeLeader = getAliveEndpoint(image, partition.leader, 
listenerName)
-              maybeLeader match {
-                case None =>
-                  result.append(new DescribeTopicPartitionsResponsePartition()
-                    .setPartitionIndex(partitionId)
-                    .setLeaderId(MetadataResponse.NO_LEADER_ID)
-                    .setLeaderEpoch(partition.leaderEpoch)
-                    .setReplicaNodes(filteredReplicas)
-                    .setIsrNodes(filteredIsr)
-                    .setOfflineReplicas(offlineReplicas)
-                    .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
-                    .setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
-                case Some(leader) =>
-                  result.append(new DescribeTopicPartitionsResponsePartition()
-                    .setPartitionIndex(partitionId)
-                    .setLeaderId(leader.id())
-                    .setLeaderEpoch(partition.leaderEpoch)
-                    .setReplicaNodes(filteredReplicas)
-                    .setIsrNodes(filteredIsr)
-                    .setOfflineReplicas(offlineReplicas)
-                    .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
-                    .setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
-              }
-            }
-            case _ => warn(s"The partition $partitionId does not exist for 
$topicName")
-          }
-        }
-        (Some(result.toList), nextIndex)
-      }
-    }
-  }
-
-  private def getOfflineReplicas(image: MetadataImage,
-                                 partition: PartitionRegistration,
-                                 listenerName: ListenerName): 
util.List[Integer] = {
-    val offlineReplicas = new util.ArrayList[Integer](0)
-    for (brokerId <- partition.replicas) {
-      Option(image.cluster().broker(brokerId)) match {
-        case None => offlineReplicas.add(brokerId)
-        case Some(broker) => if (isReplicaOffline(partition, listenerName, 
broker)) {
-          offlineReplicas.add(brokerId)
-        }
-      }
-    }
-    offlineReplicas
-  }
-
-  private def isReplicaOffline(partition: PartitionRegistration, listenerName: 
ListenerName, broker: BrokerRegistration) =
-    broker.fenced() || !broker.listeners().containsKey(listenerName.value()) 
|| isReplicaInOfflineDir(broker, partition)
-
-  private def isReplicaInOfflineDir(broker: BrokerRegistration, partition: 
PartitionRegistration): Boolean =
-    !broker.hasOnlineDir(partition.directory(broker.id()))
-
-  /**
-   * Get the endpoint matching the provided listener if the broker is alive. 
Note that listeners can
-   * be added dynamically, so a broker with a missing listener could be a 
transient error.
-   *
-   * @return None if broker is not alive or if the broker does not have a 
listener named `listenerName`.
-   */
-  private def getAliveEndpoint(image: MetadataImage, id: Int, listenerName: 
ListenerName): Option[Node] = {
-    
Option(image.cluster().broker(id)).flatMap(_.node(listenerName.value()).toScala)
-  }
-
-  // errorUnavailableEndpoints exists to support v0 MetadataResponses
-  override def getTopicMetadata(topics: util.Set[String],
-                                listenerName: ListenerName,
-                                errorUnavailableEndpoints: Boolean = false,
-                                errorUnavailableListeners: Boolean = false): 
util.List[MetadataResponseTopic] = {
-    val image = _currentImage
-    topics.stream().flatMap(topic =>
-      getPartitionMetadata(image, topic, listenerName, 
errorUnavailableEndpoints, errorUnavailableListeners) match {
-        case Some(partitionMetadata) =>
-          util.stream.Stream.of(new MetadataResponseTopic()
-            .setErrorCode(Errors.NONE.code)
-            .setName(topic)
-            
.setTopicId(Option(image.topics().getTopic(topic).id()).getOrElse(Uuid.ZERO_UUID))
-            .setIsInternal(Topic.isInternal(topic))
-            .setPartitions(partitionMetadata.toBuffer.asJava))
-        case None => util.stream.Stream.empty()
-      }
-    ).collect(Collectors.toList())
-  }
-
-  override def describeTopicResponse(
-    topics: util.Iterator[String],
-    listenerName: ListenerName,
-    topicPartitionStartIndex: util.function.Function[String, Integer],
-    maximumNumberOfPartitions: Int,
-    ignoreTopicsWithExceptions: Boolean
-  ): DescribeTopicPartitionsResponseData = {
-    val image = _currentImage
-    var remaining = maximumNumberOfPartitions
-    val result = new DescribeTopicPartitionsResponseData()
-    breakable {
-      topics.forEachRemaining { topicName =>
-        if (remaining > 0) {
-          val (partitionResponse, nextPartition) =
-            getPartitionMetadataForDescribeTopicResponse(
-              image, topicName, listenerName, 
topicPartitionStartIndex(topicName), remaining
-            )
-          partitionResponse.map(partitions => {
-            val response = new DescribeTopicPartitionsResponseTopic()
-              .setErrorCode(Errors.NONE.code)
-              .setName(topicName)
-              
.setTopicId(Option(image.topics().getTopic(topicName).id()).getOrElse(Uuid.ZERO_UUID))
-              .setIsInternal(Topic.isInternal(topicName))
-              .setPartitions(partitions.asJava)
-            result.topics().add(response)
-
-            if (nextPartition != -1) {
-              result.setNextCursor(new Cursor()
-                .setTopicName(topicName)
-                .setPartitionIndex(nextPartition)
-              )
-              break()
-            }
-            remaining -= partitions.size
-          })
-
-          if (!ignoreTopicsWithExceptions && partitionResponse.isEmpty) {
-            val error = try {
-              Topic.validate(topicName)
-              Errors.UNKNOWN_TOPIC_OR_PARTITION
-            } catch {
-              case _: InvalidTopicException =>
-                Errors.INVALID_TOPIC_EXCEPTION
-            }
-            result.topics().add(new DescribeTopicPartitionsResponseTopic()
-              .setErrorCode(error.code())
-              .setName(topicName)
-              .setTopicId(getTopicId(topicName))
-              .setIsInternal(Topic.isInternal(topicName)))
-          }
-        } else if (remaining == 0) {
-          // The cursor should point to the beginning of the current topic. 
All the partitions in the previous topic
-          // should be fulfilled. Note that, if a partition is pointed in the 
NextTopicPartition, it does not mean
-          // this topic exists.
-          result.setNextCursor(new Cursor()
-            .setTopicName(topicName)
-            .setPartitionIndex(0))
-          break()
-        }
-      }
-    }
-    result
-  }
-
-  override def getAllTopics(): util.Set[String] = 
_currentImage.topics().topicsByName().keySet()
-
-  override def getTopicId(topicName: String): Uuid = 
util.Optional.ofNullable(_currentImage.topics.topicsByName.get(topicName))
-    .map(_.id)
-    .orElse(Uuid.ZERO_UUID)
-
-  override def getTopicName(topicId: Uuid): util.Optional[String] = 
util.Optional.ofNullable(_currentImage.topics().topicsById().get(topicId)).map(t
 => t.name)
-
-  override def hasAliveBroker(brokerId: Int): Boolean = {
-    Option(_currentImage.cluster.broker(brokerId)).count(!_.fenced()) == 1
-  }
-
-  override def isBrokerFenced(brokerId: Int): Boolean = {
-    Option(_currentImage.cluster.broker(brokerId)).count(_.fenced) == 1
-  }
-
-  override def isBrokerShuttingDown(brokerId: Int): Boolean = {
-    
Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) == 
1
-  }
-
-  override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): 
util.Optional[Node] = {
-    util.Optional.ofNullable(_currentImage.cluster().broker(brokerId))
-      .filter(Predicate.not(_.fenced))
-      .flatMap(broker => broker.node(listenerName.value))
-  }
-
-  override def getAliveBrokerNodes(listenerName: ListenerName): 
util.List[Node] = {
-    _currentImage.cluster.brokers.values.stream
-      .filter(Predicate.not(_.fenced))
-      .flatMap(broker => broker.node(listenerName.value).stream)
-      .collect(Collectors.toList())
-  }
-
-  override def getBrokerNodes(listenerName: ListenerName): util.List[Node] = {
-    _currentImage.cluster.brokers.values.stream
-      .flatMap(broker => broker.node(listenerName.value).stream)
-      .collect(Collectors.toList())
-  }
-
-  override def getLeaderAndIsr(topicName: String, partitionId: Int): 
util.Optional[LeaderAndIsr] = {
-    util.Optional.ofNullable(_currentImage.topics().getTopic(topicName)).
-      flatMap(topic => 
util.Optional.ofNullable(topic.partitions().get(partitionId))).
-      flatMap(partition => util.Optional.ofNullable(new 
LeaderAndIsr(partition.leader, partition.leaderEpoch,
-        util.Arrays.asList(partition.isr.map(i => i: java.lang.Integer): _*), 
partition.leaderRecoveryState, partition.partitionEpoch)))
-  }
-
-  override def numPartitions(topicName: String): util.Optional[Integer] = {
-    util.Optional.ofNullable(_currentImage.topics().getTopic(topicName)).
-      map(topic => topic.partitions().size())
-  }
-
-  override def topicIdsToNames(): util.Map[Uuid, String] = 
_currentImage.topics.topicIdToNameView()
-
-  override def topicNamesToIds(): util.Map[String, Uuid] = 
_currentImage.topics().topicNameToIdView()
-
-  // if the leader is not known, return None;
-  // if the leader is known and corresponding node is available, return 
Some(node)
-  // if the leader is known but corresponding node with the listener name is 
not available, return Some(NO_NODE)

Review Comment:
   Please keep those comments when porting this code to the Java version of KRaftMetadataCache.



##########
metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java:
##########
@@ -0,0 +1,512 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.Cursor;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
+import 
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
+import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.server.common.FinalizedFeatures;
+import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class KRaftMetadataCache implements MetadataCache {
+    private final Logger log;
+    private final Supplier<KRaftVersion> kraftVersionSupplier;
+
+    // This is the cache state. Every MetadataImage instance is immutable, and 
updates
+    // replace this value with a completely new one. This means reads (which 
are not under
+    // any lock) need to grab the value of this variable once, and retain that 
read copy for
+    // the duration of their operation. Multiple reads of this value risk 
getting different
+    // image values.
+    private volatile MetadataImage currentImage = MetadataImage.EMPTY;
+
+    public KRaftMetadataCache(int brokerId, Supplier<KRaftVersion> 
kraftVersionSupplier) {
+        this.kraftVersionSupplier = kraftVersionSupplier;
+        this.log = new LogContext("[MetadataCache brokerId=" + brokerId + "] 
").logger(KRaftMetadataCache.class);
+    }
+
+    /**
+     * Filter the alive replicas. When filterUnavailableEndpoints is false, all of the given brokers are returned.
+     * Otherwise, brokers that are not registered in the metadata image, are fenced, or do not have the listener
+     * are dropped from the result.
+     * <p>
+     * This method is the main hotspot when it comes to the performance of metadata requests,
+     * we should be careful about adding additional logic here.
+     * @param image                      The metadata image.
+     * @param brokers                    The list of brokers to filter.
+     * @param listenerName               The listener name.
+     * @param filterUnavailableEndpoints Whether to filter the unavailable endpoints. This field is to support v0 MetadataResponse.
+     * @return                           The broker ids to report, in the same order as {@code brokers}.
+     */
+    private List<Integer> maybeFilterAliveReplicas(
+        MetadataImage image,
+        int[] brokers,
+        ListenerName listenerName,
+        boolean filterUnavailableEndpoints
+    ) {
+        if (!filterUnavailableEndpoints) return Replicas.toList(brokers);
+        List<Integer> res = new ArrayList<>(brokers.length);
+        for (int brokerId : brokers) {
+            BrokerRegistration broker = image.cluster().broker(brokerId);
+            if (broker != null && !broker.fenced() && 
broker.listeners().containsKey(listenerName.value())) {
+                res.add(brokerId);
+            }
+        }
+        return res;
+    }
+
+    public MetadataImage currentImage() {
+        return currentImage;
+    }
+
+    /**
+     * Get the partition metadata for the given topic and listener. If errorUnavailableEndpoints is true,
+     * replicas and ISR members that are not registered, are fenced, or lack the listener are filtered out.
+     * Otherwise, all brokers in the partitions are returned.
+     * If errorUnavailableListeners is true, it returns LISTENER_NOT_FOUND if the listener is missing on the leader.
+     * Otherwise, it returns LEADER_NOT_AVAILABLE both when the leader is unavailable and when the listener is
+     * missing on the leader (Metadata response v5 and below).
+     *
+     * @param image                     The metadata image.
+     * @param topicName                 The name of the topic.
+     * @param listenerName              The listener name.
+     * @param errorUnavailableEndpoints Whether to filter the unavailable endpoints. This field is to support v0 MetadataResponse.
+     * @param errorUnavailableListeners Whether to return LISTENER_NOT_FOUND or LEADER_NOT_AVAILABLE.
+     * @return                          The metadata of every partition of the topic, or an empty list if the topic
+     *                                  does not exist in the image.
+     */
+    private List<MetadataResponsePartition> partitionMetadata(
+        MetadataImage image,
+        String topicName,
+        ListenerName listenerName,
+        boolean errorUnavailableEndpoints,
+        boolean errorUnavailableListeners
+    ) {
+        TopicImage topicImage = image.topics().getTopic(topicName);
+        if (topicImage == null) return List.of();
+        return topicImage.partitions().entrySet().stream().map(entry -> {
+            int partitionId = entry.getKey();
+            PartitionRegistration partition = entry.getValue();
+            List<Integer> filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas, listenerName, errorUnavailableEndpoints);
+            List<Integer> filteredIsr = maybeFilterAliveReplicas(image, 
partition.isr, listenerName, errorUnavailableEndpoints);
+            List<Integer> offlineReplicas = getOfflineReplicas(image, 
partition, listenerName);
+            Optional<Node> maybeLeader = getAliveEndpoint(image, 
partition.leader, listenerName);
+            Errors error;
+            if (maybeLeader.isEmpty()) {
+                if (!image.cluster().brokers().containsKey(partition.leader)) {
+                    log.debug("Error while fetching metadata for {}-{}: leader 
not available", topicName, partitionId);
+                    error = Errors.LEADER_NOT_AVAILABLE;
+                } else {
+                    log.debug("Error while fetching metadata for {}-{}: 
listener {} not found on leader {}", topicName, partitionId, listenerName, 
partition.leader);
+                    error = errorUnavailableListeners ? 
Errors.LISTENER_NOT_FOUND : Errors.LEADER_NOT_AVAILABLE;
+                }
+                return new MetadataResponsePartition()
+                    .setErrorCode(error.code())
+                    .setPartitionIndex(partitionId)
+                    .setLeaderId(MetadataResponse.NO_LEADER_ID)
+                    .setLeaderEpoch(partition.leaderEpoch)
+                    .setReplicaNodes(filteredReplicas)
+                    .setIsrNodes(filteredIsr)
+                    .setOfflineReplicas(offlineReplicas);
+            } else {
+                if (filteredReplicas.size() < partition.replicas.length) {
+                    log.debug("Error while fetching metadata for {}-{}: 
replica information not available for following brokers {}", topicName, 
partitionId, Arrays.stream(partition.replicas).filter(b -> 
!filteredReplicas.contains(b)).mapToObj(String::valueOf).collect(Collectors.joining(",")));
+                    error = Errors.REPLICA_NOT_AVAILABLE;
+                } else if (filteredIsr.size() < partition.isr.length) {
+                    log.debug("Error while fetching metadata for {}-{}: in 
sync replica information not available for following brokers {}", topicName, 
partitionId, Arrays.stream(partition.isr).filter(b -> 
!filteredIsr.contains(b)).mapToObj(String::valueOf).collect(Collectors.joining(",")));
+                    error = Errors.REPLICA_NOT_AVAILABLE;
+                } else {
+                    error = Errors.NONE;
+                }
+                return new MetadataResponsePartition()
+                    .setErrorCode(error.code())
+                    .setPartitionIndex(partitionId)
+                    .setLeaderId(maybeLeader.get().id())
+                    .setLeaderEpoch(partition.leaderEpoch)
+                    .setReplicaNodes(filteredReplicas)
+                    .setIsrNodes(filteredIsr)
+                    .setOfflineReplicas(offlineReplicas);
+            }
+        }).toList();
+    }
+
+    /**
+     * Return topic partition metadata for the given topic, listener and index range. Also, return the next partition
+     * index that is not included in the result.
+     *
+     * @param image                       The metadata image.
+     * @param topicName                   The name of the topic.
+     * @param listenerName                The listener name.
+     * @param startIndex                  The smallest index of the partitions to be included in the result.
+     * @param maxCount                    The maximum number of partitions to be included in the result.
+     *
+     * @return                            An entry whose key is the topic partition metadata (an empty Optional if the
+     *                                    topic does not exist) and whose value is the next partition index (-1 means
+     *                                    no next partition).
+     */
+    private Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, 
Integer> partitionMetadataForDescribeTopicResponse(
+        MetadataImage image,
+        String topicName,
+        ListenerName listenerName,
+        int startIndex,
+        int maxCount
+    ) {
+        TopicImage topic = image.topics().getTopic(topicName);
+        if (topic == null) return Map.entry(Optional.empty(), -1);
+        List<DescribeTopicPartitionsResponsePartition> result = new 
ArrayList<>();
+        final Set<Integer> partitions = topic.partitions().keySet();
+        final int upperIndex = Math.min(topic.partitions().size(), startIndex 
+ maxCount);
+        for (int partitionId = startIndex; partitionId < upperIndex; 
partitionId++) {
+            PartitionRegistration partition = 
topic.partitions().get(partitionId);
+            if (partition == null) {
+                log.warn("The partition {} does not exist for {}", 
partitionId, topicName);
+                continue;
+            }
+            List<Integer> filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas, listenerName, false);
+            List<Integer> filteredIsr = maybeFilterAliveReplicas(image, 
partition.isr, listenerName, false);
+            List<Integer> offlineReplicas = getOfflineReplicas(image, 
partition, listenerName);
+            Optional<Node> maybeLeader = getAliveEndpoint(image, 
partition.leader, listenerName);
+            result.add(new DescribeTopicPartitionsResponsePartition()
+                .setPartitionIndex(partitionId)
+                
.setLeaderId(maybeLeader.map(Node::id).orElse(MetadataResponse.NO_LEADER_ID))
+                .setLeaderEpoch(partition.leaderEpoch)
+                .setReplicaNodes(filteredReplicas)
+                .setIsrNodes(filteredIsr)
+                .setOfflineReplicas(offlineReplicas)
+                .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+                .setLastKnownElr(Replicas.toList(partition.lastKnownElr)));
+        }
+        return Map.entry(Optional.of(result), (upperIndex < partitions.size()) 
? upperIndex : -1);
+    }
+
+    /**
+     * Collect the replicas of {@code partition} that cannot currently serve it: the broker is not registered in
+     * {@code image}, or {@link #isReplicaOffline} reports it as offline (fenced, missing the listener, or the
+     * replica's log directory is offline).
+     */
+    private List<Integer> getOfflineReplicas(MetadataImage image, PartitionRegistration partition, ListenerName listenerName) {
+        List<Integer> offline = new ArrayList<>(0);
+        for (int i = 0; i < partition.replicas.length; i++) {
+            int replicaId = partition.replicas[i];
+            BrokerRegistration registration = image.cluster().broker(replicaId);
+            boolean unavailable = registration == null || isReplicaOffline(partition, listenerName, registration);
+            if (unavailable) {
+                offline.add(replicaId);
+            }
+        }
+        return offline;
+    }
+
+    /**
+     * Whether the replica of {@code partition} hosted on {@code broker} should be reported as offline: the broker
+     * is fenced, it does not expose {@code listenerName}, or the replica lives in an offline log directory.
+     */
+    private boolean isReplicaOffline(PartitionRegistration partition, ListenerName listenerName, BrokerRegistration broker) {
+        if (broker.fenced()) {
+            return true;
+        }
+        if (!broker.listeners().containsKey(listenerName.value())) {
+            return true;
+        }
+        return isReplicaInOfflineDir(broker, partition);
+    }
+
+    /**
+     * Whether the log directory that {@code partition} assigns to this broker's replica is not reported as online
+     * by the broker.
+     */
+    private boolean isReplicaInOfflineDir(BrokerRegistration broker, PartitionRegistration partition) {
+        boolean online = broker.hasOnlineDir(partition.directory(broker.id()));
+        return !online;
+    }
+
+    /**
+     * Resolve the endpoint of broker {@code id} for the given listener. Note that listeners can be added
+     * dynamically, so a broker with a missing listener could be a transient error.
+     * <p>
+     * Despite the name, this only checks that the broker is registered in {@code image}; it does not look at the
+     * broker's fenced state.
+     *
+     * @return the node, or {@code Optional.empty()} if the broker is not registered in {@code image} or does not
+     *         have a listener named {@code listenerName}.
+     */
+    private Optional<Node> getAliveEndpoint(MetadataImage image, int id, ListenerName listenerName) {
+        BrokerRegistration broker = image.cluster().broker(id);
+        if (broker == null) {
+            return Optional.empty();
+        }
+        return broker.node(listenerName.value());
+    }
+
+    /**
+     * Build MetadataResponse topic entries for the requested topics from a single snapshot of the metadata image.
+     * Topics for which {@code partitionMetadata} yields no partitions (e.g. topics absent from the image) are left
+     * out of the result rather than reported with an error.
+     */
+    @Override
+    public List<MetadataResponseTopic> getTopicMetadata(
+        Set<String> topics,
+        ListenerName listenerName,
+        boolean errorUnavailableEndpoints,
+        boolean errorUnavailableListeners
+    ) {
+        final MetadataImage snapshot = currentImage;
+        List<MetadataResponseTopic> topicMetadata = new ArrayList<>(topics.size());
+        for (String name : topics) {
+            List<MetadataResponsePartition> partitions =
+                partitionMetadata(snapshot, name, listenerName, errorUnavailableEndpoints, errorUnavailableListeners);
+            if (partitions.isEmpty()) {
+                continue;
+            }
+            TopicImage topicImage = snapshot.topics().getTopic(name);
+            topicMetadata.add(new MetadataResponseTopic()
+                .setErrorCode(Errors.NONE.code())
+                .setName(name)
+                .setTopicId(topicImage == null ? Uuid.ZERO_UUID : topicImage.id())
+                .setIsInternal(Topic.isInternal(name))
+                .setPartitions(partitions));
+        }
+        // Unmodifiable, like the Stream.toList() result this method has always returned.
+        return List.copyOf(topicMetadata);
+    }
+
+    /**
+     * Build a DescribeTopicPartitions response for {@code topics}, returning at most
+     * {@code maximumNumberOfPartitions} partitions in total. All lookups use a single snapshot of the image.
+     * <p>
+     * Topics are processed in iteration order. If the partition budget runs out part-way through a topic, the
+     * response carries a cursor pointing at the next partition of that topic. If it runs out exactly at a topic
+     * boundary and more topics remain, the cursor points at partition 0 of the next topic. Topics missing from the
+     * image are reported with UNKNOWN_TOPIC_OR_PARTITION (INVALID_TOPIC_EXCEPTION for an invalid name), unless
+     * {@code ignoreTopicsWithExceptions} is set, in which case they are omitted.
+     */
+    @Override
+    public DescribeTopicPartitionsResponseData describeTopicResponse(
+        Iterator<String> topics,
+        ListenerName listenerName,
+        Function<String, Integer> topicPartitionStartIndex,
+        int maximumNumberOfPartitions,
+        boolean ignoreTopicsWithExceptions
+    ) {
+        MetadataImage image = currentImage;
+        AtomicInteger remaining = new AtomicInteger(maximumNumberOfPartitions);
+        DescribeTopicPartitionsResponseData result = new DescribeTopicPartitionsResponseData();
+        while (topics.hasNext()) {
+            String topicName = topics.next();
+            if (remaining.get() > 0) {
+                Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, Integer> partitionResponseEntry =
+                    partitionMetadataForDescribeTopicResponse(image, topicName, listenerName, topicPartitionStartIndex.apply(topicName), remaining.get());
+                Optional<List<DescribeTopicPartitionsResponsePartition>> partitionResponse = partitionResponseEntry.getKey();
+                int nextPartition = partitionResponseEntry.getValue();
+                if (partitionResponse.isPresent()) {
+                    List<DescribeTopicPartitionsResponsePartition> partitions = partitionResponse.get();
+                    DescribeTopicPartitionsResponseTopic response = new DescribeTopicPartitionsResponseTopic()
+                        .setErrorCode(Errors.NONE.code())
+                        .setName(topicName)
+                        .setTopicId(Optional.ofNullable(image.topics().getTopic(topicName).id()).orElse(Uuid.ZERO_UUID))
+                        .setIsInternal(Topic.isInternal(topicName))
+                        .setPartitions(partitions);
+                    result.topics().add(response);
+
+                    if (nextPartition != -1) {
+                        result.setNextCursor(new Cursor().setTopicName(topicName).setPartitionIndex(nextPartition));
+                        break;
+                    } else {
+                        remaining.addAndGet(-partitions.size());
+                    }
+                } else if (!ignoreTopicsWithExceptions) {
+                    Errors error;
+                    try {
+                        Topic.validate(topicName);
+                        error = Errors.UNKNOWN_TOPIC_OR_PARTITION;
+                    } catch (InvalidTopicException e) {
+                        error = Errors.INVALID_TOPIC_EXCEPTION;
+                    }
+                    // Resolve the id from the same snapshot as the partition lookup. getTopicId() would re-read the
+                    // volatile image and could return the id of a topic created after the snapshot, contradicting
+                    // the error code reported alongside it.
+                    TopicImage topicImage = image.topics().getTopic(topicName);
+                    result.topics().add(new DescribeTopicPartitionsResponseTopic()
+                        .setErrorCode(error.code())
+                        .setName(topicName)
+                        .setTopicId(topicImage == null ? Uuid.ZERO_UUID : topicImage.id())
+                        .setIsInternal(Topic.isInternal(topicName)));
+                }
+            } else if (remaining.get() == 0) {
+                // The budget ran out exactly at a topic boundary: all partitions of the previous topic were
+                // returned, so the cursor points at the beginning of the current topic. A cursor naming a topic
+                // does not imply that the topic exists.
+                result.setNextCursor(new Cursor().setTopicName(topicName).setPartitionIndex(0));
+                break;
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public Set<String> getAllTopics() {
+        return currentImage.topics().topicsByName().keySet();
+    }
+
+    @Override
+    public Uuid getTopicId(String topicName) {
+        return currentImage.topics().getTopic(topicName) == null ? 
Uuid.ZERO_UUID : currentImage.topics().getTopic(topicName).id();
+    }
+
+    @Override
+    public Optional<String> getTopicName(Uuid topicId) {
+        return 
Optional.ofNullable(currentImage.topics().topicsById().get(topicId)).map(TopicImage::name);
+    }
+
+    @Override
+    public boolean hasAliveBroker(int brokerId) {
+        return currentImage.cluster().broker(brokerId) != null && 
!currentImage.cluster().broker(brokerId).fenced();
+    }
+
+    @Override
+    public boolean isBrokerFenced(int brokerId) {
+        return currentImage.cluster().broker(brokerId) != null && 
currentImage.cluster().broker(brokerId).fenced();
+    }
+
+    @Override
+    public boolean isBrokerShuttingDown(int brokerId) {
+        return currentImage.cluster().broker(brokerId) != null && 
currentImage.cluster().broker(brokerId).inControlledShutdown();
+    }
+
+    @Override
+    public Optional<Node> getAliveBrokerNode(int brokerId, ListenerName 
listenerName) {
+        return Optional.ofNullable(currentImage.cluster().broker(brokerId))
+            .filter(Predicate.not(BrokerRegistration::fenced))
+            .flatMap(broker -> broker.node(listenerName.value()));
+    }
+
+    /**
+     * @return the {@code listenerName} nodes of all unfenced brokers that expose that listener.
+     */
+    @Override
+    public List<Node> getAliveBrokerNodes(ListenerName listenerName) {
+        List<Node> nodes = new ArrayList<>();
+        for (BrokerRegistration broker : currentImage.cluster().brokers().values()) {
+            if (broker.fenced()) {
+                continue;
+            }
+            broker.node(listenerName.value()).ifPresent(nodes::add);
+        }
+        return nodes;
+    }
+
+    /**
+     * @return the {@code listenerName} nodes of all registered brokers (fenced or not) that expose that listener.
+     */
+    @Override
+    public List<Node> getBrokerNodes(ListenerName listenerName) {
+        List<Node> nodes = new ArrayList<>();
+        for (BrokerRegistration broker : currentImage.cluster().brokers().values()) {
+            broker.node(listenerName.value()).ifPresent(nodes::add);
+        }
+        return nodes;
+    }
+
+    @Override
+    public Optional<LeaderAndIsr> getLeaderAndIsr(String topicName, int 
partitionId) {
+        return Optional.ofNullable(currentImage.topics().getTopic(topicName))
+            .flatMap(topic -> 
Optional.ofNullable(topic.partitions().get(partitionId)))
+            .map(partition -> new LeaderAndIsr(
+                partition.leader,
+                partition.leaderEpoch,
+                
Arrays.stream(partition.isr).boxed().collect(Collectors.toList()),
+                partition.leaderRecoveryState,
+                partition.partitionEpoch
+            ));
+    }
+
+    @Override
+    public Optional<Integer> numPartitions(String topicName) {
+        return 
Optional.ofNullable(currentImage.topics().getTopic(topicName)).map(topic -> 
topic.partitions().size());
+    }
+
+    @Override
+    public Map<Uuid, String> topicIdsToNames() {
+        return currentImage.topics().topicIdToNameView();
+    }
+
+    @Override
+    public Map<String, Uuid> topicNamesToIds() {
+        return currentImage.topics().topicNameToIdView();
+    }
+
+    @Override
+    public Optional<Node> getPartitionLeaderEndpoint(String topicName, int 
partitionId, ListenerName listenerName) {
+        return Optional.ofNullable(currentImage.topics().getTopic(topicName))
+            .flatMap(topic -> 
Optional.ofNullable(topic.partitions().get(partitionId)))
+            .flatMap(partition -> 
Optional.ofNullable(currentImage.cluster().broker(partition.leader))
+                .map(broker -> 
broker.node(listenerName.value()).orElse(Node.noNode())));
+    }
+
+    @Override
+    public Map<Integer, Node> getPartitionReplicaEndpoints(TopicPartition tp, 
ListenerName listenerName) {
+        TopicImage topic = currentImage.topics().getTopic(tp.topic());

Review Comment:
   Ditto: please create a temporary local reference to `currentImage`.



##########
metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java:
##########
@@ -0,0 +1,512 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.Cursor;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
+import 
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
+import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.server.common.FinalizedFeatures;
+import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class KRaftMetadataCache implements MetadataCache {
+    // Logger whose messages are prefixed with this broker's id (built in the constructor).
+    private final Logger log;
+    private final Supplier<KRaftVersion> kraftVersionSupplier;
+
+    // This is the cache state. Every MetadataImage instance is immutable, and updates replace this value with a
+    // completely new one. Reads are not performed under any lock, so each read operation must load this field
+    // once into a local variable and use that snapshot for its whole duration; reading the field several times
+    // within one operation risks observing different images.
+    private volatile MetadataImage currentImage = MetadataImage.EMPTY;
+
+    public KRaftMetadataCache(int brokerId, Supplier<KRaftVersion> 
kraftVersionSupplier) {
+        this.kraftVersionSupplier = kraftVersionSupplier;
+        this.log = new LogContext("[MetadataCache brokerId=" + brokerId + "] 
").logger(KRaftMetadataCache.class);
+    }
+
+    /**
+     * Filter the alive replicas. When {@code filterUnavailableEndpoints} is false, every broker is returned
+     * unchanged; otherwise brokers that are unknown, fenced, or lack the listener are dropped.
+     * <p>
+     * This method is the main hotspot when it comes to the performance of metadata requests,
+     * so be careful about adding additional logic here.
+     *
+     * @param image                      The metadata image.
+     * @param brokers                    The broker ids to filter.
+     * @param listenerName               The listener name.
+     * @param filterUnavailableEndpoints Whether to filter the unavailable endpoints. This field is to support v0 MetadataResponse.
+     */
+    private List<Integer> maybeFilterAliveReplicas(
+        MetadataImage image,
+        int[] brokers,
+        ListenerName listenerName,
+        boolean filterUnavailableEndpoints
+    ) {
+        if (!filterUnavailableEndpoints) {
+            return Replicas.toList(brokers);
+        }
+        String listener = listenerName.value();
+        List<Integer> alive = new ArrayList<>(brokers.length);
+        for (int i = 0; i < brokers.length; i++) {
+            BrokerRegistration registration = image.cluster().broker(brokers[i]);
+            boolean usable = registration != null
+                && !registration.fenced()
+                && registration.listeners().containsKey(listener);
+            if (usable) {
+                alive.add(brokers[i]);
+            }
+        }
+        return alive;
+    }
+
+    /**
+     * @return the current value of the volatile image field. Callers performing several lookups should keep the
+     *         returned instance rather than calling this method repeatedly, so all lookups see the same image.
+     */
+    public MetadataImage currentImage() {
+        final MetadataImage snapshot = currentImage;
+        return snapshot;
+    }
+
+    /**
+     * Get the partition metadata for the given topic and listener. If 
errorUnavailableEndpoints is true,
+     * it uses all brokers in the partitions. Otherwise, it filters the 
unavailable endpoints.
+     * If errorUnavailableListeners is true, it returns LISTENER_NOT_FOUND if 
the listener is missing on the broker.
+     * Otherwise, it returns LEADER_NOT_AVAILABLE for broker unavailable.
+     *
+     * @param image                     The metadata image.
+     * @param topicName                 The name of the topic.
+     * @param listenerName              The listener name.
+     * @param errorUnavailableEndpoints Whether to filter the unavailable 
endpoints. This field is to support v0 MetadataResponse.
+     * @param errorUnavailableListeners Whether to return LISTENER_NOT_FOUND 
or LEADER_NOT_AVAILABLE.
+     */
+    private List<MetadataResponsePartition> partitionMetadata(
+        MetadataImage image,
+        String topicName,
+        ListenerName listenerName,
+        boolean errorUnavailableEndpoints,
+        boolean errorUnavailableListeners
+    ) {
+        TopicImage topicImage = image.topics().getTopic(topicName);
+        if (topicImage == null) return List.of();
+        return topicImage.partitions().entrySet().stream().map(entry -> {
+            int partitionId = entry.getKey();
+            PartitionRegistration partition = entry.getValue();
+            List<Integer> filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas, listenerName, errorUnavailableEndpoints);
+            List<Integer> filteredIsr = maybeFilterAliveReplicas(image, 
partition.isr, listenerName, errorUnavailableEndpoints);
+            List<Integer> offlineReplicas = getOfflineReplicas(image, 
partition, listenerName);
+            Optional<Node> maybeLeader = getAliveEndpoint(image, 
partition.leader, listenerName);
+            Errors error;
+            if (maybeLeader.isEmpty()) {
+                if (!image.cluster().brokers().containsKey(partition.leader)) {
+                    log.debug("Error while fetching metadata for {}-{}: leader 
not available", topicName, partitionId);
+                    error = Errors.LEADER_NOT_AVAILABLE;
+                } else {
+                    log.debug("Error while fetching metadata for {}-{}: 
listener {} not found on leader {}", topicName, partitionId, listenerName, 
partition.leader);
+                    error = errorUnavailableListeners ? 
Errors.LISTENER_NOT_FOUND : Errors.LEADER_NOT_AVAILABLE;
+                }
+                return new MetadataResponsePartition()
+                    .setErrorCode(error.code())
+                    .setPartitionIndex(partitionId)
+                    .setLeaderId(MetadataResponse.NO_LEADER_ID)
+                    .setLeaderEpoch(partition.leaderEpoch)
+                    .setReplicaNodes(filteredReplicas)
+                    .setIsrNodes(filteredIsr)
+                    .setOfflineReplicas(offlineReplicas);
+            } else {
+                if (filteredReplicas.size() < partition.replicas.length) {
+                    log.debug("Error while fetching metadata for {}-{}: 
replica information not available for following brokers {}", topicName, 
partitionId, Arrays.stream(partition.replicas).filter(b -> 
!filteredReplicas.contains(b)).mapToObj(String::valueOf).collect(Collectors.joining(",")));
+                    error = Errors.REPLICA_NOT_AVAILABLE;
+                } else if (filteredIsr.size() < partition.isr.length) {
+                    log.debug("Error while fetching metadata for {}-{}: in 
sync replica information not available for following brokers {}", topicName, 
partitionId, Arrays.stream(partition.isr).filter(b -> 
!filteredIsr.contains(b)).mapToObj(String::valueOf).collect(Collectors.joining(",")));
+                    error = Errors.REPLICA_NOT_AVAILABLE;
+                } else {
+                    error = Errors.NONE;
+                }
+                return new MetadataResponsePartition()
+                    .setErrorCode(error.code())
+                    .setPartitionIndex(partitionId)
+                    .setLeaderId(maybeLeader.get().id())
+                    .setLeaderEpoch(partition.leaderEpoch)
+                    .setReplicaNodes(filteredReplicas)
+                    .setIsrNodes(filteredIsr)
+                    .setOfflineReplicas(offlineReplicas);
+            }
+        }).toList();
+    }
+
+    /**
+     * Return topic partition metadata for the given topic, listener and index range. Also, return the next partition
+     * index that is not included in the result.
+     *
+     * @param image        The metadata image.
+     * @param topicName    The name of the topic.
+     * @param listenerName The listener name.
+     * @param startIndex   The smallest index of the partitions to be included in the result.
+     * @param maxCount     The maximum number of partition indices to examine, starting at {@code startIndex}.
+     *
+     * @return An empty Optional with next index -1 if the topic does not exist in {@code image}. Otherwise, the
+     *         partition metadata together with the next partition index to resume from (-1 means no next
+     *         partition). Indices in the range that have no partition registration are skipped with a warning.
+     */
+    private Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, Integer> partitionMetadataForDescribeTopicResponse(
+        MetadataImage image,
+        String topicName,
+        ListenerName listenerName,
+        int startIndex,
+        int maxCount
+    ) {
+        TopicImage topic = image.topics().getTopic(topicName);
+        if (topic == null) return Map.entry(Optional.empty(), -1);
+        List<DescribeTopicPartitionsResponsePartition> result = new ArrayList<>();
+        final int partitionCount = topic.partitions().size();
+        // Compute the bound in long arithmetic. startIndex is supplied by the caller (presumably from a request
+        // cursor), and startIndex + maxCount could overflow int. The overflow would produce a negative bound that
+        // is then returned as the "next" partition index.
+        final int upperIndex = (int) Math.min(partitionCount, (long) startIndex + maxCount);
+        for (int partitionId = startIndex; partitionId < upperIndex; partitionId++) {
+            PartitionRegistration partition = topic.partitions().get(partitionId);
+            if (partition == null) {
+                log.warn("The partition {} does not exist for {}", partitionId, topicName);
+                continue;
+            }
+            List<Integer> filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, listenerName, false);
+            List<Integer> filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, false);
+            List<Integer> offlineReplicas = getOfflineReplicas(image, partition, listenerName);
+            Optional<Node> maybeLeader = getAliveEndpoint(image, partition.leader, listenerName);
+            result.add(new DescribeTopicPartitionsResponsePartition()
+                .setPartitionIndex(partitionId)
+                .setLeaderId(maybeLeader.map(Node::id).orElse(MetadataResponse.NO_LEADER_ID))
+                .setLeaderEpoch(partition.leaderEpoch)
+                .setReplicaNodes(filteredReplicas)
+                .setIsrNodes(filteredIsr)
+                .setOfflineReplicas(offlineReplicas)
+                .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+                .setLastKnownElr(Replicas.toList(partition.lastKnownElr)));
+        }
+        return Map.entry(Optional.of(result), upperIndex < partitionCount ? upperIndex : -1);
+    }
+
+    /**
+     * Collect the replicas of {@code partition} that cannot currently serve it: the broker is not registered in
+     * {@code image}, or {@link #isReplicaOffline} reports it as offline (fenced, missing the listener, or the
+     * replica's log directory is offline).
+     */
+    private List<Integer> getOfflineReplicas(MetadataImage image, PartitionRegistration partition, ListenerName listenerName) {
+        List<Integer> offline = new ArrayList<>(0);
+        for (int i = 0; i < partition.replicas.length; i++) {
+            int replicaId = partition.replicas[i];
+            BrokerRegistration registration = image.cluster().broker(replicaId);
+            boolean unavailable = registration == null || isReplicaOffline(partition, listenerName, registration);
+            if (unavailable) {
+                offline.add(replicaId);
+            }
+        }
+        return offline;
+    }
+
+    /**
+     * Whether the replica of {@code partition} hosted on {@code broker} should be reported as offline: the broker
+     * is fenced, it does not expose {@code listenerName}, or the replica lives in an offline log directory.
+     */
+    private boolean isReplicaOffline(PartitionRegistration partition, ListenerName listenerName, BrokerRegistration broker) {
+        if (broker.fenced()) {
+            return true;
+        }
+        if (!broker.listeners().containsKey(listenerName.value())) {
+            return true;
+        }
+        return isReplicaInOfflineDir(broker, partition);
+    }
+
+    /**
+     * Whether the log directory that {@code partition} assigns to this broker's replica is not reported as online
+     * by the broker.
+     */
+    private boolean isReplicaInOfflineDir(BrokerRegistration broker, PartitionRegistration partition) {
+        boolean online = broker.hasOnlineDir(partition.directory(broker.id()));
+        return !online;
+    }
+
+    /**
+     * Resolve the endpoint of broker {@code id} for the given listener. Note that listeners can be added
+     * dynamically, so a broker with a missing listener could be a transient error.
+     * <p>
+     * Despite the name, this only checks that the broker is registered in {@code image}; it does not look at the
+     * broker's fenced state.
+     *
+     * @return the node, or {@code Optional.empty()} if the broker is not registered in {@code image} or does not
+     *         have a listener named {@code listenerName}.
+     */
+    private Optional<Node> getAliveEndpoint(MetadataImage image, int id, ListenerName listenerName) {
+        BrokerRegistration broker = image.cluster().broker(id);
+        if (broker == null) {
+            return Optional.empty();
+        }
+        return broker.node(listenerName.value());
+    }
+
+    /**
+     * Build MetadataResponse topic entries for the requested topics from a single snapshot of the metadata image.
+     * Topics for which {@code partitionMetadata} yields no partitions (e.g. topics absent from the image) are left
+     * out of the result rather than reported with an error.
+     */
+    @Override
+    public List<MetadataResponseTopic> getTopicMetadata(
+        Set<String> topics,
+        ListenerName listenerName,
+        boolean errorUnavailableEndpoints,
+        boolean errorUnavailableListeners
+    ) {
+        final MetadataImage snapshot = currentImage;
+        List<MetadataResponseTopic> topicMetadata = new ArrayList<>(topics.size());
+        for (String name : topics) {
+            List<MetadataResponsePartition> partitions =
+                partitionMetadata(snapshot, name, listenerName, errorUnavailableEndpoints, errorUnavailableListeners);
+            if (partitions.isEmpty()) {
+                continue;
+            }
+            TopicImage topicImage = snapshot.topics().getTopic(name);
+            topicMetadata.add(new MetadataResponseTopic()
+                .setErrorCode(Errors.NONE.code())
+                .setName(name)
+                .setTopicId(topicImage == null ? Uuid.ZERO_UUID : topicImage.id())
+                .setIsInternal(Topic.isInternal(name))
+                .setPartitions(partitions));
+        }
+        // Unmodifiable, like the Stream.toList() result this method has always returned.
+        return List.copyOf(topicMetadata);
+    }
+
+    /**
+     * Build a DescribeTopicPartitions response for {@code topics}, returning at most
+     * {@code maximumNumberOfPartitions} partitions in total. All lookups use a single snapshot of the image.
+     * <p>
+     * Topics are processed in iteration order. If the partition budget runs out part-way through a topic, the
+     * response carries a cursor pointing at the next partition of that topic. If it runs out exactly at a topic
+     * boundary and more topics remain, the cursor points at partition 0 of the next topic. Topics missing from the
+     * image are reported with UNKNOWN_TOPIC_OR_PARTITION (INVALID_TOPIC_EXCEPTION for an invalid name), unless
+     * {@code ignoreTopicsWithExceptions} is set, in which case they are omitted.
+     */
+    @Override
+    public DescribeTopicPartitionsResponseData describeTopicResponse(
+        Iterator<String> topics,
+        ListenerName listenerName,
+        Function<String, Integer> topicPartitionStartIndex,
+        int maximumNumberOfPartitions,
+        boolean ignoreTopicsWithExceptions
+    ) {
+        MetadataImage image = currentImage;
+        AtomicInteger remaining = new AtomicInteger(maximumNumberOfPartitions);
+        DescribeTopicPartitionsResponseData result = new DescribeTopicPartitionsResponseData();
+        while (topics.hasNext()) {
+            String topicName = topics.next();
+            if (remaining.get() > 0) {
+                Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, Integer> partitionResponseEntry =
+                    partitionMetadataForDescribeTopicResponse(image, topicName, listenerName, topicPartitionStartIndex.apply(topicName), remaining.get());
+                Optional<List<DescribeTopicPartitionsResponsePartition>> partitionResponse = partitionResponseEntry.getKey();
+                int nextPartition = partitionResponseEntry.getValue();
+                if (partitionResponse.isPresent()) {
+                    List<DescribeTopicPartitionsResponsePartition> partitions = partitionResponse.get();
+                    DescribeTopicPartitionsResponseTopic response = new DescribeTopicPartitionsResponseTopic()
+                        .setErrorCode(Errors.NONE.code())
+                        .setName(topicName)
+                        .setTopicId(Optional.ofNullable(image.topics().getTopic(topicName).id()).orElse(Uuid.ZERO_UUID))
+                        .setIsInternal(Topic.isInternal(topicName))
+                        .setPartitions(partitions);
+                    result.topics().add(response);
+
+                    if (nextPartition != -1) {
+                        result.setNextCursor(new Cursor().setTopicName(topicName).setPartitionIndex(nextPartition));
+                        break;
+                    } else {
+                        remaining.addAndGet(-partitions.size());
+                    }
+                } else if (!ignoreTopicsWithExceptions) {
+                    Errors error;
+                    try {
+                        Topic.validate(topicName);
+                        error = Errors.UNKNOWN_TOPIC_OR_PARTITION;
+                    } catch (InvalidTopicException e) {
+                        error = Errors.INVALID_TOPIC_EXCEPTION;
+                    }
+                    // Resolve the id from the same snapshot as the partition lookup. getTopicId() would re-read the
+                    // volatile image and could return the id of a topic created after the snapshot, contradicting
+                    // the error code reported alongside it.
+                    TopicImage topicImage = image.topics().getTopic(topicName);
+                    result.topics().add(new DescribeTopicPartitionsResponseTopic()
+                        .setErrorCode(error.code())
+                        .setName(topicName)
+                        .setTopicId(topicImage == null ? Uuid.ZERO_UUID : topicImage.id())
+                        .setIsInternal(Topic.isInternal(topicName)));
+                }
+            } else if (remaining.get() == 0) {
+                // The budget ran out exactly at a topic boundary: all partitions of the previous topic were
+                // returned, so the cursor points at the beginning of the current topic. A cursor naming a topic
+                // does not imply that the topic exists.
+                result.setNextCursor(new Cursor().setTopicName(topicName).setPartitionIndex(0));
+                break;
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public Set<String> getAllTopics() {
+        return currentImage.topics().topicsByName().keySet();
+    }
+
+    @Override
+    public Uuid getTopicId(String topicName) {
+        return currentImage.topics().getTopic(topicName) == null ? 
Uuid.ZERO_UUID : currentImage.topics().getTopic(topicName).id();
+    }
+
+    @Override
+    public Optional<String> getTopicName(Uuid topicId) {
+        return 
Optional.ofNullable(currentImage.topics().topicsById().get(topicId)).map(TopicImage::name);
+    }
+
+    @Override
+    public boolean hasAliveBroker(int brokerId) {
+        return currentImage.cluster().broker(brokerId) != null && 
!currentImage.cluster().broker(brokerId).fenced();
+    }
+
+    @Override
+    public boolean isBrokerFenced(int brokerId) {
+        return currentImage.cluster().broker(brokerId) != null && 
currentImage.cluster().broker(brokerId).fenced();
+    }
+
+    @Override
+    public boolean isBrokerShuttingDown(int brokerId) {
+        return currentImage.cluster().broker(brokerId) != null && 
currentImage.cluster().broker(brokerId).inControlledShutdown();
+    }
+
+    @Override
+    public Optional<Node> getAliveBrokerNode(int brokerId, ListenerName 
listenerName) {
+        return Optional.ofNullable(currentImage.cluster().broker(brokerId))
+            .filter(Predicate.not(BrokerRegistration::fenced))
+            .flatMap(broker -> broker.node(listenerName.value()));
+    }
+
+    /**
+     * @return the {@code listenerName} nodes of all unfenced brokers that expose that listener.
+     */
+    @Override
+    public List<Node> getAliveBrokerNodes(ListenerName listenerName) {
+        List<Node> nodes = new ArrayList<>();
+        for (BrokerRegistration broker : currentImage.cluster().brokers().values()) {
+            if (broker.fenced()) {
+                continue;
+            }
+            broker.node(listenerName.value()).ifPresent(nodes::add);
+        }
+        return nodes;
+    }
+
+    /**
+     * @return the {@code listenerName} nodes of all registered brokers (fenced or not) that expose that listener.
+     */
+    @Override
+    public List<Node> getBrokerNodes(ListenerName listenerName) {
+        List<Node> nodes = new ArrayList<>();
+        for (BrokerRegistration broker : currentImage.cluster().brokers().values()) {
+            broker.node(listenerName.value()).ifPresent(nodes::add);
+        }
+        return nodes;
+    }
+
+    @Override
+    public Optional<LeaderAndIsr> getLeaderAndIsr(String topicName, int 
partitionId) {
+        return Optional.ofNullable(currentImage.topics().getTopic(topicName))
+            .flatMap(topic -> 
Optional.ofNullable(topic.partitions().get(partitionId)))
+            .map(partition -> new LeaderAndIsr(
+                partition.leader,
+                partition.leaderEpoch,
+                
Arrays.stream(partition.isr).boxed().collect(Collectors.toList()),
+                partition.leaderRecoveryState,
+                partition.partitionEpoch
+            ));
+    }
+
+    @Override
+    public Optional<Integer> numPartitions(String topicName) {
+        return 
Optional.ofNullable(currentImage.topics().getTopic(topicName)).map(topic -> 
topic.partitions().size());
+    }
+
+    @Override
+    public Map<Uuid, String> topicIdsToNames() {
+        return currentImage.topics().topicIdToNameView();
+    }
+
+    @Override
+    public Map<String, Uuid> topicNamesToIds() {
+        return currentImage.topics().topicNameToIdView();
+    }
+
+    @Override
+    public Optional<Node> getPartitionLeaderEndpoint(String topicName, int 
partitionId, ListenerName listenerName) {
+        return Optional.ofNullable(currentImage.topics().getTopic(topicName))

Review Comment:
   Please create a temporary local reference to `currentImage`; otherwise, it may
cause concurrency issues, because each read of the volatile field can return a different image.



##########
metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java:
##########
@@ -0,0 +1,512 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.Cursor;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
+import 
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
+import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.server.common.FinalizedFeatures;
+import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class KRaftMetadataCache implements MetadataCache {
+    private final Logger log;
+    private final Supplier<KRaftVersion> kraftVersionSupplier;
+
+    // This is the cache state. Every MetadataImage instance is immutable, and 
updates
+    // replace this value with a completely new one. This means reads (which 
are not under
+    // any lock) need to grab the value of this variable once, and retain that 
read copy for
+    // the duration of their operation. Multiple reads of this value risk 
getting different
+    // image values.
+    private volatile MetadataImage currentImage = MetadataImage.EMPTY;
+
+    public KRaftMetadataCache(int brokerId, Supplier<KRaftVersion> 
kraftVersionSupplier) {
+        this.kraftVersionSupplier = kraftVersionSupplier;
+        this.log = new LogContext("[MetadataCache brokerId=" + brokerId + "] 
").logger(KRaftMetadataCache.class);
+    }
+
+    /**
+     * Filter the alive replicas. It returns all brokers when 
filterUnavailableEndpoints is false.
+     * Otherwise, it filters the brokers that are fenced or do not have the 
listener.
+     * <p>
+     * This method is the main hotspot when it comes to the performance of 
metadata requests,
+     * we should be careful about adding additional logic here.
+     * @param image                      The metadata image.
+     * @param brokers                    The list of brokers to filter.
+     * @param listenerName               The listener name.
+     * @param filterUnavailableEndpoints Whether to filter the unavailable 
endpoints. This field is to support v0 MetadataResponse.
+     */
+    private List<Integer> maybeFilterAliveReplicas(
+        MetadataImage image,
+        int[] brokers,
+        ListenerName listenerName,
+        boolean filterUnavailableEndpoints
+    ) {
+        if (!filterUnavailableEndpoints) return Replicas.toList(brokers);
+        List<Integer> res = new ArrayList<>(brokers.length);
+        for (int brokerId : brokers) {
+            BrokerRegistration broker = image.cluster().broker(brokerId);
+            if (broker != null && !broker.fenced() && 
broker.listeners().containsKey(listenerName.value())) {
+                res.add(brokerId);
+            }
+        }
+        return res;
+    }
+
+    public MetadataImage currentImage() {
+        return currentImage;
+    }
+
+    /**
+     * Get the partition metadata for the given topic and listener. If 
errorUnavailableEndpoints is false,
+     * it uses all brokers in the partitions. Otherwise, it filters the 
unavailable endpoints.
+     * If errorUnavailableListeners is true, it returns LISTENER_NOT_FOUND if 
the listener is missing on the broker.
+     * Otherwise, it returns LEADER_NOT_AVAILABLE in that case.
+     *
+     * @param image                     The metadata image.
+     * @param topicName                 The name of the topic.
+     * @param listenerName              The listener name.
+     * @param errorUnavailableEndpoints Whether to filter the unavailable 
endpoints. This field is to support v0 MetadataResponse.
+     * @param errorUnavailableListeners Whether to return LISTENER_NOT_FOUND 
or LEADER_NOT_AVAILABLE.
+     */
+    private List<MetadataResponsePartition> partitionMetadata(
+        MetadataImage image,
+        String topicName,
+        ListenerName listenerName,
+        boolean errorUnavailableEndpoints,
+        boolean errorUnavailableListeners
+    ) {
+        TopicImage topicImage = image.topics().getTopic(topicName);
+        if (topicImage == null) return List.of();
+        return topicImage.partitions().entrySet().stream().map(entry -> {
+            int partitionId = entry.getKey();
+            PartitionRegistration partition = entry.getValue();
+            List<Integer> filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas, listenerName, errorUnavailableEndpoints);
+            List<Integer> filteredIsr = maybeFilterAliveReplicas(image, 
partition.isr, listenerName, errorUnavailableEndpoints);
+            List<Integer> offlineReplicas = getOfflineReplicas(image, 
partition, listenerName);
+            Optional<Node> maybeLeader = getAliveEndpoint(image, 
partition.leader, listenerName);
+            Errors error;
+            if (maybeLeader.isEmpty()) {
+                if (!image.cluster().brokers().containsKey(partition.leader)) {
+                    log.debug("Error while fetching metadata for {}-{}: leader 
not available", topicName, partitionId);
+                    error = Errors.LEADER_NOT_AVAILABLE;
+                } else {
+                    log.debug("Error while fetching metadata for {}-{}: 
listener {} not found on leader {}", topicName, partitionId, listenerName, 
partition.leader);
+                    error = errorUnavailableListeners ? 
Errors.LISTENER_NOT_FOUND : Errors.LEADER_NOT_AVAILABLE;
+                }
+                return new MetadataResponsePartition()
+                    .setErrorCode(error.code())
+                    .setPartitionIndex(partitionId)
+                    .setLeaderId(MetadataResponse.NO_LEADER_ID)
+                    .setLeaderEpoch(partition.leaderEpoch)
+                    .setReplicaNodes(filteredReplicas)
+                    .setIsrNodes(filteredIsr)
+                    .setOfflineReplicas(offlineReplicas);
+            } else {
+                if (filteredReplicas.size() < partition.replicas.length) {
+                    log.debug("Error while fetching metadata for {}-{}: 
replica information not available for following brokers {}", topicName, 
partitionId, Arrays.stream(partition.replicas).filter(b -> 
!filteredReplicas.contains(b)).mapToObj(String::valueOf).collect(Collectors.joining(",")));
+                    error = Errors.REPLICA_NOT_AVAILABLE;
+                } else if (filteredIsr.size() < partition.isr.length) {
+                    log.debug("Error while fetching metadata for {}-{}: in 
sync replica information not available for following brokers {}", topicName, 
partitionId, Arrays.stream(partition.isr).filter(b -> 
!filteredIsr.contains(b)).mapToObj(String::valueOf).collect(Collectors.joining(",")));
+                    error = Errors.REPLICA_NOT_AVAILABLE;
+                } else {
+                    error = Errors.NONE;
+                }
+                return new MetadataResponsePartition()
+                    .setErrorCode(error.code())
+                    .setPartitionIndex(partitionId)
+                    .setLeaderId(maybeLeader.get().id())
+                    .setLeaderEpoch(partition.leaderEpoch)
+                    .setReplicaNodes(filteredReplicas)
+                    .setIsrNodes(filteredIsr)
+                    .setOfflineReplicas(offlineReplicas);
+            }
+        }).toList();
+    }
+
+    /**
+     * Return topic partition metadata for the given topic, listener and index 
range. Also, return the next partition
+     * index that is not included in the result.
+     *
+     * @param image                       The metadata image
+     * @param topicName                   The name of the topic.
+     * @param listenerName                The listener name.
+     * @param startIndex                  The smallest index of the partitions 
to be included in the result.
+     * @param maxCount                    The maximum number of partitions to be included in the result.
+     *
+     * @return                            A collection of topic partition 
metadata and next partition index (-1 means
+     *                                    no next partition).
+     */
+    private Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, 
Integer> partitionMetadataForDescribeTopicResponse(
+        MetadataImage image,
+        String topicName,
+        ListenerName listenerName,
+        int startIndex,
+        int maxCount
+    ) {
+        TopicImage topic = image.topics().getTopic(topicName);
+        if (topic == null) return Map.entry(Optional.empty(), -1);
+        List<DescribeTopicPartitionsResponsePartition> result = new 
ArrayList<>();
+        final Set<Integer> partitions = topic.partitions().keySet();
+        final int upperIndex = Math.min(topic.partitions().size(), startIndex 
+ maxCount);
+        for (int partitionId = startIndex; partitionId < upperIndex; 
partitionId++) {
+            PartitionRegistration partition = 
topic.partitions().get(partitionId);
+            if (partition == null) {
+                log.warn("The partition {} does not exist for {}", 
partitionId, topicName);
+                continue;
+            }
+            List<Integer> filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas, listenerName, false);
+            List<Integer> filteredIsr = maybeFilterAliveReplicas(image, 
partition.isr, listenerName, false);
+            List<Integer> offlineReplicas = getOfflineReplicas(image, 
partition, listenerName);
+            Optional<Node> maybeLeader = getAliveEndpoint(image, 
partition.leader, listenerName);
+            result.add(new DescribeTopicPartitionsResponsePartition()
+                .setPartitionIndex(partitionId)
+                
.setLeaderId(maybeLeader.map(Node::id).orElse(MetadataResponse.NO_LEADER_ID))
+                .setLeaderEpoch(partition.leaderEpoch)
+                .setReplicaNodes(filteredReplicas)
+                .setIsrNodes(filteredIsr)
+                .setOfflineReplicas(offlineReplicas)
+                .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+                .setLastKnownElr(Replicas.toList(partition.lastKnownElr)));
+        }
+        return Map.entry(Optional.of(result), (upperIndex < partitions.size()) 
? upperIndex : -1);
+    }
+
+    private List<Integer> getOfflineReplicas(MetadataImage image, 
PartitionRegistration partition, ListenerName listenerName) {
+        List<Integer> offlineReplicas = new ArrayList<>(0);
+        for (int brokerId : partition.replicas) {
+            BrokerRegistration broker = image.cluster().broker(brokerId);
+            if (broker == null || isReplicaOffline(partition, listenerName, 
broker)) {
+                offlineReplicas.add(brokerId);
+            }
+        }
+        return offlineReplicas;
+    }
+
+    private boolean isReplicaOffline(PartitionRegistration partition, 
ListenerName listenerName, BrokerRegistration broker) {
+        return broker.fenced() || 
!broker.listeners().containsKey(listenerName.value()) || 
isReplicaInOfflineDir(broker, partition);
+    }
+
+    private boolean isReplicaInOfflineDir(BrokerRegistration broker, 
PartitionRegistration partition) {
+        return !broker.hasOnlineDir(partition.directory(broker.id()));
+    }
+
+    /**
+     * Get the endpoint matching the provided listener if the broker is alive. 
Note that listeners can
+     * be added dynamically, so a broker with a missing listener could be a 
transient error.
+     *
+     * @return None if broker is not alive or if the broker does not have a 
listener named `listenerName`.
+     */
+    private Optional<Node> getAliveEndpoint(MetadataImage image, int id, 
ListenerName listenerName) {
+        return image.cluster().broker(id) == null ? Optional.empty() :
+            image.cluster().broker(id).node(listenerName.value());
+    }
+
+    @Override
+    public List<MetadataResponseTopic> getTopicMetadata(
+        Set<String> topics,
+        ListenerName listenerName,
+        boolean errorUnavailableEndpoints,
+        boolean errorUnavailableListeners
+    ) {
+        MetadataImage image = currentImage;
+        return topics.stream().flatMap(topic -> {
+            List<MetadataResponsePartition> partitions = 
partitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, 
errorUnavailableListeners);
+            if (partitions.isEmpty()) return Stream.empty();
+            return Stream.of(new MetadataResponseTopic()
+                .setErrorCode(Errors.NONE.code())
+                .setName(topic)
+                .setTopicId(image.topics().getTopic(topic) == null ? 
Uuid.ZERO_UUID : image.topics().getTopic(topic).id())
+                .setIsInternal(Topic.isInternal(topic))
+                .setPartitions(partitions));
+        }).toList();
+    }
+
+    @Override
+    public DescribeTopicPartitionsResponseData describeTopicResponse(
+        Iterator<String> topics,
+        ListenerName listenerName,
+        Function<String, Integer> topicPartitionStartIndex,
+        int maximumNumberOfPartitions,
+        boolean ignoreTopicsWithExceptions
+    ) {
+        MetadataImage image = currentImage;
+        AtomicInteger remaining = new AtomicInteger(maximumNumberOfPartitions);
+        DescribeTopicPartitionsResponseData result = new 
DescribeTopicPartitionsResponseData();
+        while (topics.hasNext()) {
+            String topicName = topics.next();
+            if (remaining.get() > 0) {
+                
Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, Integer> 
partitionResponseEntry = partitionMetadataForDescribeTopicResponse(image, 
topicName, listenerName, topicPartitionStartIndex.apply(topicName), 
remaining.get());

Review Comment:
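   Consider using `var` here to avoid spelling out the verbose 
`Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, Integer>` type:
   ```java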
                   var partitionResponseEntry = 
partitionMetadataForDescribeTopicResponse(image, topicName, listenerName, 
topicPartitionStartIndex.apply(topicName), remaining.get());
                   var partitionResponse = partitionResponseEntry.getKey();
   ```



##########
metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java:
##########
@@ -0,0 +1,512 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.Cursor;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
+import 
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
+import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.server.common.FinalizedFeatures;
+import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class KRaftMetadataCache implements MetadataCache {
+    private final Logger log;
+    private final Supplier<KRaftVersion> kraftVersionSupplier;
+
+    // This is the cache state. Every MetadataImage instance is immutable, and 
updates
+    // replace this value with a completely new one. This means reads (which 
are not under
+    // any lock) need to grab the value of this variable once, and retain that 
read copy for
+    // the duration of their operation. Multiple reads of this value risk 
getting different
+    // image values.
+    private volatile MetadataImage currentImage = MetadataImage.EMPTY;
+
+    public KRaftMetadataCache(int brokerId, Supplier<KRaftVersion> 
kraftVersionSupplier) {
+        this.kraftVersionSupplier = kraftVersionSupplier;
+        this.log = new LogContext("[MetadataCache brokerId=" + brokerId + "] 
").logger(KRaftMetadataCache.class);
+    }
+
+    /**
+     * Filter the alive replicas. It returns all brokers when 
filterUnavailableEndpoints is false.
+     * Otherwise, it filters the brokers that are fenced or do not have the 
listener.
+     * <p>
+     * This method is the main hotspot when it comes to the performance of 
metadata requests,
+     * we should be careful about adding additional logic here.
+     * @param image                      The metadata image.
+     * @param brokers                    The list of brokers to filter.
+     * @param listenerName               The listener name.
+     * @param filterUnavailableEndpoints Whether to filter the unavailable 
endpoints. This field is to support v0 MetadataResponse.
+     */
+    private List<Integer> maybeFilterAliveReplicas(
+        MetadataImage image,
+        int[] brokers,
+        ListenerName listenerName,
+        boolean filterUnavailableEndpoints
+    ) {
+        if (!filterUnavailableEndpoints) return Replicas.toList(brokers);
+        List<Integer> res = new ArrayList<>(brokers.length);
+        for (int brokerId : brokers) {
+            BrokerRegistration broker = image.cluster().broker(brokerId);
+            if (broker != null && !broker.fenced() && 
broker.listeners().containsKey(listenerName.value())) {
+                res.add(brokerId);
+            }
+        }
+        return res;
+    }
+
+    public MetadataImage currentImage() {
+        return currentImage;
+    }
+
+    /**
+     * Get the partition metadata for the given topic and listener. If 
errorUnavailableEndpoints is false,
+     * it uses all brokers in the partitions. Otherwise, it filters the 
unavailable endpoints.
+     * If errorUnavailableListeners is true, it returns LISTENER_NOT_FOUND if 
the listener is missing on the broker.
+     * Otherwise, it returns LEADER_NOT_AVAILABLE in that case.
+     *
+     * @param image                     The metadata image.
+     * @param topicName                 The name of the topic.
+     * @param listenerName              The listener name.
+     * @param errorUnavailableEndpoints Whether to filter the unavailable 
endpoints. This field is to support v0 MetadataResponse.
+     * @param errorUnavailableListeners Whether to return LISTENER_NOT_FOUND 
or LEADER_NOT_AVAILABLE.
+     */
+    private List<MetadataResponsePartition> partitionMetadata(
+        MetadataImage image,
+        String topicName,
+        ListenerName listenerName,
+        boolean errorUnavailableEndpoints,
+        boolean errorUnavailableListeners
+    ) {
+        TopicImage topicImage = image.topics().getTopic(topicName);
+        if (topicImage == null) return List.of();
+        return topicImage.partitions().entrySet().stream().map(entry -> {
+            int partitionId = entry.getKey();
+            PartitionRegistration partition = entry.getValue();
+            List<Integer> filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas, listenerName, errorUnavailableEndpoints);
+            List<Integer> filteredIsr = maybeFilterAliveReplicas(image, 
partition.isr, listenerName, errorUnavailableEndpoints);
+            List<Integer> offlineReplicas = getOfflineReplicas(image, 
partition, listenerName);
+            Optional<Node> maybeLeader = getAliveEndpoint(image, 
partition.leader, listenerName);
+            Errors error;
+            if (maybeLeader.isEmpty()) {
+                if (!image.cluster().brokers().containsKey(partition.leader)) {
+                    log.debug("Error while fetching metadata for {}-{}: leader 
not available", topicName, partitionId);
+                    error = Errors.LEADER_NOT_AVAILABLE;
+                } else {
+                    log.debug("Error while fetching metadata for {}-{}: 
listener {} not found on leader {}", topicName, partitionId, listenerName, 
partition.leader);
+                    error = errorUnavailableListeners ? 
Errors.LISTENER_NOT_FOUND : Errors.LEADER_NOT_AVAILABLE;
+                }
+                return new MetadataResponsePartition()
+                    .setErrorCode(error.code())
+                    .setPartitionIndex(partitionId)
+                    .setLeaderId(MetadataResponse.NO_LEADER_ID)
+                    .setLeaderEpoch(partition.leaderEpoch)
+                    .setReplicaNodes(filteredReplicas)
+                    .setIsrNodes(filteredIsr)
+                    .setOfflineReplicas(offlineReplicas);
+            } else {
+                if (filteredReplicas.size() < partition.replicas.length) {
+                    log.debug("Error while fetching metadata for {}-{}: 
replica information not available for following brokers {}", topicName, 
partitionId, Arrays.stream(partition.replicas).filter(b -> 
!filteredReplicas.contains(b)).mapToObj(String::valueOf).collect(Collectors.joining(",")));
+                    error = Errors.REPLICA_NOT_AVAILABLE;
+                } else if (filteredIsr.size() < partition.isr.length) {
+                    log.debug("Error while fetching metadata for {}-{}: in 
sync replica information not available for following brokers {}", topicName, 
partitionId, Arrays.stream(partition.isr).filter(b -> 
!filteredIsr.contains(b)).mapToObj(String::valueOf).collect(Collectors.joining(",")));
+                    error = Errors.REPLICA_NOT_AVAILABLE;
+                } else {
+                    error = Errors.NONE;
+                }
+                return new MetadataResponsePartition()
+                    .setErrorCode(error.code())
+                    .setPartitionIndex(partitionId)
+                    .setLeaderId(maybeLeader.get().id())
+                    .setLeaderEpoch(partition.leaderEpoch)
+                    .setReplicaNodes(filteredReplicas)
+                    .setIsrNodes(filteredIsr)
+                    .setOfflineReplicas(offlineReplicas);
+            }
+        }).toList();
+    }
+
+    /**
+     * Return topic partition metadata for the given topic, listener and index 
range. Also, return the next partition
+     * index that is not included in the result.
+     *
+     * @param image                       The metadata image
+     * @param topicName                   The name of the topic.
+     * @param listenerName                The listener name.
+     * @param startIndex                  The smallest index of the partitions 
to be included in the result.
+     * @param maxCount                    The maximum number of partitions to be included in the result.
+     *
+     * @return                            A collection of topic partition 
metadata and next partition index (-1 means
+     *                                    no next partition).
+     */
+    private Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, 
Integer> partitionMetadataForDescribeTopicResponse(
+        MetadataImage image,
+        String topicName,
+        ListenerName listenerName,
+        int startIndex,
+        int maxCount
+    ) {
+        TopicImage topic = image.topics().getTopic(topicName);
+        if (topic == null) return Map.entry(Optional.empty(), -1);
+        List<DescribeTopicPartitionsResponsePartition> result = new 
ArrayList<>();
+        final Set<Integer> partitions = topic.partitions().keySet();
+        final int upperIndex = Math.min(topic.partitions().size(), startIndex 
+ maxCount);
+        for (int partitionId = startIndex; partitionId < upperIndex; 
partitionId++) {
+            PartitionRegistration partition = 
topic.partitions().get(partitionId);
+            if (partition == null) {
+                log.warn("The partition {} does not exist for {}", 
partitionId, topicName);
+                continue;
+            }
+            List<Integer> filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas, listenerName, false);
+            List<Integer> filteredIsr = maybeFilterAliveReplicas(image, 
partition.isr, listenerName, false);
+            List<Integer> offlineReplicas = getOfflineReplicas(image, 
partition, listenerName);
+            Optional<Node> maybeLeader = getAliveEndpoint(image, 
partition.leader, listenerName);
+            result.add(new DescribeTopicPartitionsResponsePartition()
+                .setPartitionIndex(partitionId)
+                
.setLeaderId(maybeLeader.map(Node::id).orElse(MetadataResponse.NO_LEADER_ID))
+                .setLeaderEpoch(partition.leaderEpoch)
+                .setReplicaNodes(filteredReplicas)
+                .setIsrNodes(filteredIsr)
+                .setOfflineReplicas(offlineReplicas)
+                .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+                .setLastKnownElr(Replicas.toList(partition.lastKnownElr)));
+        }
+        return Map.entry(Optional.of(result), (upperIndex < partitions.size()) 
? upperIndex : -1);
+    }
+
+    private List<Integer> getOfflineReplicas(MetadataImage image, 
PartitionRegistration partition, ListenerName listenerName) {
+        List<Integer> offlineReplicas = new ArrayList<>(0);
+        for (int brokerId : partition.replicas) {
+            BrokerRegistration broker = image.cluster().broker(brokerId);
+            if (broker == null || isReplicaOffline(partition, listenerName, 
broker)) {
+                offlineReplicas.add(brokerId);
+            }
+        }
+        return offlineReplicas;
+    }
+
+    private boolean isReplicaOffline(PartitionRegistration partition, 
ListenerName listenerName, BrokerRegistration broker) {
+        return broker.fenced() || 
!broker.listeners().containsKey(listenerName.value()) || 
isReplicaInOfflineDir(broker, partition);
+    }
+
+    private boolean isReplicaInOfflineDir(BrokerRegistration broker, 
PartitionRegistration partition) {
+        return !broker.hasOnlineDir(partition.directory(broker.id()));
+    }
+
+    /**
+     * Get the endpoint matching the provided listener if the broker is alive. 
Note that listeners can
+     * be added dynamically, so a broker with a missing listener could be a 
transient error.
+     *
+     * @return None if broker is not alive or if the broker does not have a 
listener named `listenerName`.
+     */
+    private Optional<Node> getAliveEndpoint(MetadataImage image, int id, 
ListenerName listenerName) {
+        return image.cluster().broker(id) == null ? Optional.empty() :
+            image.cluster().broker(id).node(listenerName.value());
+    }
+
+    @Override
+    public List<MetadataResponseTopic> getTopicMetadata(
+        Set<String> topics,
+        ListenerName listenerName,
+        boolean errorUnavailableEndpoints,
+        boolean errorUnavailableListeners
+    ) {
+        MetadataImage image = currentImage;
+        return topics.stream().flatMap(topic -> {
+            List<MetadataResponsePartition> partitions = 
partitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, 
errorUnavailableListeners);
+            if (partitions.isEmpty()) return Stream.empty();
+            return Stream.of(new MetadataResponseTopic()
+                .setErrorCode(Errors.NONE.code())
+                .setName(topic)
+                .setTopicId(image.topics().getTopic(topic) == null ? 
Uuid.ZERO_UUID : image.topics().getTopic(topic).id())
+                .setIsInternal(Topic.isInternal(topic))
+                .setPartitions(partitions));
+        }).toList();
+    }
+
+    @Override
+    public DescribeTopicPartitionsResponseData describeTopicResponse(
+        Iterator<String> topics,
+        ListenerName listenerName,
+        Function<String, Integer> topicPartitionStartIndex,
+        int maximumNumberOfPartitions,
+        boolean ignoreTopicsWithExceptions
+    ) {
+        MetadataImage image = currentImage;
+        AtomicInteger remaining = new AtomicInteger(maximumNumberOfPartitions);
+        DescribeTopicPartitionsResponseData result = new 
DescribeTopicPartitionsResponseData();
+        while (topics.hasNext()) {
+            String topicName = topics.next();
+            if (remaining.get() > 0) {
+                
Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, Integer> 
partitionResponseEntry = partitionMetadataForDescribeTopicResponse(image, 
topicName, listenerName, topicPartitionStartIndex.apply(topicName), 
remaining.get());
+                Optional<List<DescribeTopicPartitionsResponsePartition>> 
partitionResponse = partitionResponseEntry.getKey();
+                int nextPartition = partitionResponseEntry.getValue();
+                if (partitionResponse.isPresent()) {
+                    List<DescribeTopicPartitionsResponsePartition> partitions 
= partitionResponse.get();
+                    DescribeTopicPartitionsResponseTopic response = new 
DescribeTopicPartitionsResponseTopic()
+                        .setErrorCode(Errors.NONE.code())
+                        .setName(topicName)
+                        
.setTopicId(Optional.ofNullable(image.topics().getTopic(topicName).id()).orElse(Uuid.ZERO_UUID))
+                        .setIsInternal(Topic.isInternal(topicName))
+                        .setPartitions(partitions);
+                    result.topics().add(response);
+
+                    if (nextPartition != -1) {
+                        result.setNextCursor(new 
Cursor().setTopicName(topicName).setPartitionIndex(nextPartition));
+                        break;
+                    } else {
+                        remaining.addAndGet(-partitions.size());
+                    }
+                } else if (!ignoreTopicsWithExceptions) {
+                    Errors error;
+                    try {
+                        Topic.validate(topicName);
+                        error = Errors.UNKNOWN_TOPIC_OR_PARTITION;
+                    } catch (InvalidTopicException e) {
+                        error = Errors.INVALID_TOPIC_EXCEPTION;
+                    }
+                    result.topics().add(new 
DescribeTopicPartitionsResponseTopic()
+                        .setErrorCode(error.code())
+                        .setName(topicName)
+                        .setTopicId(getTopicId(topicName))
+                        .setIsInternal(Topic.isInternal(topicName)));
+                }
+            } else if (remaining.get() == 0) {
+                // The cursor should point to the beginning of the current 
topic. All the partitions in the previous topic
+                // should be fulfilled. Note that, if a partition is pointed 
in the NextTopicPartition, it does not mean
+                // this topic exists.
+                result.setNextCursor(new 
Cursor().setTopicName(topicName).setPartitionIndex(0));
+                break;
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public Set<String> getAllTopics() {
+        return currentImage.topics().topicsByName().keySet();
+    }
+
+    @Override
+    public Uuid getTopicId(String topicName) {
+        return currentImage.topics().getTopic(topicName) == null ? 
Uuid.ZERO_UUID : currentImage.topics().getTopic(topicName).id();
+    }
+
+    @Override
+    public Optional<String> getTopicName(Uuid topicId) {
+        return 
Optional.ofNullable(currentImage.topics().topicsById().get(topicId)).map(TopicImage::name);
+    }
+
+    @Override
+    public boolean hasAliveBroker(int brokerId) {
+        return currentImage.cluster().broker(brokerId) != null && 
!currentImage.cluster().broker(brokerId).fenced();
+    }
+
+    @Override
+    public boolean isBrokerFenced(int brokerId) {
+        return currentImage.cluster().broker(brokerId) != null && 
currentImage.cluster().broker(brokerId).fenced();
+    }
+
+    @Override
+    public boolean isBrokerShuttingDown(int brokerId) {
+        return currentImage.cluster().broker(brokerId) != null && 
currentImage.cluster().broker(brokerId).inControlledShutdown();

Review Comment:
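   Ditto: please check all usages of `currentImage`. Read it once into a local `MetadataImage` (as `getTopicMetadata` and `describeTopicResponse` already do) so that the null check and the follow-up lookup in methods such as `getTopicId`, `hasAliveBroker`, `isBrokerFenced` and `isBrokerShuttingDown` operate on the same image.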



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to