mimaison commented on code in PR #15377:
URL: https://github.com/apache/kafka/pull/15377#discussion_r1560828250


##########
core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala:
##########
@@ -258,4 +275,21 @@ class SslAdminIntegrationTest extends 
SaslSslAdminIntegrationTest {
     assertTrue(metrics.nonEmpty, s"Unable to find metric $name: allMetrics: 
${allMetrics.keySet.map(_.getMBeanName)}")
     metrics.map(_.asInstanceOf[Gauge[Int]].value).sum
   }
+
+  override def createAdminClient: Admin = {
+    val config = new Properties()
+    config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "40000")
+    config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000")
+    val client = createAdminClient(configOverrides = config)
+    adminClients += client

Review Comment:
   The call on the line above already adds the client to `adminClients` from a 
super class so this is not needed. Same for `adminClients.foreach(_.close())` 
in`tearDown()`



##########
core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala:
##########
@@ -74,16 +82,33 @@ object SslAdminIntegrationTest {
       futures.asJava
     }
   }
+
+  class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
+    private val Pattern = "O=A (.*?),CN=(.*?)".r
+
+    // Use fields from DN as server principal to grant authorisation for 
servers and super admin client
+    override def build(context: AuthenticationContext): KafkaPrincipal = {
+      val peerPrincipal = 
context.asInstanceOf[SslAuthenticationContext].session.getPeerPrincipal.getName
+      peerPrincipal match {
+        case Pattern(name, cn) =>
+          val principal = if ((name == "server") || (cn == superuserCn)) 
"server" else KafkaPrincipal.ANONYMOUS.getName
+          new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal)
+        case _ =>
+          KafkaPrincipal.ANONYMOUS
+      }
+    }
+  }
 }
 
 class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
-  override val authorizationAdmin = new 
AclAuthorizationAdmin(classOf[SslAdminIntegrationTest.TestableAclAuthorizer], 
classOf[AclAuthorizer])
-
-  this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
+  override val aclAuthorizerClassName: String = 
classOf[SslAdminIntegrationTest.TestableAclAuthorizer].getName
 
+  this.serverConfig.setProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, 
"required")
+  
this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
 classOf[SslAdminIntegrationTest.TestPrincipalBuilder].getName)
   override protected def securityProtocol = SecurityProtocol.SSL
   override protected lazy val trustStoreFile = 
Some(TestUtils.tempFile("truststore", ".jks"))

Review Comment:
   I think this override can be deleted, the super class is already setting it 
to the same value.



##########
core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala:
##########
@@ -74,16 +82,33 @@ object SslAdminIntegrationTest {
       futures.asJava
     }
   }
+
+  class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
+    private val Pattern = "O=A (.*?),CN=(.*?)".r
+
+    // Use fields from DN as server principal to grant authorisation for 
servers and super admin client
+    override def build(context: AuthenticationContext): KafkaPrincipal = {
+      val peerPrincipal = 
context.asInstanceOf[SslAuthenticationContext].session.getPeerPrincipal.getName
+      peerPrincipal match {
+        case Pattern(name, cn) =>
+          val principal = if ((name == "server") || (cn == superuserCn)) 
"server" else KafkaPrincipal.ANONYMOUS.getName
+          new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal)
+        case _ =>
+          KafkaPrincipal.ANONYMOUS
+      }
+    }
+  }
 }
 
 class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
-  override val authorizationAdmin = new 
AclAuthorizationAdmin(classOf[SslAdminIntegrationTest.TestableAclAuthorizer], 
classOf[AclAuthorizer])
-
-  this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
+  override val aclAuthorizerClassName: String = 
classOf[SslAdminIntegrationTest.TestableAclAuthorizer].getName
 
+  this.serverConfig.setProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, 
"required")
+  
this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
 classOf[SslAdminIntegrationTest.TestPrincipalBuilder].getName)
   override protected def securityProtocol = SecurityProtocol.SSL
   override protected lazy val trustStoreFile = 
Some(TestUtils.tempFile("truststore", ".jks"))
   private val adminClients = mutable.Buffer.empty[Admin]

Review Comment:
   This can be removed too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to