jolshan commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r584162388
########## File path: core/src/main/scala/kafka/server/ControllerApis.scala ########## @@ -154,6 +163,162 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { + val responses = deleteTopics(request.body[DeleteTopicsRequest].data(), + request.context.apiVersion(), + authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME), + names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n), + names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n)) + requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => { + val responseData = new DeleteTopicsResponseData(). + setResponses(new DeletableTopicResultCollection(responses.iterator())). + setThrottleTimeMs(throttleTimeMs) + new DeleteTopicsResponse(responseData) + }) + } + + def deleteTopics(request: DeleteTopicsRequestData, + apiVersion: Int, + hasClusterAuth: Boolean, + getDescribableTopics: Iterable[String] => Set[String], + getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = { + if (!config.deleteTopicEnable) { + if (apiVersion < 3) { + throw new InvalidRequestException("Topic deletion is disabled.") + } else { + throw new TopicDeletionDisabledException() + } + } + val responses = new util.ArrayList[DeletableTopicResult] + val duplicatedTopicNames = new util.HashSet[String] + val topicNamesToResolve = new util.HashSet[String] + val topicIdsToResolve = new util.HashSet[Uuid] + val duplicatedTopicIds = new util.HashSet[Uuid] + + def appendResponse(name: String, id: Uuid, error: ApiError): Unit = { + responses.add(new DeletableTopicResult(). + setName(name). + setTopicId(id). + setErrorCode(error.error().code()). + setErrorMessage(error.message())) + } + + def maybeAppendToTopicNamesToResolve(name: String): Unit = { + if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) { + appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name.")) + topicNamesToResolve.remove(name) + duplicatedTopicNames.add(name) + } + } + + def maybeAppendToIdsToResolve(id: Uuid): Unit = { + if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) { + appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID.")) + topicIdsToResolve.remove(id) + duplicatedTopicIds.add(id) + } + } + + request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve) + + request.topics().iterator().asScala.foreach { + topic => if (topic.name() == null) { + if (topic.topicId.equals(ZERO_UUID)) { + appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST, + "Neither topic name nor id were specified.")) + } else { + maybeAppendToIdsToResolve(topic.topicId()) + } + } else { + if (topic.topicId().equals(ZERO_UUID)) { + maybeAppendToTopicNamesToResolve(topic.name()) + } else { + appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST, + "You may not specify both topic name and topic id.")) + } + } + } + + val idToName = new util.HashMap[Uuid, String] + val unknownTopicNameErrors = new util.HashMap[String, ApiError] + def maybeAppendToIdToName(id: Uuid, name: String): Unit = { + if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) { + appendResponse(name, id, new ApiError(INVALID_REQUEST, + "The same topic was specified by name and by id.")) + idToName.remove(id) + duplicatedTopicIds.add(id) + } + } + controller.findTopicIds(topicNamesToResolve).get().asScala.forKeyValue { + case (name, idOrError) => if (idOrError.isError) { + unknownTopicNameErrors.put(name, idOrError.error()) + } else { + maybeAppendToIdToName(idOrError.result(), name) + } + } + + /** + * There are 5 error cases to handle here: + * + * 1. name provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED + * 2. name provided, topic doesn't exist, can't delete, can describe => UNKNOWN_TOPIC_OR_PARTITION + * 3. name provided, topic doesn't exist, can't delete, can't describe => TOPIC_AUTHORIZATION_FAILED + * 4. id provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED + * 5. id provided, topic doesn't exists, can't delete, maybe can describe => UNKNOWN_TOPIC_ID + */ + controller.findTopicNames(topicIdsToResolve).get().asScala.forKeyValue { + case (id, nameOrError) => if (nameOrError.isError) { + // Case 5: we can't resolve the given topic ID to a name to feed to the + // Authorizer. Return UNKNOWN_TOPIC_ID. + appendResponse(null, id, nameOrError.error()) + } else { + maybeAppendToIdToName(id, nameOrError.result()) + } + } + if (!hasClusterAuth) { + val allTopicNames = idToName.values().asScala ++ unknownTopicNameErrors.keySet().asScala + val authorizedDescribeTopics = getDescribableTopics(allTopicNames) + val authorizedDeleteTopics = getDeletableTopics(allTopicNames) + val iterator = idToName.entrySet().iterator() + while (iterator.hasNext) { + val entry = iterator.next() + val topicName = entry.getValue + if (!authorizedDeleteTopics.contains(topicName)) { + // Case 1 or case 4: the topic exists, but we don't have permission to delete it. + val topicId = entry.getKey + if (topicIdsToResolve.contains(topicId)) { + appendResponse(null, topicId, new ApiError(TOPIC_AUTHORIZATION_FAILED)) Review comment: I think we need a case for describe vs. delete permissions right? Will a client know they have describe vs. delete permissions? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org