chia7712 commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r584123786



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1874,7 +1874,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         if (topic.name() != null && topic.topicId() != Uuid.ZERO_UUID)
           throw new InvalidRequestException("Topic name and topic ID can not both be specified.")
         val name = if (topic.topicId() == Uuid.ZERO_UUID) topic.name()
-        else zkSupport.controller.controllerContext.topicNames.getOrElse(topic.topicId(), null)
+        else zkSupport.controller.controllerContext.topicName(topic.topicId).orNull

Review comment:
       Do you want to narrow access to `topicNames`? If so, we should
   change its modifier from public to private.
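
A minimal sketch of what narrowing the access could look like. `TopicContext`, `addTopic`, and the use of `String` in place of `Uuid` are hypothetical simplifications for illustration, not the actual `ControllerContext` code:

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for ControllerContext (String replaces Uuid).
final class TopicContext {
  // Private: callers can no longer read or mutate the map directly.
  private val topicNames = mutable.Map.empty[String, String]

  def addTopic(id: String, name: String): Unit = topicNames.put(id, name)

  // Narrow accessor: returns an Option so each call site decides how to handle a miss.
  def topicName(id: String): Option[String] = topicNames.get(id)
}

object TopicContextDemo {
  // Mirrors the `.orNull` call site in the diff: an unknown ID resolves to null.
  def resolve(ctx: TopicContext, id: String): String = ctx.topicName(id).orNull
}
```

With the map private, every lookup has to go through `topicName`, so the accessor becomes the only place where the lookup behavior is defined.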

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1884,20 +1884,24 @@ class KafkaApis(val requestChannel: RequestChannel,
       val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
         results.asScala.filter(result => result.name() != null))(_.name)
       results.forEach { topic =>
-        val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null
-         if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
-           topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-           topic.setErrorMessage("Topic IDs are not supported on the server.")
-         } else if (unresolvedTopicId)
-             topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (!authorizedDeleteTopics.contains(topic.name))
-           topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-         else if (!metadataCache.contains(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-         else
-           toDelete += topic.name
+        val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
+        if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
+          topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+          topic.setErrorMessage("Topic IDs are not supported on the server.")
+        } else if (unresolvedTopicId) {
+          topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+        } else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) {
+          // Because the client does not have Describe permission, the name should
+          // not be returned in the response.
+          topic.setName(null)
+          topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)

Review comment:
       I was planning to discuss this error code on
   https://github.com/apache/kafka/pull/10184#discussion_r583933291 :)
   
   ### return `UNKNOWN_TOPIC_ID`
   
   Could you add a clear error message to this response (call `setErrorMessage`)?
   Users may find the error code `UNKNOWN_TOPIC_ID` confusing because it now
   covers two scenarios (the ID does not exist, or the client has no permission
   to describe the topic).
   
   ### return `TOPIC_AUTHORIZATION_FAILED`
   
   It reports the error accurately and helps users handle unauthorized
   requests. Personally, I think this error code is more suitable :)
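
The two options above can be sketched side by side. `TopicResult` and `DeleteTopicErrors` are hypothetical, simplified types used only for illustration (the real code sets fields on the response object via `setErrorCode` and `setErrorMessage`), and the message wording is an assumption:

```scala
// Hypothetical, simplified model of a per-topic result; error codes are plain strings here.
final case class TopicResult(name: Option[String], error: String, message: Option[String])

object DeleteTopicErrors {
  // Option 1: keep UNKNOWN_TOPIC_ID, but attach a message that names both possible causes.
  def unknownTopicId: TopicResult =
    TopicResult(
      name = None, // the name is withheld because the client lacks Describe permission
      error = "UNKNOWN_TOPIC_ID",
      message = Some("The topic ID does not exist, or the client is not authorized to describe the topic.")
    )

  // Option 2: report the authorization failure explicitly.
  def authorizationFailed: TopicResult =
    TopicResult(name = None, error = "TOPIC_AUTHORIZATION_FAILED", message = None)
}
```

Both variants withhold the topic name. They differ in what the caller can infer: option 1 does not distinguish a missing ID from a permission problem, while option 2 is more precise but also tells the caller that the ID exists.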





