hachikuji commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r585027051



##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -3479,6 +3481,161 @@ class KafkaApisTest {
     assertEquals(List(mkTopicData(topic = "foo", Seq(1, 2))), 
fooState.topics.asScala.toList)
   }
 
+  @Test
+  def testDeleteTopicsByIdAuthorization(): Unit = { // Checks the name and error code returned per topic ID when deleting by ID
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val controllerContext: ControllerContext = 
EasyMock.mock(classOf[ControllerContext])
+    // Stub the controller mutation quota, report the controller as active, and expose the mocked context
+    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+      EasyMock.anyObject(classOf[RequestChannel.Request]),
+      EasyMock.anyShort()
+    )).andReturn(UnboundedControllerMutationQuota)
+    EasyMock.expect(controller.isActive).andReturn(true)
+    
EasyMock.expect(controller.controllerContext).andStubReturn(controllerContext)
+
+    // Try to delete three topics by ID:
+    // 1. "foo": DESCRIBE and DELETE are both denied
+    // 2. "bar": DESCRIBE is allowed, DELETE is denied
+    // 3. An ID which the mocked controller context maps to no topic name
+
+    expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.ALLOWED
+    ))
+
+    expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.DENIED
+    ))
+
+    val topicIdsMap = Map(
+      Uuid.randomUuid() -> Some("foo"),
+      Uuid.randomUuid() -> Some("bar"),
+      Uuid.randomUuid() -> None
+    )
+
+    topicIdsMap.foreach { case (topicId, topicNameOpt) =>
+      // The mocked controller context answers each ID lookup with the optional name from the map
EasyMock.expect(controllerContext.topicName(topicId)).andReturn(topicNameOpt)
+    }
+
+    val topicDatas = topicIdsMap.keys.map { topicId =>
+      new DeleteTopicsRequestData.DeleteTopicState().setTopicId(topicId)
+    }.toList
+    val deleteRequest = new DeleteTopicsRequest.Builder(new 
DeleteTopicsRequestData()
+      .setTopics(topicDatas.asJava))
+      .build(ApiKeys.DELETE_TOPICS.latestVersion)
+
+    val request = buildRequest(deleteRequest)
+    val capturedResponse = expectNoThrottling(request)
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, 
clientControllerQuotaManager,
+      requestChannel, txnCoordinator, controller, controllerContext, 
authorizer)
+    createKafkaApis(authorizer = 
Some(authorizer)).handleDeleteTopicsRequest(request)
+
+    val deleteResponse = 
capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse]
+
+    topicIdsMap.foreach { case (topicId, nameOpt) =>
+      val response = deleteResponse.data.responses.asScala.find(_.topicId == 
topicId).get
+      nameOpt match {
+        case Some("foo") => // DESCRIBE denied: no name is returned and the ID is reported as unknown
+          assertNull(response.name)
+          assertEquals(Errors.UNKNOWN_TOPIC_ID, 
Errors.forCode(response.errorCode))
+        case Some("bar") => // DESCRIBE allowed, DELETE denied: the name is returned with an authorization failure
+          assertEquals("bar", response.name)
+          assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, 
Errors.forCode(response.errorCode))
+        case None => // no name mapping for the ID: reported as an unknown topic ID
+          assertNull(response.name)
+          assertEquals(Errors.UNKNOWN_TOPIC_ID, 
Errors.forCode(response.errorCode))
+        case _ =>
+          fail("Unexpected topic id/name mapping")
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testDeleteTopicsByNameAuthorization(usePrimitiveTopicNameArray: 
Boolean): Unit = { // Checks the error code returned per topic when deleting by name
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    // Stub the controller mutation quota and report the controller as active
+    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+      EasyMock.anyObject(classOf[RequestChannel.Request]),
+      EasyMock.anyShort()
+    )).andReturn(UnboundedControllerMutationQuota)
+    EasyMock.expect(controller.isActive).andReturn(true)
+
+    // Try to delete three topics:
+    // 1. "foo": without describe permission
+    // 2. "bar": with describe permission but without delete permission
+    // 3. "baz": fully authorized, expected back as UNKNOWN_TOPIC_OR_PARTITION
+
+    expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.ALLOWED,
+      "baz" -> AuthorizationResult.ALLOWED
+    ))
+
+    expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.DENIED,
+      "baz" -> AuthorizationResult.ALLOWED
+    ))
+    // true: names go in the topicNames list of a version 5 request; false: one DeleteTopicState per name at the latest version
+    val deleteRequest = if (usePrimitiveTopicNameArray) {
+      new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+        .setTopicNames(List("foo", "bar", "baz").asJava))
+        .build(5.toShort)
+    } else {
+      val topicDatas = List(
+        new DeleteTopicsRequestData.DeleteTopicState().setName("foo"),
+        new DeleteTopicsRequestData.DeleteTopicState().setName("bar"),
+        new DeleteTopicsRequestData.DeleteTopicState().setName("baz")
+      )
+      new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+        .setTopics(topicDatas.asJava))
+        .build(ApiKeys.DELETE_TOPICS.latestVersion)
+    }
+
+    val request = buildRequest(deleteRequest)
+    val capturedResponse = expectNoThrottling(request)
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, 
clientControllerQuotaManager,
+      requestChannel, txnCoordinator, controller, authorizer)
+    createKafkaApis(authorizer = 
Some(authorizer)).handleDeleteTopicsRequest(request)
+
+    val deleteResponse = 
capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse]
+    // Maps a topic name to the error of its response entry; None when find(topic) yields null
+    def lookupErrorCode(topic: String): Option[Errors] = {
+      Option(deleteResponse.data.responses().find(topic))
+        .map(result => Errors.forCode(result.errorCode))
+    }
+
+    assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), 
lookupErrorCode("foo"))
+    assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), 
lookupErrorCode("bar"))
+    assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION), 
lookupErrorCode("baz"))
+  }
+
+  def expectTopicAuthorization(

Review comment:
       I looked at doing this, but the need to preserve order makes it just as 
cumbersome. The capture also provides some type safety and avoids the need for 
casting from the argument list. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to