dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r715684131



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -672,29 +672,22 @@ class KafkaApis(val requestChannel: RequestChannel,
     val versionId = request.header.apiVersion
     val clientId = request.header.clientId
     val fetchRequest = request.body[FetchRequest]
-    val (topicIds, topicNames) =
+    val topicNames =
       if (fetchRequest.version() >= 13)
-        metadataCache.topicIdInfo()
+        metadataCache.topicIdsToNames()
       else
-        (Collections.emptyMap[String, Uuid](), Collections.emptyMap[Uuid, String]())
+        Collections.emptyMap[Uuid, String]()
 
-    // If fetchData or forgottenTopics contain an unknown topic ID, return a top level error.
-    var fetchData: util.Map[TopicPartition, FetchRequest.PartitionData] = null
-    var forgottenTopics: util.List[TopicPartition] = null
-    try {
-      fetchData = fetchRequest.fetchData(topicNames)
-      forgottenTopics = fetchRequest.forgottenTopics(topicNames)
-    } catch {
-      case e: UnknownTopicIdException => throw e
-    }
+    val fetchData = fetchRequest.fetchData(topicNames)
+    val forgottenTopics = fetchRequest.forgottenTopics(topicNames)

Review comment:
       When a session is used, resolving the topic ids is not really necessary
here because we should already have the names in the session, or we would
resolve them later anyway. I wonder if it would be better to do this entirely
in `fetchManager.newContext` based on the context type. Have you considered
something like this?
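
A rough sketch of the idea, purely illustrative (the lazy resolver argument is
an assumption, not the current `FetchManager` API):

```scala
// Hypothetical: hand newContext a lazy id-to-name resolver so that only the
// context types that actually need names (full/sessionless) resolve them;
// incremental contexts reuse the names already cached in the session.
val fetchContext = fetchManager.newContext(
  fetchRequest.metadata,
  fetchRequest,
  () => metadataCache.topicIdsToNames()
)
```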

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -708,40 +701,45 @@ class KafkaApis(val requestChannel: RequestChannel,
       None
     }
 
-    val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]()
-    val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
+    val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]()
+    val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]()
     val sessionTopicIds = mutable.Map[String, Uuid]()
     if (fetchRequest.isFromFollower) {
       // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
       if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
-        fetchContext.foreachPartition { (topicPartition, topicId, data) =>
-          sessionTopicIds.put(topicPartition.topic(), topicId)
-          if (!metadataCache.contains(topicPartition))
-            erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        fetchContext.foreachPartition { (topicIdPartition, data) =>
+          sessionTopicIds.put(topicIdPartition.topicPartition.topic, topicIdPartition.topicId)

Review comment:
       Do we still need this `sessionTopicIds` mapping if we have the topic id 
in the `topicIdPartition`?
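
If it is no longer needed, the call sites could presumably read the id straight
off the partition instead of consulting a side map, e.g. (sketch):

```scala
// Sketch: no sessionTopicIds map needed once a TopicIdPartition is in hand.
fetchContext.foreachPartition { (topicIdPartition, data) =>
  val topicId = topicIdPartition.topicId // direct access, no Map lookup
  // ... existing erroneous/interesting classification unchanged ...
}
```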

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -708,40 +701,45 @@ class KafkaApis(val requestChannel: RequestChannel,
       None
     }
 
-    val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]()
-    val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
+    val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]()
+    val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]()
     val sessionTopicIds = mutable.Map[String, Uuid]()
     if (fetchRequest.isFromFollower) {
       // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
       if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
-        fetchContext.foreachPartition { (topicPartition, topicId, data) =>
-          sessionTopicIds.put(topicPartition.topic(), topicId)
-          if (!metadataCache.contains(topicPartition))
-            erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        fetchContext.foreachPartition { (topicIdPartition, data) =>
+          sessionTopicIds.put(topicIdPartition.topicPartition.topic, topicIdPartition.topicId)
+          if (topicIdPartition.topicPartition.topic == null )
+            erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition.topicPartition.partition, Errors.UNKNOWN_TOPIC_ID)
+          else if (!metadataCache.contains(topicIdPartition.topicPartition))
+            erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition.topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
           else
-            interesting += (topicPartition -> data)
+            interesting += (topicIdPartition -> data)
         }
       } else {
-        fetchContext.foreachPartition { (part, topicId, _) =>
-          sessionTopicIds.put(part.topic(), topicId)
-          erroneous += part -> FetchResponse.partitionResponse(part.partition, Errors.TOPIC_AUTHORIZATION_FAILED)
+        fetchContext.foreachPartition { (topicIdPartition, _) =>
+          sessionTopicIds.put(topicIdPartition.topicPartition.topic, topicIdPartition.topicId)
+          erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition.topicPartition.partition, Errors.TOPIC_AUTHORIZATION_FAILED)
         }
       }
     } else {
       // Regular Kafka consumers need READ permission on each partition they are fetching.
-      val partitionDatas = new mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]
-      fetchContext.foreachPartition { (topicPartition, topicId, partitionData) =>
-        partitionDatas += topicPartition -> partitionData
-        sessionTopicIds.put(topicPartition.topic(), topicId)
-      }
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topic)
-      partitionDatas.foreach { case (topicPartition, data) =>
-        if (!authorizedTopics.contains(topicPartition.topic))
-          erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.TOPIC_AUTHORIZATION_FAILED)
-        else if (!metadataCache.contains(topicPartition))
-          erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+      val partitionDatas = new mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]
+      fetchContext.foreachPartition { (topicIdPartition, partitionData) =>
+        partitionDatas += topicIdPartition -> partitionData
+        sessionTopicIds.put(topicIdPartition.topicPartition.topic(), topicIdPartition.topicId)

Review comment:
       Could we directly check if the topic name is null here and put the
unresolved ones into `erroneous`? This would avoid the filter on the next line.
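
Something along these lines, as a sketch:

```scala
// Sketch: classify unresolved ids up front so the authorization filter below
// only ever sees partitions with a known topic name.
fetchContext.foreachPartition { (topicIdPartition, partitionData) =>
  if (topicIdPartition.topicPartition.topic == null)
    erroneous += topicIdPartition -> FetchResponse.partitionResponse(
      topicIdPartition.topicPartition.partition, Errors.UNKNOWN_TOPIC_ID)
  else
    partitionDatas += topicIdPartition -> partitionData
}
```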

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -233,7 +233,8 @@ class ReplicaFetcherThread(name: String,
         Map.empty
       }
     } else {
-      fetchResponse.responseData(fetchSessionHandler.sessionTopicNames, clientResponse.requestHeader().apiVersion()).asScala
+      fetchResponse.responseData(fetchSessionHandler.sessionTopicNames, clientResponse.requestHeader().apiVersion()).asScala.map {

Review comment:
       Not related to this line: don't we need to update the fetcher to handle
the topic id errors at the partition level?
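
For instance, the response handling could branch on the partition-level error,
roughly like this (a sketch only; the recovery action is an open question):

```scala
// Sketch: treat topic id mismatches as per-partition errors in the fetcher.
responseData.foreach { case (topicPartition, partitionData) =>
  Errors.forCode(partitionData.errorCode) match {
    case Errors.UNKNOWN_TOPIC_ID | Errors.INCONSISTENT_TOPIC_ID =>
      // e.g. mark the partition for a metadata refresh and retry it,
      // rather than failing the whole fetch
    case _ =>
      // existing per-partition handling
  }
}
```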

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -163,7 +173,8 @@ class CachedPartition(val topic: String,
     mustRespond
   }
 
-  override def hashCode: Int = Objects.hash(new TopicPartition(topic, partition), topicId)
+  override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition) + topicId.hashCode else

Review comment:
       Could we add a scaladoc for this method which explains what we do and 
why?
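
For example (the `else` branch is truncated in the hunk above, so the name
fallback shown here is an assumption):

```scala
/**
 * Hash on the topic id when the session uses topic ids (topicId != ZERO_UUID),
 * and fall back to the topic name otherwise. A partition cached before its
 * name was known then keeps the same hash once the name is resolved, so it
 * still lands in the same bucket of the session's partition map.
 */
override def hashCode: Int =
  if (topicId != Uuid.ZERO_UUID) (31 * partition) + topicId.hashCode
  else (31 * partition) + topic.hashCode // assumed fallback
```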

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -378,53 +378,47 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque
   * @param reqMetadata        The request metadata.
   * @param fetchData          The partition data from the fetch request.
   * @param usesTopicIds       True if this session should use topic IDs.
-  * @param topicIds           The map from topic names to topic IDs.
   * @param isFromFollower     True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
                        private val cache: FetchSessionCache,
                        private val reqMetadata: JFetchMetadata,
-                       private val fetchData: util.Map[TopicPartition, FetchRequest.PartitionData],
+                       private val fetchData: util.Map[TopicIdPartition, FetchRequest.PartitionData],
                        private val usesTopicIds: Boolean,
-                       private val topicIds: util.Map[String, Uuid],
                        private val isFromFollower: Boolean) extends FetchContext {
-  override def getFetchOffset(part: TopicPartition): Option[Long] =
+  override def getFetchOffset(part: TopicIdPartition): Option[Long] =
     Option(fetchData.get(part)).map(_.fetchOffset)
 
-  override def foreachPartition(fun: (TopicPartition, Uuid, FetchRequest.PartitionData) => Unit): Unit = {
-    fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data))
+  override def foreachPartition(fun: (TopicIdPartition, FetchRequest.PartitionData) => Unit): Unit = {
+    fetchData.forEach((tp, data) => fun(tp, data))
   }
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = {
-    FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds)
+    FetchResponse.sizeOf(versionId, updates.entrySet.iterator)
   }
 
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = {
     var hasInconsistentTopicIds = false
-    def createNewSession: (FetchSession.CACHE_MAP, FetchSession.TOPIC_ID_MAP) = {
+    def createNewSession: FetchSession.CACHE_MAP = {
       val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
-      val sessionTopicIds = new util.HashMap[String, Uuid](updates.size)
       updates.forEach { (part, respData) =>
         if (respData.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) {
           info(s"Session encountered an inconsistent topic ID for topicPartition $part.")
           hasInconsistentTopicIds = true
         }
         val reqData = fetchData.get(part)
-        val id = topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID)
-        cachedPartitions.mustAdd(new CachedPartition(part, id, reqData, respData))
-        if (id != Uuid.ZERO_UUID)
-          sessionTopicIds.put(part.topic, id)
+        cachedPartitions.mustAdd(new CachedPartition(part.topicPartition, part.topicId, reqData, respData))
       }
-      (cachedPartitions, sessionTopicIds)
+      cachedPartitions
     }
     val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower,
         updates.size, usesTopicIds, () => createNewSession)
     if (hasInconsistentTopicIds) {
-      FetchResponse.of(Errors.INCONSISTENT_TOPIC_ID, 0, responseSessionId, new FetchSession.RESP_MAP, Collections.emptyMap())
+      FetchResponse.of(Errors.INCONSISTENT_TOPIC_ID, 0, responseSessionId, new FetchSession.RESP_MAP)

Review comment:
       Do we still need to return `INCONSISTENT_TOPIC_ID` as a top level error?
Fetchers prior to this change would need it, for sure. With this PR, we
actually don't want the fetcher to treat it as a top level error but rather as
a partition error. We need to think/discuss this a little more, I think.

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -238,47 +247,40 @@ class FetchSession(val id: Int,
 
   def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) }
 
-  def getFetchOffset(topicPartition: TopicPartition): Option[Long] = synchronized {
-    Option(partitionMap.find(new CachedPartition(topicPartition,
-      sessionTopicIds.getOrDefault(topicPartition.topic(), Uuid.ZERO_UUID)))).map(_.fetchOffset)
+  def getFetchOffset(topicIdPartition: TopicIdPartition): Option[Long] = synchronized {
+    Option(partitionMap.find(new CachedPartition(topicIdPartition.topicPartition, topicIdPartition.topicId))).map(_.fetchOffset)
   }
 
-  type TL = util.ArrayList[TopicPartition]
+  type TL = util.ArrayList[TopicIdPartition]
 
   // Update the cached partition data based on the request.
   def update(fetchData: FetchSession.REQ_MAP,
-             toForget: util.List[TopicPartition],
+             toForget: util.List[TopicIdPartition],
              reqMetadata: JFetchMetadata,
-             topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) = synchronized {
+             usesTopicIds: Boolean): (TL, TL, TL) = synchronized {
     val added = new TL
     val updated = new TL
     val removed = new TL
-    val inconsistentTopicIds = new TL
     fetchData.forEach { (topicPart, reqData) =>
-      // Get the topic ID on the broker, if it is valid and the topic is new to the session, add its ID.
-      // If the topic already existed, check that its ID is consistent.
-      val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID)
-      val newCachedPart = new CachedPartition(topicPart, id, reqData)
-      if (id != Uuid.ZERO_UUID) {
-        val prevSessionTopicId = sessionTopicIds.putIfAbsent(topicPart.topic, id)
-        if (prevSessionTopicId != null && prevSessionTopicId != id)
-          inconsistentTopicIds.add(topicPart)
-      }
+      val newCachedPart = new CachedPartition(topicPart.topicPartition, topicPart.topicId, reqData)
       val cachedPart = partitionMap.find(newCachedPart)
       if (cachedPart == null) {
         partitionMap.mustAdd(newCachedPart)
         added.add(topicPart)
       } else {
         cachedPart.updateRequestParams(reqData)
+        if (cachedPart.topic == null)
+        // Update the topic name in place
+          cachedPart.resolveUnknownName(topicPart.topicPartition.topic)

Review comment:
       This might not be necessary if we don't resolve the topic ids in the
request in all cases (see my previous comment).

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -708,40 +701,45 @@ class KafkaApis(val requestChannel: RequestChannel,
       None
     }
 
-    val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]()
-    val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
+    val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]()
+    val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]()
     val sessionTopicIds = mutable.Map[String, Uuid]()
     if (fetchRequest.isFromFollower) {
       // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
       if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
-        fetchContext.foreachPartition { (topicPartition, topicId, data) =>
-          sessionTopicIds.put(topicPartition.topic(), topicId)
-          if (!metadataCache.contains(topicPartition))
-            erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        fetchContext.foreachPartition { (topicIdPartition, data) =>
+          sessionTopicIds.put(topicIdPartition.topicPartition.topic, topicIdPartition.topicId)
+          if (topicIdPartition.topicPartition.topic == null )
+            erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition.topicPartition.partition, Errors.UNKNOWN_TOPIC_ID)

Review comment:
       nit: Could we add an overload to `partitionResponse` which takes a
`TopicIdPartition`? This would reduce the boilerplate code a bit here.
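
Sketching the idea (the overload itself would live in `FetchResponse` on the
Java side; the signature is illustrative):

```scala
// Hypothetical overload:
//   public static FetchResponseData.PartitionData partitionResponse(
//       TopicIdPartition topicIdPartition, Errors error)
// Call sites here would then shrink to:
erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
```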

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -173,8 +184,8 @@ class CachedPartition(val topic: String,
         this.eq(that) ||
           (that.canEqual(this) &&
             this.partition.equals(that.partition) &&
-            this.topic.equals(that.topic) &&
-            this.topicId.equals(that.topicId))
+            (if (this.topicId != Uuid.ZERO_UUID) this.topicId.equals(that.topicId)
+            else this.topic.equals(that.topic)))

Review comment:
       Side note here: I think that we should implement `override def 
elementKeysAreEqual(that: Any): Boolean` from the 
`ImplicitLinkedHashCollection.Element` interface to make it clear that we do 
this for comparing elements in the collections.
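
Roughly, mirroring the equality rule above (sketch):

```scala
// Sketch: make the id-or-name comparison explicit for collection lookups.
override def elementKeysAreEqual(that: Any): Boolean = that match {
  case other: CachedPartition =>
    this.partition == other.partition &&
      (if (this.topicId != Uuid.ZERO_UUID) this.topicId == other.topicId
       else this.topic == other.topic)
  case _ => false
}
```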

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -870,12 +868,14 @@ class KafkaApis(val requestChannel: RequestChannel,
 
         // Prepare fetch response from converted data
         val response =
-          FetchResponse.of(unconvertedFetchResponse.error, throttleTimeMs, unconvertedFetchResponse.sessionId, convertedData, sessionTopicIds.asJava)
+          FetchResponse.of(unconvertedFetchResponse.error, throttleTimeMs, unconvertedFetchResponse.sessionId, convertedData)
         // record the bytes out metrics only when the response is being sent
         response.data().responses().forEach { topicResponse =>
           topicResponse.partitions().forEach { data =>
-            val tp = new TopicPartition(topicResponse.topic(), data.partitionIndex())
-            brokerTopicStats.updateBytesOut(tp.topic, fetchRequest.isFromFollower, reassigningPartitions.contains(tp), FetchResponse.recordsSize(data))
+            val tp = new TopicIdPartition(topicResponse.topicId, new TopicPartition(topicResponse.topic(), data.partitionIndex()))
+            // If the topic name was not known, we will have no bytes out.
+            if (tp.topicPartition.topic != null)

Review comment:
       Should we create `tp` after this check? We could also create a 
`TopicPartition` as we don't really use `TopicIdPartition` for the metric.
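
That is, something like this sketch:

```scala
// Sketch: skip unresolved topics before building the key, and use a plain
// TopicPartition since the metric does not need the topic id.
topicResponse.partitions().forEach { data =>
  if (topicResponse.topic != null) {
    val tp = new TopicPartition(topicResponse.topic, data.partitionIndex)
    brokerTopicStats.updateBytesOut(tp.topic, fetchRequest.isFromFollower,
      reassigningPartitions.contains(tp), FetchResponse.recordsSize(data))
  }
}
```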




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

