chickenchickenlove commented on code in PR #21126:
URL: https://github.com/apache/kafka/pull/21126#discussion_r2652504754


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2117,13 +2117,29 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetForLeaderEpoch = request.body[OffsetsForLeaderEpochRequest]
     val topics = offsetForLeaderEpoch.data.topics.asScala.toSeq
 
+    // Separate topics with unknown topic IDs when using version 5+
+    val (knownTopics, unknownTopicIdTopics) = if (OffsetsForLeaderEpochRequest.useTopicIds(request.header.apiVersion)) {
+      topics.partition { offsetForLeaderTopic =>
+        metadataCache.getTopicName(offsetForLeaderTopic.topicId).isPresent
+      }
+    } else {
+      (topics, Seq.empty[OffsetForLeaderTopic])
+    }
+
     // The OffsetsForLeaderEpoch API was initially only used for inter-broker communication and required
     // cluster permission. With KIP-320, the consumer now also uses this API to check for log truncation
     // following a leader change, so we also allow topic describe permission.
     val (authorizedTopics, unauthorizedTopics) =
       if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME, logIfDenied = false))
-        (topics, Seq.empty[OffsetForLeaderTopic])
-      else authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, topics)(_.topic)
+        (knownTopics, Seq.empty[OffsetForLeaderTopic])
+      else authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, knownTopics) { offsetForLeaderTopic =>
+        // Resolve topic name from topicId if needed for authorization
+        if (OffsetsForLeaderEpochRequest.useTopicIds(request.header.apiVersion)) {
+          metadataCache.getTopicName(offsetForLeaderTopic.topicId).get()

Review Comment:
   Should we consider a race condition with metadata cache updates here?
   We check for the topic's existence first and retrieve its name later, so the cache can change between the two calls.
   
   ```scala
       // Separate topics with unknown topic IDs when using version 5+
       val (knownTopics, unknownTopicIdTopics) = if (OffsetsForLeaderEpochRequest.useTopicIds(request.header.apiVersion)) {
         topics.partition { offsetForLeaderTopic =>
           metadataCache.getTopicName(offsetForLeaderTopic.topicId).isPresent
         }
       } else {
         (topics, Seq.empty[OffsetForLeaderTopic])
       }
   ```
   
   Consider the following sequence:
   1. Thread A: Executes the partitioning logic. `metadataCache.getTopicName(id).isPresent` returns true for UUID-123, so the topic is added to `knownTopics`.
   2. Thread B: Handles a metadata update (e.g., topic deletion via the Admin API) and removes UUID-123 from `metadataCache`.
   3. Thread A: Proceeds to the authorization block and calls `metadataCache.getTopicName(id).get()`.
   
   In step 3 the returned `Optional` is empty because the topic no longer exists in the cache, so `get()` throws `NoSuchElementException`, leading to an uncaught exception.
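   
   One possible way to avoid the check-then-act gap is to resolve each topic ID exactly once and carry the resolved name along with the request. The sketch below is only an illustration under assumptions: `ResolveOnce`, `TopicRequest`, and `splitByResolvedName` are hypothetical names, `java.util.UUID` stands in for Kafka's topic ID type, and the `getTopicName` function parameter stands in for `metadataCache.getTopicName`.
   
   ```scala
   import java.util.{Optional, UUID}

   object ResolveOnce {
     // Hypothetical stand-in for OffsetForLeaderTopic; not the real Kafka type.
     final case class TopicRequest(topicId: UUID)

     // Look up each topic ID once and keep the result, so later steps never
     // need a second cache read that could observe a newer metadata view.
     def splitByResolvedName(
         topics: Seq[TopicRequest],
         getTopicName: UUID => Optional[String] // assumed lookup, e.g. the metadata cache
     ): (Seq[(TopicRequest, String)], Seq[TopicRequest]) = {
       val resolved = topics.map(t => t -> getTopicName(t.topicId))
       // get() is safe here: it is called on the same Optional that was just checked.
       val known = resolved.collect { case (t, name) if name.isPresent => (t, name.get) }
       val unknown = resolved.collect { case (t, name) if !name.isPresent => t }
       (known, unknown)
     }
   }
   ```
   
   With this shape, the authorization block could use the name stored next to each request instead of calling `getTopicName(...).get()` a second time.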
   
   What do you think?


