chia7712 commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r613807365



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -242,87 +253,99 @@ class ControllerApis(val requestChannel: RequestChannel,
     val toAuthenticate = new util.HashSet[String]
     toAuthenticate.addAll(providedNames)
     val idToName = new util.HashMap[Uuid, String]
-    controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
-      if (nameOrError.isError) {
-        appendResponse(null, id, nameOrError.error())
-      } else {
-        toAuthenticate.add(nameOrError.result())
-        idToName.put(id, nameOrError.result())
-      }
-    }
-    // Get the list of deletable topics (those we can delete) and the list of 
describeable
-    // topics.  If a topic can't be deleted or described, we have to act like 
it doesn't
-    // exist, even when it does.
-    val topicsToAuthenticate = toAuthenticate.asScala
-    val (describeable, deletable) = if (hasClusterAuth) {
-      (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
-    } else {
-      (getDescribableTopics(topicsToAuthenticate), 
getDeletableTopics(topicsToAuthenticate))
-    }
-    // For each topic that was provided by ID, check if authentication failed.
-    // If so, remove it from the idToName map and create an error response for 
it.
-    val iterator = idToName.entrySet().iterator()
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val id = entry.getKey
-      val name = entry.getValue
-      if (!deletable.contains(name)) {
-        if (describeable.contains(name)) {
-          appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+    controller.findTopicNames(providedIds).thenCompose(topicNames => {

Review comment:
       > For some reason leaving out the braces doesn't work here.
   
   Pardon me. Did you mean it can't be compiled? The following code compiles 
on my local machine (and `ControllerApisTest` passes):
   ```scala
       controller.findTopicNames(providedIds).thenCompose { topicNames =>
         topicNames.forEach { (id, nameOrError) =>
           if (nameOrError.isError) {
             appendResponse(null, id, nameOrError.error())
           } else {
             toAuthenticate.add(nameOrError.result())
             idToName.put(id, nameOrError.result())
           }
         }
         // Get the list of deletable topics (those we can delete) and the list 
of describeable
         // topics.
         val topicsToAuthenticate = toAuthenticate.asScala
         val (describeable, deletable) = if (hasClusterAuth) {
           (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
         } else {
           (getDescribableTopics(topicsToAuthenticate), 
getDeletableTopics(topicsToAuthenticate))
         }
         // For each topic that was provided by ID, check if authentication 
failed.
         // If so, remove it from the idToName map and create an error response 
for it.
         val iterator = idToName.entrySet().iterator()
         while (iterator.hasNext) {
           val entry = iterator.next()
           val id = entry.getKey
           val name = entry.getValue
           if (!deletable.contains(name)) {
             if (describeable.contains(name)) {
               appendResponse(name, id, new 
ApiError(TOPIC_AUTHORIZATION_FAILED))
             } else {
               appendResponse(null, id, new 
ApiError(TOPIC_AUTHORIZATION_FAILED))
             }
             iterator.remove()
           }
         }
         // For each topic that was provided by name, check if authentication 
failed.
         // If so, create an error response for it. Otherwise, add it to the 
idToName map.
         controller.findTopicIds(providedNames).thenCompose { topicIds =>
           topicIds.forEach { (name, idOrError) =>
             if (!describeable.contains(name)) {
               appendResponse(name, ZERO_UUID, new 
ApiError(TOPIC_AUTHORIZATION_FAILED))
             } else if (idOrError.isError) {
               appendResponse(name, ZERO_UUID, idOrError.error)
             } else if (deletable.contains(name)) {
               val id = idOrError.result()
               if (duplicateProvidedIds.contains(id) || idToName.put(id, name) 
!= null) {
                 // This is kind of a weird case: what if we supply topic ID X 
and also a name
                 // that maps to ID X?  In that case, _if authorization 
succeeds_, we end up
                 // here.  If authorization doesn't succeed, we refrain from 
commenting on the
                 // situation since it would reveal topic ID mappings.
                 duplicateProvidedIds.add(id)
                 idToName.remove(id)
                 appendResponse(name, id, new ApiError(INVALID_REQUEST,
                   "The provided topic name maps to an ID that was already 
supplied."))
               }
             } else {
               appendResponse(name, ZERO_UUID, new 
ApiError(TOPIC_AUTHORIZATION_FAILED))
             }
           }
           // Finally, the idToName map contains all the topics that we are 
authorized to delete.
           // Perform the deletion and create responses for each one.
           controller.deleteTopics(idToName.keySet).thenApply { idToError =>
             idToError.forEach { (id, error) =>
               appendResponse(idToName.get(id), id, error)
             }
             // Shuffle the responses so that users can not use patterns in 
their positions to
             // distinguish between absent topics and topics we are not 
permitted to see.
             Collections.shuffle(responses)
             responses
           }
         }
       }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to