chia7712 commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r581058347



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    if (!config.deleteTopicEnable) {
+      if (request.context.apiVersion() < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val deleteTopicsRequest = request.body[DeleteTopicsRequest]
+    val nameToId = new mutable.HashMap[String, Uuid]
+    deleteTopicsRequest.data().topicNames().iterator().asScala.foreach {
+      name => nameToId.put(name, Uuid.ZERO_UUID)
+    }
+    deleteTopicsRequest.data().topics().iterator().asScala.foreach {
+      nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId())
+    }
+    val (describable, deletable)  = {
+      if (authHelper.authorize(request.context, DELETE, CLUSTER, 
CLUSTER_NAME)) {
+        (nameToId.keySet, nameToId.keySet)
+      } else {
+        val authorizedDescribeTopics: Set[String] = 
authHelper.filterByAuthorized(
+          request.context, DESCRIBE, TOPIC, nameToId.keys)(n => n)
+        val authorizedDeleteTopics: Set[String] = 
authHelper.filterByAuthorized(
+          request.context, DELETE, TOPIC, nameToId.keys)(n => n)
+        (authorizedDescribeTopics, authorizedDeleteTopics)
+      }
+    }
+    def sendResponse(response: DeleteTopicsResponseData): Unit = {
+      nameToId.keysIterator.foreach {
+        name => if (!deletable.contains(name)) {
+          val result = if (describable.contains(name)) {
+            new 
DeletableTopicResult().setName(name).setErrorCode(TOPIC_AUTHORIZATION_FAILED.code)
+          } else {
+            new 
DeletableTopicResult().setName(name).setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code)
+          }
+          response.responses().add(result)
+        }
+      }
+      requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+        response.setThrottleTimeMs(throttleTimeMs)
+        new DeleteTopicsResponse(response)
+      })
+    }
+   val future = controller.deleteTopics(
+     nameToId.view.filterKeys(deletable.contains(_)).toMap.asJava)

Review comment:
       It can be simplified to 
`nameToId.view.filterKeys(deletable.contains).toMap.asJava`

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    if (!config.deleteTopicEnable) {
+      if (request.context.apiVersion() < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val deleteTopicsRequest = request.body[DeleteTopicsRequest]
+    val nameToId = new mutable.HashMap[String, Uuid]
+    deleteTopicsRequest.data().topicNames().iterator().asScala.foreach {
+      name => nameToId.put(name, Uuid.ZERO_UUID)
+    }
+    deleteTopicsRequest.data().topics().iterator().asScala.foreach {
+      nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId())
+    }
+    val (describable, deletable)  = {
+      if (authHelper.authorize(request.context, DELETE, CLUSTER, 
CLUSTER_NAME)) {
+        (nameToId.keySet, nameToId.keySet)
+      } else {
+        val authorizedDescribeTopics: Set[String] = 
authHelper.filterByAuthorized(

Review comment:
       Is it possible that the request carries only topic IDs? If so, how do we 
find the related topic names and then authorize them here?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -541,6 +559,41 @@ static void validateNewTopicNames(Map<String, ApiError> 
topicErrors,
         return configChanges;
     }
 
+    ControllerResult<DeleteTopicsResponseData> deleteTopics(Map<String, Uuid> 
nameToId) {
+        DeleteTopicsResponseData result = new DeleteTopicsResponseData();
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        for (Entry<String, Uuid> entry : nameToId.entrySet()) {
+            ApiError error = deleteTopic(entry.getKey(), entry.getValue(), 
records);
+            result.responses().add(new DeletableTopicResult().
+                setName(entry.getKey()).
+                setTopicId(entry.getValue()).
+                setErrorCode(error.error().code()).
+                setErrorMessage(error.message()));
+        }
+        return new ControllerResult<>(records, result);
+    }
+
+    ApiError deleteTopic(String name,
+                         Uuid providedId,
+                         List<ApiMessageAndVersion> records) {
+        Uuid realId = topicsByName.get(name);
+        if (realId == null) {
+            return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
+                "Unable to locate the provided topic name.");
+        }
+        if (!providedId.equals(Uuid.ZERO_UUID) && !providedId.equals(realId)) {
+            return new ApiError(UNKNOWN_TOPIC_ID,
+                "The provided topic ID does not match the provided topic 
name.");
+        }
+        TopicControlInfo topic = topics.get(realId);
+        if (topic == null) {
+            return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "Unable to locate 
topic id.");

Review comment:
       Is this a server-side bug? The topic ID exists in `topicsByName`, but 
there is no corresponding `TopicControlInfo`.

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    if (!config.deleteTopicEnable) {
+      if (request.context.apiVersion() < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")

Review comment:
       How about using `throw Errors.INVALID_REQUEST.exception()`? That makes 
sure the exception is correlated with the expected `Errors` value.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to