chia7712 commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r584139113
########## File path: core/src/main/scala/kafka/server/ControllerApis.scala ########## @@ -154,6 +163,162 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { + val responses = deleteTopics(request.body[DeleteTopicsRequest].data(), + request.context.apiVersion(), + authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME), + names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n), + names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n)) + requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => { + val responseData = new DeleteTopicsResponseData(). + setResponses(new DeletableTopicResultCollection(responses.iterator())). + setThrottleTimeMs(throttleTimeMs) + new DeleteTopicsResponse(responseData) + }) + } + + def deleteTopics(request: DeleteTopicsRequestData, + apiVersion: Int, + hasClusterAuth: Boolean, + getDescribableTopics: Iterable[String] => Set[String], + getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = { + if (!config.deleteTopicEnable) { + if (apiVersion < 3) { + throw new InvalidRequestException("Topic deletion is disabled.") + } else { + throw new TopicDeletionDisabledException() + } + } + val responses = new util.ArrayList[DeletableTopicResult] + val duplicatedTopicNames = new util.HashSet[String] + val topicNamesToResolve = new util.HashSet[String] + val topicIdsToResolve = new util.HashSet[Uuid] + val duplicatedTopicIds = new util.HashSet[Uuid] + + def appendResponse(name: String, id: Uuid, error: ApiError): Unit = { + responses.add(new DeletableTopicResult(). + setName(name). + setTopicId(id). + setErrorCode(error.error().code()). 
+ setErrorMessage(error.message())) + } + + def maybeAppendToTopicNamesToResolve(name: String): Unit = { + if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) { + appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name.")) + topicNamesToResolve.remove(name) + duplicatedTopicNames.add(name) + } + } + + def maybeAppendToIdsToResolve(id: Uuid): Unit = { + if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) { + appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID.")) + topicIdsToResolve.remove(id) + duplicatedTopicIds.add(id) + } + } + + request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve) + + request.topics().iterator().asScala.foreach { + topic => if (topic.name() == null) { + if (topic.topicId.equals(ZERO_UUID)) { + appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST, + "Neither topic name nor id were specified.")) + } else { + maybeAppendToIdsToResolve(topic.topicId()) + } + } else { + if (topic.topicId().equals(ZERO_UUID)) { + maybeAppendToTopicNamesToResolve(topic.name()) + } else { + appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST, + "You may not specify both topic name and topic id.")) + } + } + } + + val idToName = new util.HashMap[Uuid, String] + val unknownTopicNameErrors = new util.HashMap[String, ApiError] + def maybeAppendToIdToName(id: Uuid, name: String): Unit = { + if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) { + appendResponse(name, id, new ApiError(INVALID_REQUEST, + "The same topic was specified by name and by id.")) + idToName.remove(id) + duplicatedTopicIds.add(id) + } + } + controller.findTopicIds(topicNamesToResolve).get().asScala.forKeyValue { + case (name, idOrError) => if (idOrError.isError) { + unknownTopicNameErrors.put(name, idOrError.error()) + } else { + maybeAppendToIdToName(idOrError.result(), name) + } + } + + /** + * There are 5 error cases to handle here: + 
* + * 1. name provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED + * 2. name provided, topic doesn't exist, can't delete, can describe => UNKNOWN_TOPIC_OR_PARTITION + * 3. name provided, topic doesn't exist, can't delete, can't describe => TOPIC_AUTHORIZATION_FAILED + * 4. id provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED + * 5. id provided, topic doesn't exists, can't delete, maybe can describe => UNKNOWN_TOPIC_ID + */ + controller.findTopicNames(topicIdsToResolve).get().asScala.forKeyValue { + case (id, nameOrError) => if (nameOrError.isError) { + // Case 5: we can't resolve the given topic ID to a name to feed to the + // Authorizer. Return UNKNOWN_TOPIC_ID. + appendResponse(null, id, nameOrError.error()) + } else { + maybeAppendToIdToName(id, nameOrError.result()) + } + } + if (!hasClusterAuth) { + val allTopicNames = idToName.values().asScala ++ unknownTopicNameErrors.keySet().asScala + val authorizedDescribeTopics = getDescribableTopics(allTopicNames) + val authorizedDeleteTopics = getDeletableTopics(allTopicNames) + val iterator = idToName.entrySet().iterator() + while (iterator.hasNext) { + val entry = iterator.next() + val topicName = entry.getValue + if (!authorizedDeleteTopics.contains(topicName)) { + // Case 1 or case 4: the topic exists, but we don't have permission to delete it. + val topicId = entry.getKey + if (topicIdsToResolve.contains(topicId)) { + appendResponse(null, topicId, new ApiError(TOPIC_AUTHORIZATION_FAILED)) Review comment: Could you add a comment to explain why `name` must be null for this case? 
########## File path: core/src/main/scala/kafka/server/ControllerApis.scala ########## @@ -154,6 +163,162 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { + val responses = deleteTopics(request.body[DeleteTopicsRequest].data(), + request.context.apiVersion(), + authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME), + names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n), + names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n)) + requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => { + val responseData = new DeleteTopicsResponseData(). + setResponses(new DeletableTopicResultCollection(responses.iterator())). + setThrottleTimeMs(throttleTimeMs) + new DeleteTopicsResponse(responseData) + }) + } + + def deleteTopics(request: DeleteTopicsRequestData, + apiVersion: Int, + hasClusterAuth: Boolean, + getDescribableTopics: Iterable[String] => Set[String], + getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = { + if (!config.deleteTopicEnable) { + if (apiVersion < 3) { + throw new InvalidRequestException("Topic deletion is disabled.") + } else { + throw new TopicDeletionDisabledException() + } + } + val responses = new util.ArrayList[DeletableTopicResult] + val duplicatedTopicNames = new util.HashSet[String] + val topicNamesToResolve = new util.HashSet[String] + val topicIdsToResolve = new util.HashSet[Uuid] + val duplicatedTopicIds = new util.HashSet[Uuid] + + def appendResponse(name: String, id: Uuid, error: ApiError): Unit = { + responses.add(new DeletableTopicResult(). + setName(name). + setTopicId(id). + setErrorCode(error.error().code()). 
+ setErrorMessage(error.message())) + } + + def maybeAppendToTopicNamesToResolve(name: String): Unit = { + if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) { + appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name.")) + topicNamesToResolve.remove(name) + duplicatedTopicNames.add(name) + } + } + + def maybeAppendToIdsToResolve(id: Uuid): Unit = { + if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) { + appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID.")) + topicIdsToResolve.remove(id) + duplicatedTopicIds.add(id) + } + } + + request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve) + + request.topics().iterator().asScala.foreach { + topic => if (topic.name() == null) { + if (topic.topicId.equals(ZERO_UUID)) { + appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST, + "Neither topic name nor id were specified.")) + } else { + maybeAppendToIdsToResolve(topic.topicId()) + } + } else { + if (topic.topicId().equals(ZERO_UUID)) { + maybeAppendToTopicNamesToResolve(topic.name()) + } else { + appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST, + "You may not specify both topic name and topic id.")) + } + } + } + + val idToName = new util.HashMap[Uuid, String] + val unknownTopicNameErrors = new util.HashMap[String, ApiError] + def maybeAppendToIdToName(id: Uuid, name: String): Unit = { + if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) { + appendResponse(name, id, new ApiError(INVALID_REQUEST, + "The same topic was specified by name and by id.")) + idToName.remove(id) + duplicatedTopicIds.add(id) + } + } + controller.findTopicIds(topicNamesToResolve).get().asScala.forKeyValue { + case (name, idOrError) => if (idOrError.isError) { + unknownTopicNameErrors.put(name, idOrError.error()) + } else { + maybeAppendToIdToName(idOrError.result(), name) + } + } + + /** + * There are 5 error cases to handle here: + 
* + * 1. name provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED + * 2. name provided, topic doesn't exist, can't delete, can describe => UNKNOWN_TOPIC_OR_PARTITION + * 3. name provided, topic doesn't exist, can't delete, can't describe => TOPIC_AUTHORIZATION_FAILED + * 4. id provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED + * 5. id provided, topic doesn't exists, can't delete, maybe can describe => UNKNOWN_TOPIC_ID + */ + controller.findTopicNames(topicIdsToResolve).get().asScala.forKeyValue { + case (id, nameOrError) => if (nameOrError.isError) { + // Case 5: we can't resolve the given topic ID to a name to feed to the + // Authorizer. Return UNKNOWN_TOPIC_ID. + appendResponse(null, id, nameOrError.error()) + } else { + maybeAppendToIdToName(id, nameOrError.result()) + } + } + if (!hasClusterAuth) { + val allTopicNames = idToName.values().asScala ++ unknownTopicNameErrors.keySet().asScala + val authorizedDescribeTopics = getDescribableTopics(allTopicNames) + val authorizedDeleteTopics = getDeletableTopics(allTopicNames) + val iterator = idToName.entrySet().iterator() + while (iterator.hasNext) { + val entry = iterator.next() + val topicName = entry.getValue + if (!authorizedDeleteTopics.contains(topicName)) { + // Case 1 or case 4: the topic exists, but we don't have permission to delete it. + val topicId = entry.getKey + if (topicIdsToResolve.contains(topicId)) { + appendResponse(null, topicId, new ApiError(TOPIC_AUTHORIZATION_FAILED)) + } else { + appendResponse(topicName, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED)) + } + iterator.remove() + } + } + unknownTopicNameErrors.asScala.forKeyValue { + case (topicName, error) => + if (authorizedDescribeTopics.contains(topicName)) { + // Case 2: the topic we tried to delete by name doesn't exist, and we have + // permission to know that. 
+ appendResponse(topicName, ZERO_UUID, error) Review comment: this method should be called again if `hasClusterAuth` is true. ########## File path: core/src/main/scala/kafka/server/ControllerApis.scala ########## @@ -154,6 +163,162 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { + val responses = deleteTopics(request.body[DeleteTopicsRequest].data(), + request.context.apiVersion(), + authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME), + names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n), + names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n)) + requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => { + val responseData = new DeleteTopicsResponseData(). + setResponses(new DeletableTopicResultCollection(responses.iterator())). + setThrottleTimeMs(throttleTimeMs) + new DeleteTopicsResponse(responseData) + }) + } + + def deleteTopics(request: DeleteTopicsRequestData, + apiVersion: Int, + hasClusterAuth: Boolean, + getDescribableTopics: Iterable[String] => Set[String], + getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = { + if (!config.deleteTopicEnable) { + if (apiVersion < 3) { + throw new InvalidRequestException("Topic deletion is disabled.") + } else { + throw new TopicDeletionDisabledException() + } + } + val responses = new util.ArrayList[DeletableTopicResult] + val duplicatedTopicNames = new util.HashSet[String] + val topicNamesToResolve = new util.HashSet[String] + val topicIdsToResolve = new util.HashSet[Uuid] + val duplicatedTopicIds = new util.HashSet[Uuid] + + def appendResponse(name: String, id: Uuid, error: ApiError): Unit = { + responses.add(new DeletableTopicResult(). + setName(name). + setTopicId(id). + setErrorCode(error.error().code()). 
+ setErrorMessage(error.message())) + } + + def maybeAppendToTopicNamesToResolve(name: String): Unit = { + if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) { + appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name.")) + topicNamesToResolve.remove(name) + duplicatedTopicNames.add(name) + } + } + + def maybeAppendToIdsToResolve(id: Uuid): Unit = { + if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) { + appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID.")) + topicIdsToResolve.remove(id) + duplicatedTopicIds.add(id) + } + } + + request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve) + + request.topics().iterator().asScala.foreach { + topic => if (topic.name() == null) { + if (topic.topicId.equals(ZERO_UUID)) { + appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST, + "Neither topic name nor id were specified.")) + } else { + maybeAppendToIdsToResolve(topic.topicId()) + } + } else { + if (topic.topicId().equals(ZERO_UUID)) { + maybeAppendToTopicNamesToResolve(topic.name()) + } else { + appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST, + "You may not specify both topic name and topic id.")) + } + } + } + + val idToName = new util.HashMap[Uuid, String] + val unknownTopicNameErrors = new util.HashMap[String, ApiError] + def maybeAppendToIdToName(id: Uuid, name: String): Unit = { + if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) { + appendResponse(name, id, new ApiError(INVALID_REQUEST, + "The same topic was specified by name and by id.")) + idToName.remove(id) + duplicatedTopicIds.add(id) + } + } + controller.findTopicIds(topicNamesToResolve).get().asScala.forKeyValue { + case (name, idOrError) => if (idOrError.isError) { + unknownTopicNameErrors.put(name, idOrError.error()) + } else { + maybeAppendToIdToName(idOrError.result(), name) + } + } + + /** + * There are 5 error cases to handle here: + 
* + * 1. name provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED + * 2. name provided, topic doesn't exist, can't delete, can describe => UNKNOWN_TOPIC_OR_PARTITION + * 3. name provided, topic doesn't exist, can't delete, can't describe => TOPIC_AUTHORIZATION_FAILED + * 4. id provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED + * 5. id provided, topic doesn't exists, can't delete, maybe can describe => UNKNOWN_TOPIC_ID + */ + controller.findTopicNames(topicIdsToResolve).get().asScala.forKeyValue { + case (id, nameOrError) => if (nameOrError.isError) { + // Case 5: we can't resolve the given topic ID to a name to feed to the + // Authorizer. Return UNKNOWN_TOPIC_ID. + appendResponse(null, id, nameOrError.error()) Review comment: Should it pass `UNKNOWN_TOPIC_ID` for this case (according to the comment)? ########## File path: core/src/main/scala/kafka/server/ControllerApis.scala ########## @@ -154,6 +163,162 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { + val responses = deleteTopics(request.body[DeleteTopicsRequest].data(), + request.context.apiVersion(), + authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME), + names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n), + names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n)) + requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => { + val responseData = new DeleteTopicsResponseData(). + setResponses(new DeletableTopicResultCollection(responses.iterator())). 
+ setThrottleTimeMs(throttleTimeMs) + new DeleteTopicsResponse(responseData) + }) + } + + def deleteTopics(request: DeleteTopicsRequestData, + apiVersion: Int, + hasClusterAuth: Boolean, + getDescribableTopics: Iterable[String] => Set[String], + getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = { + if (!config.deleteTopicEnable) { + if (apiVersion < 3) { + throw new InvalidRequestException("Topic deletion is disabled.") + } else { + throw new TopicDeletionDisabledException() + } + } + val responses = new util.ArrayList[DeletableTopicResult] + val duplicatedTopicNames = new util.HashSet[String] + val topicNamesToResolve = new util.HashSet[String] + val topicIdsToResolve = new util.HashSet[Uuid] + val duplicatedTopicIds = new util.HashSet[Uuid] + + def appendResponse(name: String, id: Uuid, error: ApiError): Unit = { + responses.add(new DeletableTopicResult(). + setName(name). + setTopicId(id). + setErrorCode(error.error().code()). + setErrorMessage(error.message())) + } + + def maybeAppendToTopicNamesToResolve(name: String): Unit = { + if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) { + appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name.")) + topicNamesToResolve.remove(name) + duplicatedTopicNames.add(name) + } + } + + def maybeAppendToIdsToResolve(id: Uuid): Unit = { + if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) { + appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID.")) + topicIdsToResolve.remove(id) + duplicatedTopicIds.add(id) + } + } + + request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve) + + request.topics().iterator().asScala.foreach { + topic => if (topic.name() == null) { + if (topic.topicId.equals(ZERO_UUID)) { + appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST, + "Neither topic name nor id were specified.")) + } else { + maybeAppendToIdsToResolve(topic.topicId()) + } + } 
else { + if (topic.topicId().equals(ZERO_UUID)) { + maybeAppendToTopicNamesToResolve(topic.name()) + } else { + appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST, + "You may not specify both topic name and topic id.")) + } + } + } + + val idToName = new util.HashMap[Uuid, String] + val unknownTopicNameErrors = new util.HashMap[String, ApiError] + def maybeAppendToIdToName(id: Uuid, name: String): Unit = { + if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) { + appendResponse(name, id, new ApiError(INVALID_REQUEST, + "The same topic was specified by name and by id.")) + idToName.remove(id) + duplicatedTopicIds.add(id) + } + } + controller.findTopicIds(topicNamesToResolve).get().asScala.forKeyValue { + case (name, idOrError) => if (idOrError.isError) { + unknownTopicNameErrors.put(name, idOrError.error()) + } else { + maybeAppendToIdToName(idOrError.result(), name) + } + } + + /** + * There are 5 error cases to handle here: + * + * 1. name provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED + * 2. name provided, topic doesn't exist, can't delete, can describe => UNKNOWN_TOPIC_OR_PARTITION + * 3. name provided, topic doesn't exist, can't delete, can't describe => TOPIC_AUTHORIZATION_FAILED + * 4. id provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED + * 5. id provided, topic doesn't exists, can't delete, maybe can describe => UNKNOWN_TOPIC_ID + */ + controller.findTopicNames(topicIdsToResolve).get().asScala.forKeyValue { + case (id, nameOrError) => if (nameOrError.isError) { + // Case 5: we can't resolve the given topic ID to a name to feed to the + // Authorizer. Return UNKNOWN_TOPIC_ID. 
+ appendResponse(null, id, nameOrError.error()) + } else { + maybeAppendToIdToName(id, nameOrError.result()) + } + } + if (!hasClusterAuth) { + val allTopicNames = idToName.values().asScala ++ unknownTopicNameErrors.keySet().asScala + val authorizedDescribeTopics = getDescribableTopics(allTopicNames) Review comment: How about creating this variable only if `unknownTopicNameErrors` is not empty? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org