hachikuji commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r586632247
########## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ########## @@ -541,6 +575,64 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors, return configChanges; } + Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) { + Map<String, ResultOrError<Uuid>> results = new HashMap<>(); + for (String name : names) { + if (name == null) { + results.put(null, new ResultOrError<>(INVALID_REQUEST, "Invalid null topic name.")); + } else { + Uuid id = topicsByName.get(name, offset); + if (id == null) { + results.put(name, new ResultOrError<>( + new ApiError(UNKNOWN_TOPIC_OR_PARTITION))); + } else { + results.put(name, new ResultOrError<>(id)); + } + } + } + return results; + } + + Map<Uuid, ResultOrError<String>> findTopicNames(long offset, Collection<Uuid> ids) { + Map<Uuid, ResultOrError<String>> results = new HashMap<>(); + for (Uuid id : ids) { + TopicControlInfo topic = topics.get(id, offset); + if (topic == null) { + results.put(id, new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_ID))); + } else { + results.put(id, new ResultOrError<>(topic.name)); + } + } + return results; + } + + ControllerResult<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) { + Map<Uuid, ApiError> results = new HashMap<>(); + List<ApiMessageAndVersion> records = new ArrayList<>(); + for (Uuid id : ids) { + try { + deleteTopic(id, records); + results.put(id, ApiError.NONE); + } catch (ApiException e) { + results.put(id, ApiError.fromThrowable(e)); + } catch (Exception e) { + log.error("Unexpected deleteTopics error for {}", id, e); + results.put(id, ApiError.fromThrowable(e)); + } + } + return new ControllerResult<>(records, results); + } + + void deleteTopic(Uuid id, List<ApiMessageAndVersion> records) { + TopicControlInfo topic = topics.get(id); + if (topic == null) { + throw new UnknownTopicIdException(UNKNOWN_TOPIC_ID.message()); + } + configurationControl.deleteTopicConfigs(topic.name); Review comment: Why do we need to do this both here and in `replay`? ########## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ########## @@ -541,6 +575,64 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors, return configChanges; } + Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) { + Map<String, ResultOrError<Uuid>> results = new HashMap<>(); Review comment: nit: I guess we could initialize the size (a couple similar cases below as well) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org