hachikuji commented on a change in pull request #10142:
URL: https://github.com/apache/kafka/pull/10142#discussion_r606008081



##########
File path: 
core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
##########
@@ -219,6 +225,146 @@ class AutoTopicCreationManagerTest {
     testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true)
   }
 
+  @Test
+  def testTopicCreationWithMetadataContextPassPrincipal(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      Some(brokerToController),
+      Some(adminManager),
+      Some(controller),
+      groupCoordinator,
+      transactionCoordinator)
+
+    val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
+    val topicName = "topic"
+    topicsCollection.add(getNewTopic(topicName))
+    val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion()
+      .setApiKey(ApiKeys.CREATE_TOPICS.id)
+      .setMinVersion(0)
+      .setMaxVersion(0)
+    Mockito.when(brokerToController.controllerApiVersions())
+      
.thenReturn(Some(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))))
+
+    Mockito.when(controller.isActive).thenReturn(false)
+
+    val requestHeader = new RequestHeader(ApiKeys.METADATA, 
ApiKeys.METADATA.latestVersion,
+      "clientId", 0)
+
+    val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
+    val principalSerde = new KafkaPrincipalSerde {
+      override def serialize(principal: KafkaPrincipal): Array[Byte] = {
+        assertEquals(principal, userPrincipal)
+        Utils.utf8(principal.toString)
+      }
+      override def deserialize(bytes: Array[Byte]): KafkaPrincipal = 
SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
+    }
+
+    val requestContext = new RequestContext(requestHeader, "1", 
InetAddress.getLocalHost,
+      userPrincipal, 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+      SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false, 
Optional.of(principalSerde))
+
+    autoTopicCreationManager.createTopics(

Review comment:
       Maybe I am missing it, but where does this test validate anything? How do 
we know that the envelope includes the serialized principal? Maybe we could 
verify the arguments passed to `brokerToController.sendRequest`?

##########
File path: 
core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
##########
@@ -219,6 +225,146 @@ class AutoTopicCreationManagerTest {
     testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true)
   }
 
+  @Test
+  def testTopicCreationWithMetadataContextPassPrincipal(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      Some(brokerToController),
+      Some(adminManager),
+      Some(controller),
+      groupCoordinator,
+      transactionCoordinator)
+
+    val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
+    val topicName = "topic"
+    topicsCollection.add(getNewTopic(topicName))
+    val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion()
+      .setApiKey(ApiKeys.CREATE_TOPICS.id)
+      .setMinVersion(0)
+      .setMaxVersion(0)
+    Mockito.when(brokerToController.controllerApiVersions())
+      
.thenReturn(Some(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))))
+
+    Mockito.when(controller.isActive).thenReturn(false)
+
+    val requestHeader = new RequestHeader(ApiKeys.METADATA, 
ApiKeys.METADATA.latestVersion,
+      "clientId", 0)
+
+    val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
+    val principalSerde = new KafkaPrincipalSerde {
+      override def serialize(principal: KafkaPrincipal): Array[Byte] = {
+        assertEquals(principal, userPrincipal)
+        Utils.utf8(principal.toString)
+      }
+      override def deserialize(bytes: Array[Byte]): KafkaPrincipal = 
SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
+    }
+
+    val requestContext = new RequestContext(requestHeader, "1", 
InetAddress.getLocalHost,
+      userPrincipal, 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+      SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false, 
Optional.of(principalSerde))
+
+    autoTopicCreationManager.createTopics(
+      Set(topicName), UnboundedControllerMutationQuota, Some(requestContext))
+  }
+
+  @Test
+  def testTopicCreationWithMetadataContextWhenPrincipalSerdeNotDefined(): Unit 
= {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      Some(brokerToController),
+      Some(adminManager),
+      Some(controller),
+      groupCoordinator,
+      transactionCoordinator)
+
+    val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
+    val topicName = "topic"
+    topicsCollection.add(getNewTopic(topicName))
+    val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion()
+      .setApiKey(ApiKeys.CREATE_TOPICS.id)
+      .setMinVersion(0)
+      .setMaxVersion(0)
+    Mockito.when(brokerToController.controllerApiVersions())
+      
.thenReturn(Some(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))))
+
+    Mockito.when(controller.isActive).thenReturn(false)
+
+    val requestHeader = new RequestHeader(ApiKeys.METADATA, 
ApiKeys.METADATA.latestVersion,
+      "clientId", 0)
+
+    val requestContext = new RequestContext(requestHeader, "1", 
InetAddress.getLocalHost,
+      KafkaPrincipal.ANONYMOUS, 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+      SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
+
+    // Throw upon undefined principal serde when building the forward request
+    assertThrows(classOf[IllegalArgumentException], () => 
autoTopicCreationManager.createTopics(
+      Set(topicName), UnboundedControllerMutationQuota, Some(requestContext)))
+  }
+
+  @Test
+  def testTopicCreationWithMetadataContextNoRetryUponUnsupportedVersion(): 
Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      Some(brokerToController),
+      Some(adminManager),
+      Some(controller),
+      groupCoordinator,
+      transactionCoordinator)
+
+    val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection

Review comment:
       nit: some helper methods would improve the readability of these tests. 
It looks like the three new tests share a lot of this initialization logic.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to