cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r586739230



##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -541,6 +575,64 @@ static void validateNewTopicNames(Map<String, ApiError> 
topicErrors,
         return configChanges;
     }
 
+    Map<String, ResultOrError<Uuid>> findTopicIds(long offset, 
Collection<String> names) {
+        Map<String, ResultOrError<Uuid>> results = new HashMap<>();
+        for (String name : names) {
+            if (name == null) {
+                results.put(null, new ResultOrError<>(INVALID_REQUEST, 
"Invalid null topic name."));
+            } else {
+                Uuid id = topicsByName.get(name, offset);
+                if (id == null) {
+                    results.put(name, new ResultOrError<>(
+                        new ApiError(UNKNOWN_TOPIC_OR_PARTITION)));
+                } else {
+                    results.put(name, new ResultOrError<>(id));
+                }
+            }
+        }
+        return results;
+    }
+
+    Map<Uuid, ResultOrError<String>> findTopicNames(long offset, 
Collection<Uuid> ids) {
+        Map<Uuid, ResultOrError<String>> results = new HashMap<>();
+        for (Uuid id : ids) {
+            TopicControlInfo topic = topics.get(id, offset);
+            if (topic == null) {
+                results.put(id, new ResultOrError<>(new 
ApiError(UNKNOWN_TOPIC_ID)));
+            } else {
+                results.put(id, new ResultOrError<>(topic.name));
+            }
+        }
+        return results;
+    }
+
+    ControllerResult<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
+        Map<Uuid, ApiError> results = new HashMap<>();
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        for (Uuid id : ids) {
+            try {
+                deleteTopic(id, records);
+                results.put(id, ApiError.NONE);
+            } catch (ApiException e) {
+                results.put(id, ApiError.fromThrowable(e));
+            } catch (Exception e) {
+                log.error("Unexpected deleteTopics error for {}", id, e);
+                results.put(id, ApiError.fromThrowable(e));
+            }
+        }
+        return new ControllerResult<>(records, results);
+    }
+
+    void deleteTopic(Uuid id, List<ApiMessageAndVersion> records) {
+        TopicControlInfo topic = topics.get(id);
+        if (topic == null) {
+            throw new UnknownTopicIdException(UNKNOWN_TOPIC_ID.message());
+        }
+        configurationControl.deleteTopicConfigs(topic.name);

Review comment:
       you're right, this should happen only in replay, not here




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to