chia7712 commented on a change in pull request #10505: URL: https://github.com/apache/kafka/pull/10505#discussion_r613807365
########## File path: core/src/main/scala/kafka/server/ControllerApis.scala ########## @@ -242,87 +253,99 @@ class ControllerApis(val requestChannel: RequestChannel, val toAuthenticate = new util.HashSet[String] toAuthenticate.addAll(providedNames) val idToName = new util.HashMap[Uuid, String] - controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) => - if (nameOrError.isError) { - appendResponse(null, id, nameOrError.error()) - } else { - toAuthenticate.add(nameOrError.result()) - idToName.put(id, nameOrError.result()) - } - } - // Get the list of deletable topics (those we can delete) and the list of describeable - // topics. If a topic can't be deleted or described, we have to act like it doesn't - // exist, even when it does. - val topicsToAuthenticate = toAuthenticate.asScala - val (describeable, deletable) = if (hasClusterAuth) { - (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet) - } else { - (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate)) - } - // For each topic that was provided by ID, check if authentication failed. - // If so, remove it from the idToName map and create an error response for it. - val iterator = idToName.entrySet().iterator() - while (iterator.hasNext) { - val entry = iterator.next() - val id = entry.getKey - val name = entry.getValue - if (!deletable.contains(name)) { - if (describeable.contains(name)) { - appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED)) + controller.findTopicNames(providedIds).thenCompose(topicNames => { Review comment: > For some reason leaving out the braces doesn't work here. Pardon me. Did you mean it can't be compiled? 
The following code compiles on my local machine (and `ControllerApisTest` passes) ```scala controller.findTopicNames(providedIds).thenCompose { topicNames => topicNames.forEach { (id, nameOrError) => if (nameOrError.isError) { appendResponse(null, id, nameOrError.error()) } else { toAuthenticate.add(nameOrError.result()) idToName.put(id, nameOrError.result()) } } // Get the list of deletable topics (those we can delete) and the list of describeable // topics. val topicsToAuthenticate = toAuthenticate.asScala val (describeable, deletable) = if (hasClusterAuth) { (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet) } else { (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate)) } // For each topic that was provided by ID, check if authentication failed. // If so, remove it from the idToName map and create an error response for it. val iterator = idToName.entrySet().iterator() while (iterator.hasNext) { val entry = iterator.next() val id = entry.getKey val name = entry.getValue if (!deletable.contains(name)) { if (describeable.contains(name)) { appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED)) } else { appendResponse(null, id, new ApiError(TOPIC_AUTHORIZATION_FAILED)) } iterator.remove() } } // For each topic that was provided by name, check if authentication failed. // If so, create an error response for it. Otherwise, add it to the idToName map. controller.findTopicIds(providedNames).thenCompose { topicIds => topicIds.forEach { (name, idOrError) => if (!describeable.contains(name)) { appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED)) } else if (idOrError.isError) { appendResponse(name, ZERO_UUID, idOrError.error) } else if (deletable.contains(name)) { val id = idOrError.result() if (duplicateProvidedIds.contains(id) || idToName.put(id, name) != null) { // This is kind of a weird case: what if we supply topic ID X and also a name // that maps to ID X? 
In that case, _if authorization succeeds_, we end up // here. If authorization doesn't succeed, we refrain from commenting on the // situation since it would reveal topic ID mappings. duplicateProvidedIds.add(id) idToName.remove(id) appendResponse(name, id, new ApiError(INVALID_REQUEST, "The provided topic name maps to an ID that was already supplied.")) } } else { appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED)) } } // Finally, the idToName map contains all the topics that we are authorized to delete. // Perform the deletion and create responses for each one. controller.deleteTopics(idToName.keySet).thenApply { idToError => idToError.forEach { (id, error) => appendResponse(idToName.get(id), id, error) } // Shuffle the responses so that users can not use patterns in their positions to // distinguish between absent topics and topics we are not permitted to see. Collections.shuffle(responses) responses } } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org