mimaison commented on code in PR #15377: URL: https://github.com/apache/kafka/pull/15377#discussion_r1560828250
########## core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala: ########## @@ -258,4 +275,21 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { assertTrue(metrics.nonEmpty, s"Unable to find metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}") metrics.map(_.asInstanceOf[Gauge[Int]].value).sum } + + override def createAdminClient: Admin = { + val config = new Properties() + config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "40000") + config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000") + val client = createAdminClient(configOverrides = config) + adminClients += client Review Comment: The call on the line above already adds the client to `adminClients` from a super class so this is not needed. Same for `adminClients.foreach(_.close())` in `tearDown()`. ########## core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala: ########## @@ -74,16 +82,33 @@ object SslAdminIntegrationTest { futures.asJava } } + + class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) { + private val Pattern = "O=A (.*?),CN=(.*?)".r + + // Use fields from DN as server principal to grant authorisation for servers and super admin client + override def build(context: AuthenticationContext): KafkaPrincipal = { + val peerPrincipal = context.asInstanceOf[SslAuthenticationContext].session.getPeerPrincipal.getName + peerPrincipal match { + case Pattern(name, cn) => + val principal = if ((name == "server") || (cn == superuserCn)) "server" else KafkaPrincipal.ANONYMOUS.getName + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal) + case _ => + KafkaPrincipal.ANONYMOUS + } + } + } } class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { - override val authorizationAdmin = new AclAuthorizationAdmin(classOf[SslAdminIntegrationTest.TestableAclAuthorizer], classOf[AclAuthorizer]) - - this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true") + override val 
aclAuthorizerClassName: String = classOf[SslAdminIntegrationTest.TestableAclAuthorizer].getName + this.serverConfig.setProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required") + this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[SslAdminIntegrationTest.TestPrincipalBuilder].getName) override protected def securityProtocol = SecurityProtocol.SSL override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks")) Review Comment: I think this override can be deleted, the super class is already setting it to the same value. ########## core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala: ########## @@ -74,16 +82,33 @@ object SslAdminIntegrationTest { futures.asJava } } + + class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) { + private val Pattern = "O=A (.*?),CN=(.*?)".r + + // Use fields from DN as server principal to grant authorisation for servers and super admin client + override def build(context: AuthenticationContext): KafkaPrincipal = { + val peerPrincipal = context.asInstanceOf[SslAuthenticationContext].session.getPeerPrincipal.getName + peerPrincipal match { + case Pattern(name, cn) => + val principal = if ((name == "server") || (cn == superuserCn)) "server" else KafkaPrincipal.ANONYMOUS.getName + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal) + case _ => + KafkaPrincipal.ANONYMOUS + } + } + } } class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { - override val authorizationAdmin = new AclAuthorizationAdmin(classOf[SslAdminIntegrationTest.TestableAclAuthorizer], classOf[AclAuthorizer]) - - this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true") + override val aclAuthorizerClassName: String = classOf[SslAdminIntegrationTest.TestableAclAuthorizer].getName + this.serverConfig.setProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required") + 
this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[SslAdminIntegrationTest.TestPrincipalBuilder].getName) override protected def securityProtocol = SecurityProtocol.SSL override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks")) private val adminClients = mutable.Buffer.empty[Admin] Review Comment: This can be removed too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org