chia7712 commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r581058347
########## File path: core/src/main/scala/kafka/server/ControllerApis.scala ########## @@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { + if (!config.deleteTopicEnable) { + if (request.context.apiVersion() < 3) { + throw new InvalidRequestException("Topic deletion is disabled.") + } else { + throw new TopicDeletionDisabledException() + } + } + val deleteTopicsRequest = request.body[DeleteTopicsRequest] + val nameToId = new mutable.HashMap[String, Uuid] + deleteTopicsRequest.data().topicNames().iterator().asScala.foreach { + name => nameToId.put(name, Uuid.ZERO_UUID) + } + deleteTopicsRequest.data().topics().iterator().asScala.foreach { + nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId()) + } + val (describable, deletable) = { + if (authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME)) { + (nameToId.keySet, nameToId.keySet) + } else { + val authorizedDescribeTopics: Set[String] = authHelper.filterByAuthorized( + request.context, DESCRIBE, TOPIC, nameToId.keys)(n => n) + val authorizedDeleteTopics: Set[String] = authHelper.filterByAuthorized( + request.context, DELETE, TOPIC, nameToId.keys)(n => n) + (authorizedDescribeTopics, authorizedDeleteTopics) + } + } + def sendResponse(response: DeleteTopicsResponseData): Unit = { + nameToId.keysIterator.foreach { + name => if (!deletable.contains(name)) { + val result = if (describable.contains(name)) { + new DeletableTopicResult().setName(name).setErrorCode(TOPIC_AUTHORIZATION_FAILED.code) + } else { + new DeletableTopicResult().setName(name).setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code) + } + response.responses().add(result) + } + } + requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => { + response.setThrottleTimeMs(throttleTimeMs) + new DeleteTopicsResponse(response) + }) + } + val future = controller.deleteTopics( + nameToId.view.filterKeys(deletable.contains(_)).toMap.asJava) Review comment: It can be simplified to `nameToId.view.filterKeys(deletable.contains).toMap.asJava)` ########## File path: core/src/main/scala/kafka/server/ControllerApis.scala ########## @@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { + if (!config.deleteTopicEnable) { + if (request.context.apiVersion() < 3) { + throw new InvalidRequestException("Topic deletion is disabled.") + } else { + throw new TopicDeletionDisabledException() + } + } + val deleteTopicsRequest = request.body[DeleteTopicsRequest] + val nameToId = new mutable.HashMap[String, Uuid] + deleteTopicsRequest.data().topicNames().iterator().asScala.foreach { + name => nameToId.put(name, Uuid.ZERO_UUID) + } + deleteTopicsRequest.data().topics().iterator().asScala.foreach { + nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId()) + } + val (describable, deletable) = { + if (authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME)) { + (nameToId.keySet, nameToId.keySet) + } else { + val authorizedDescribeTopics: Set[String] = authHelper.filterByAuthorized( Review comment: Is it possible the request carries only topic ids? If yes, how to find the related topic names and then authorize them here? ########## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ########## @@ -541,6 +559,41 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors, return configChanges; } + ControllerResult<DeleteTopicsResponseData> deleteTopics(Map<String, Uuid> nameToId) { + DeleteTopicsResponseData result = new DeleteTopicsResponseData(); + List<ApiMessageAndVersion> records = new ArrayList<>(); + for (Entry<String, Uuid> entry : nameToId.entrySet()) { + ApiError error = deleteTopic(entry.getKey(), entry.getValue(), records); + result.responses().add(new DeletableTopicResult(). + setName(entry.getKey()). + setTopicId(entry.getValue()). + setErrorCode(error.error().code()). + setErrorMessage(error.message())); + } + return new ControllerResult<>(records, result); + } + + ApiError deleteTopic(String name, + Uuid providedId, + List<ApiMessageAndVersion> records) { + Uuid realId = topicsByName.get(name); + if (realId == null) { + return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, + "Unable to locate the provided topic name."); + } + if (!providedId.equals(Uuid.ZERO_UUID) && !providedId.equals(realId)) { + return new ApiError(UNKNOWN_TOPIC_ID, + "The provided topic ID does not match the provided topic name."); + } + TopicControlInfo topic = topics.get(realId); + if (topic == null) { + return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "Unable to locate topic id."); Review comment: Is this a server-side bug? the topic id exists in `topicsByName` but there is no `TopicControlInfo`. ########## File path: core/src/main/scala/kafka/server/ControllerApis.scala ########## @@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { + if (!config.deleteTopicEnable) { + if (request.context.apiVersion() < 3) { + throw new InvalidRequestException("Topic deletion is disabled.") Review comment: How about using `throw Errors.INVALID_REQUEST.exception()`? That makes sure the exception is correlated to expected `Errors` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org