cmccabe commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r612742563



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -238,87 +249,99 @@ class ControllerApis(val requestChannel: RequestChannel,
     val toAuthenticate = new util.HashSet[String]
     toAuthenticate.addAll(providedNames)
     val idToName = new util.HashMap[Uuid, String]
-    controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
-      if (nameOrError.isError) {
-        appendResponse(null, id, nameOrError.error())
-      } else {
-        toAuthenticate.add(nameOrError.result())
-        idToName.put(id, nameOrError.result())
-      }
-    }
-    // Get the list of deletable topics (those we can delete) and the list of 
describeable
-    // topics.  If a topic can't be deleted or described, we have to act like 
it doesn't
-    // exist, even when it does.
-    val topicsToAuthenticate = toAuthenticate.asScala
-    val (describeable, deletable) = if (hasClusterAuth) {
-      (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
-    } else {
-      (getDescribableTopics(topicsToAuthenticate), 
getDeletableTopics(topicsToAuthenticate))
-    }
-    // For each topic that was provided by ID, check if authentication failed.
-    // If so, remove it from the idToName map and create an error response for 
it.
-    val iterator = idToName.entrySet().iterator()
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val id = entry.getKey
-      val name = entry.getValue
-      if (!deletable.contains(name)) {
-        if (describeable.contains(name)) {
-          appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+    controller.findTopicNames(providedIds).thenCompose(topicNames => {
+      topicNames.forEach { (id, nameOrError) =>
+        if (nameOrError.isError) {
+          appendResponse(null, id, nameOrError.error())
         } else {
-          appendResponse(null, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          toAuthenticate.add(nameOrError.result())
+          idToName.put(id, nameOrError.result())
         }
-        iterator.remove()
       }
-    }
-    // For each topic that was provided by name, check if authentication 
failed.
-    // If so, create an error response for it.  Otherwise, add it to the 
idToName map.
-    controller.findTopicIds(providedNames).get().forEach { (name, idOrError) =>
-      if (!describeable.contains(name)) {
-        appendResponse(name, ZERO_UUID, new 
ApiError(TOPIC_AUTHORIZATION_FAILED))
-      } else if (idOrError.isError) {
-        appendResponse(name, ZERO_UUID, idOrError.error)
-      } else if (deletable.contains(name)) {
-        val id = idOrError.result()
-        if (duplicateProvidedIds.contains(id) || idToName.put(id, name) != 
null) {
-          // This is kind of a weird case: what if we supply topic ID X and 
also a name
-          // that maps to ID X?  In that case, _if authorization succeeds_, we 
end up
-          // here.  If authorization doesn't succeed, we refrain from 
commenting on the
-          // situation since it would reveal topic ID mappings.
-          duplicateProvidedIds.add(id)
-          idToName.remove(id)
-          appendResponse(name, id, new ApiError(INVALID_REQUEST,
-            "The provided topic name maps to an ID that was already 
supplied."))
-        }
+      // Get the list of deletable topics (those we can delete) and the list 
of describeable
+      // topics.
+      val topicsToAuthenticate = toAuthenticate.asScala
+      val (describeable, deletable) = if (hasClusterAuth) {
+        (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
       } else {
-        appendResponse(name, ZERO_UUID, new 
ApiError(TOPIC_AUTHORIZATION_FAILED))
+        (getDescribableTopics(topicsToAuthenticate), 
getDeletableTopics(topicsToAuthenticate))
       }
-    }
-    // Finally, the idToName map contains all the topics that we are 
authorized to delete.
-    // Perform the deletion and create responses for each one.
-    val idToError = controller.deleteTopics(idToName.keySet).get()
-    idToError.forEach { (id, error) =>
-        appendResponse(idToName.get(id), id, error)
-    }
-    // Shuffle the responses so that users can not use patterns in their 
positions to
-    // distinguish between absent topics and topics we are not permitted to 
see.
-    Collections.shuffle(responses)
-    responses
+      // For each topic that was provided by ID, check if authentication 
failed.
+      // If so, remove it from the idToName map and create an error response 
for it.
+      val iterator = idToName.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val id = entry.getKey
+        val name = entry.getValue
+        if (!deletable.contains(name)) {
+          if (describeable.contains(name)) {
+            appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          } else {
+            appendResponse(null, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          }
+          iterator.remove()
+        }
+      }
+      // For each topic that was provided by name, check if authentication 
failed.
+      // If so, create an error response for it.  Otherwise, add it to the 
idToName map.
+      controller.findTopicIds(providedNames).thenCompose(topicIds => {
+        topicIds.forEach { (name, idOrError) =>
+          if (!describeable.contains(name)) {
+            appendResponse(name, ZERO_UUID, new 
ApiError(TOPIC_AUTHORIZATION_FAILED))
+          } else if (idOrError.isError) {
+            appendResponse(name, ZERO_UUID, idOrError.error)
+          } else if (deletable.contains(name)) {
+            val id = idOrError.result()
+            if (duplicateProvidedIds.contains(id) || idToName.put(id, name) != 
null) {
+              // This is kind of a weird case: what if we supply topic ID X 
and also a name
+              // that maps to ID X?  In that case, _if authorization 
succeeds_, we end up
+              // here.  If authorization doesn't succeed, we refrain from 
commenting on the
+              // situation since it would reveal topic ID mappings.
+              duplicateProvidedIds.add(id)
+              idToName.remove(id)
+              appendResponse(name, id, new ApiError(INVALID_REQUEST,
+                "The provided topic name maps to an ID that was already 
supplied."))
+            }
+          } else {
+            appendResponse(name, ZERO_UUID, new 
ApiError(TOPIC_AUTHORIZATION_FAILED))
+          }
+        }
+        // Finally, the idToName map contains all the topics that we are 
authorized to delete.
+        // Perform the deletion and create responses for each one.
+        controller.deleteTopics(idToName.keySet).thenApply(idToError => {
+          idToError.forEach { (id, error) =>
+            appendResponse(idToName.get(id), id, error)
+          }
+          // Shuffle the responses so that users can not use patterns in their 
positions to
+          // distinguish between absent topics and topics we are not permitted 
to see.
+          Collections.shuffle(responses)
+          responses
+        })
+      })
+    })
   }
 
   def handleCreateTopics(request: RequestChannel.Request): Unit = {
-    val responseData = createTopics(request.body[CreateTopicsRequest].data(),
+    val createTopicsRequest = request.body[CreateTopicsRequest]
+    val future = createTopics(createTopicsRequest.data(),
         authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME),
         names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, 
names)(identity))
-    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
-      responseData.setThrottleTimeMs(throttleTimeMs)
-      new CreateTopicsResponse(responseData)
+    future.whenComplete((result, exception) => {
+      requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+        if (exception != null) {
+          createTopicsRequest.getErrorResponse(throttleTimeMs, exception)
+        } else {
+          result.setThrottleTimeMs(throttleTimeMs)
+          new CreateTopicsResponse(result)
+        }
+      })
     })
   }
 
   def createTopics(request: CreateTopicsRequestData,
                    hasClusterAuth: Boolean,
-                   getCreatableTopics: Iterable[String] => Set[String]): 
CreateTopicsResponseData = {
+                   getCreatableTopics: Iterable[String] => Set[String])
+                   : CompletableFuture[CreateTopicsResponseData] = {

Review comment:
       It's kind of a long return type, so I'd prefer to have it on a separate 
line.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to