hachikuji commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r586632247



##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -541,6 +575,64 @@ static void validateNewTopicNames(Map<String, ApiError> 
topicErrors,
         return configChanges;
     }
 
+    Map<String, ResultOrError<Uuid>> findTopicIds(long offset, 
Collection<String> names) {
+        Map<String, ResultOrError<Uuid>> results = new HashMap<>();
+        for (String name : names) {
+            if (name == null) {
+                results.put(null, new ResultOrError<>(INVALID_REQUEST, 
"Invalid null topic name."));
+            } else {
+                Uuid id = topicsByName.get(name, offset);
+                if (id == null) {
+                    results.put(name, new ResultOrError<>(
+                        new ApiError(UNKNOWN_TOPIC_OR_PARTITION)));
+                } else {
+                    results.put(name, new ResultOrError<>(id));
+                }
+            }
+        }
+        return results;
+    }
+
+    Map<Uuid, ResultOrError<String>> findTopicNames(long offset, 
Collection<Uuid> ids) {
+        Map<Uuid, ResultOrError<String>> results = new HashMap<>();
+        for (Uuid id : ids) {
+            TopicControlInfo topic = topics.get(id, offset);
+            if (topic == null) {
+                results.put(id, new ResultOrError<>(new 
ApiError(UNKNOWN_TOPIC_ID)));
+            } else {
+                results.put(id, new ResultOrError<>(topic.name));
+            }
+        }
+        return results;
+    }
+
+    ControllerResult<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
+        Map<Uuid, ApiError> results = new HashMap<>();
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        for (Uuid id : ids) {
+            try {
+                deleteTopic(id, records);
+                results.put(id, ApiError.NONE);
+            } catch (ApiException e) {
+                results.put(id, ApiError.fromThrowable(e));
+            } catch (Exception e) {
+                log.error("Unexpected deleteTopics error for {}", id, e);
+                results.put(id, ApiError.fromThrowable(e));
+            }
+        }
+        return new ControllerResult<>(records, results);
+    }
+
+    void deleteTopic(Uuid id, List<ApiMessageAndVersion> records) {
+        TopicControlInfo topic = topics.get(id);
+        if (topic == null) {
+            throw new UnknownTopicIdException(UNKNOWN_TOPIC_ID.message());
+        }
+        configurationControl.deleteTopicConfigs(topic.name);

Review comment:
       Why do we need to do this both here and in `replay`?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -541,6 +575,64 @@ static void validateNewTopicNames(Map<String, ApiError> 
topicErrors,
         return configChanges;
     }
 
+    Map<String, ResultOrError<Uuid>> findTopicIds(long offset, 
Collection<String> names) {
+        Map<String, ResultOrError<Uuid>> results = new HashMap<>();

Review comment:
       nit: I guess we could initialize the size (a couple similar cases below 
as well)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to