cmccabe commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r582257154
##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -541,6 +559,41 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors,
         return configChanges;
     }
+    ControllerResult<DeleteTopicsResponseData> deleteTopics(Map<String, Uuid> nameToId) {
+        DeleteTopicsResponseData result = new DeleteTopicsResponseData();
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        for (Entry<String, Uuid> entry : nameToId.entrySet()) {
+            ApiError error = deleteTopic(entry.getKey(), entry.getValue(), records);
+            result.responses().add(new DeletableTopicResult().
+                setName(entry.getKey()).
+                setTopicId(entry.getValue()).
+                setErrorCode(error.error().code()).
+                setErrorMessage(error.message()));
+        }
+        return new ControllerResult<>(records, result);
+    }
+
+    ApiError deleteTopic(String name,
+                         Uuid providedId,
+                         List<ApiMessageAndVersion> records) {
+        Uuid realId = topicsByName.get(name);
+        if (realId == null) {
+            return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
+                "Unable to locate the provided topic name.");
+        }
+        if (!providedId.equals(Uuid.ZERO_UUID) && !providedId.equals(realId)) {
+            return new ApiError(UNKNOWN_TOPIC_ID,
+                "The provided topic ID does not match the provided topic name.");
+        }
+        TopicControlInfo topic = topics.get(realId);
+        if (topic == null) {
+            return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "Unable to locate topic id.");

Review comment:
   the code has changed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org