soarez commented on code in PR #12843:
URL: https://github.com/apache/kafka/pull/12843#discussion_r1020949539


##########
core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala:
##########
@@ -96,96 +90,53 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
   def clientPrincipal: KafkaPrincipal
   def kafkaPrincipal: KafkaPrincipal
 
-  // Arguments to AclCommand to set ACLs.
-  def clusterActionArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--cluster",
-                                          s"--operation=ClusterAction",
-                                          s"--allow-principal=$kafkaPrincipal")
-  // necessary to create SCRAM credentials via the admin client using the 
broker's credentials
-  // without this we would need to create the SCRAM credentials via ZooKeeper
-  def clusterAlterArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--cluster",
-                                          s"--operation=Alter",
-                                          s"--allow-principal=$kafkaPrincipal")
-  def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$wildcard",
-                                          s"--operation=Read",
-                                          s"--allow-principal=$kafkaPrincipal")
-  def produceAclArgs(topic: String): Array[String] = 
Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$topic",
-                                          s"--producer",
-                                          
s"--allow-principal=$clientPrincipal")
-  def describeAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$topic",
-                                          s"--operation=Describe",
-                                          
s"--allow-principal=$clientPrincipal")
-  def deleteDescribeAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--remove",
-                                          s"--force",
-                                          s"--topic=$topic",
-                                          s"--operation=Describe",
-                                          
s"--allow-principal=$clientPrincipal")
-  def deleteWriteAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--remove",
-                                          s"--force",
-                                          s"--topic=$topic",
-                                          s"--operation=Write",
-                                          
s"--allow-principal=$clientPrincipal")
-  def consumeAclArgs(topic: String): Array[String] = 
Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$topic",
-                                          s"--group=$group",
-                                          s"--consumer",
-                                          
s"--allow-principal=$clientPrincipal")
-  def groupAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--group=$group",
-                                          s"--operation=Read",
-                                          
s"--allow-principal=$clientPrincipal")
-  def produceConsumeWildcardAclArgs: Array[String] = 
Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$wildcard",
-                                          s"--group=$wildcard",
-                                          s"--consumer",
-                                          s"--producer",
-                                          
s"--allow-principal=$clientPrincipal")
-  def produceConsumePrefixedAclsArgs: Array[String] = 
Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$topicPrefix",
-                                          s"--group=$groupPrefix",
-                                          s"--resource-pattern-type=prefixed",
-                                          s"--consumer",
-                                          s"--producer",
-                                          
s"--allow-principal=$clientPrincipal")
-
   def ClusterActionAndClusterAlterAcls = Set(new 
AccessControlEntry(kafkaPrincipal.toString, WildcardHost, CLUSTER_ACTION, 
ALLOW),
     new AccessControlEntry(kafkaPrincipal.toString, WildcardHost, ALTER, 
ALLOW))
-  def TopicBrokerReadAcl = Set(new AccessControlEntry(kafkaPrincipal.toString, 
WildcardHost, READ, ALLOW))
   def GroupReadAcl = Set(new AccessControlEntry(clientPrincipal.toString, 
WildcardHost, READ, ALLOW))
   def TopicReadAcl = Set(new AccessControlEntry(clientPrincipal.toString, 
WildcardHost, READ, ALLOW))
   def TopicWriteAcl = Set(new AccessControlEntry(clientPrincipal.toString, 
WildcardHost, WRITE, ALLOW))
   def TopicDescribeAcl = Set(new AccessControlEntry(clientPrincipal.toString, 
WildcardHost, DESCRIBE, ALLOW))
   def TopicCreateAcl = Set(new AccessControlEntry(clientPrincipal.toString, 
WildcardHost, CREATE, ALLOW))
-  // The next two configuration parameters enable ZooKeeper secure ACLs
-  // and sets the Kafka authorizer, both necessary to enable security.
-  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
-  this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, 
authorizerClass.getName)
+
+  def AclClusterAction = new AclBinding(clusterResource,
+    new AccessControlEntry(kafkaPrincipal.toString, "*", 
AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW))
+  def AclAlter = new AclBinding(clusterResource,
+    new AccessControlEntry(kafkaPrincipal.toString, "*", AclOperation.ALTER, 
AclPermissionType.ALLOW))
+
+  def AclTopicWrite(tptopicresource : ResourcePattern = topicResource) = new 
AclBinding(tptopicresource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.WRITE, 
AclPermissionType.ALLOW))
+  def AclTopicCreate(tptopicresource : ResourcePattern = topicResource) = new 
AclBinding(tptopicresource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.CREATE, 
AclPermissionType.ALLOW))
+  def AclTopicDescribe(tptopicresource : ResourcePattern = topicResource) = 
new AclBinding(tptopicresource,
+    new AccessControlEntry(clientPrincipal.toString, "*", 
AclOperation.DESCRIBE, AclPermissionType.ALLOW))
+  def AclTopicRead(tptopicresource : ResourcePattern = topicResource) = new 
AclBinding(tptopicresource,

Review Comment:
   s/tptopicresource/topicResource/g



##########
core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala:
##########
@@ -96,96 +90,53 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
   def clientPrincipal: KafkaPrincipal
   def kafkaPrincipal: KafkaPrincipal
 
-  // Arguments to AclCommand to set ACLs.
-  def clusterActionArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--cluster",
-                                          s"--operation=ClusterAction",
-                                          s"--allow-principal=$kafkaPrincipal")
-  // necessary to create SCRAM credentials via the admin client using the 
broker's credentials
-  // without this we would need to create the SCRAM credentials via ZooKeeper
-  def clusterAlterArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--cluster",
-                                          s"--operation=Alter",
-                                          s"--allow-principal=$kafkaPrincipal")
-  def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$wildcard",
-                                          s"--operation=Read",
-                                          s"--allow-principal=$kafkaPrincipal")
-  def produceAclArgs(topic: String): Array[String] = 
Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$topic",
-                                          s"--producer",
-                                          
s"--allow-principal=$clientPrincipal")
-  def describeAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$topic",
-                                          s"--operation=Describe",
-                                          
s"--allow-principal=$clientPrincipal")
-  def deleteDescribeAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--remove",
-                                          s"--force",
-                                          s"--topic=$topic",
-                                          s"--operation=Describe",
-                                          
s"--allow-principal=$clientPrincipal")
-  def deleteWriteAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--remove",
-                                          s"--force",
-                                          s"--topic=$topic",
-                                          s"--operation=Write",
-                                          
s"--allow-principal=$clientPrincipal")
-  def consumeAclArgs(topic: String): Array[String] = 
Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$topic",
-                                          s"--group=$group",
-                                          s"--consumer",
-                                          
s"--allow-principal=$clientPrincipal")
-  def groupAclArgs: Array[String] = Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--group=$group",
-                                          s"--operation=Read",
-                                          
s"--allow-principal=$clientPrincipal")
-  def produceConsumeWildcardAclArgs: Array[String] = 
Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$wildcard",
-                                          s"--group=$wildcard",
-                                          s"--consumer",
-                                          s"--producer",
-                                          
s"--allow-principal=$clientPrincipal")
-  def produceConsumePrefixedAclsArgs: Array[String] = 
Array("--authorizer-properties",
-                                          s"zookeeper.connect=$zkConnect",
-                                          s"--add",
-                                          s"--topic=$topicPrefix",
-                                          s"--group=$groupPrefix",
-                                          s"--resource-pattern-type=prefixed",
-                                          s"--consumer",
-                                          s"--producer",
-                                          
s"--allow-principal=$clientPrincipal")
-
   def ClusterActionAndClusterAlterAcls = Set(new 
AccessControlEntry(kafkaPrincipal.toString, WildcardHost, CLUSTER_ACTION, 
ALLOW),
     new AccessControlEntry(kafkaPrincipal.toString, WildcardHost, ALTER, 
ALLOW))
-  def TopicBrokerReadAcl = Set(new AccessControlEntry(kafkaPrincipal.toString, 
WildcardHost, READ, ALLOW))
   def GroupReadAcl = Set(new AccessControlEntry(clientPrincipal.toString, 
WildcardHost, READ, ALLOW))
   def TopicReadAcl = Set(new AccessControlEntry(clientPrincipal.toString, 
WildcardHost, READ, ALLOW))
   def TopicWriteAcl = Set(new AccessControlEntry(clientPrincipal.toString, 
WildcardHost, WRITE, ALLOW))
   def TopicDescribeAcl = Set(new AccessControlEntry(clientPrincipal.toString, 
WildcardHost, DESCRIBE, ALLOW))
   def TopicCreateAcl = Set(new AccessControlEntry(clientPrincipal.toString, 
WildcardHost, CREATE, ALLOW))
-  // The next two configuration parameters enable ZooKeeper secure ACLs
-  // and sets the Kafka authorizer, both necessary to enable security.
-  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
-  this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, 
authorizerClass.getName)
+
+  def AclClusterAction = new AclBinding(clusterResource,
+    new AccessControlEntry(kafkaPrincipal.toString, "*", 
AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW))
+  def AclAlter = new AclBinding(clusterResource,
+    new AccessControlEntry(kafkaPrincipal.toString, "*", AclOperation.ALTER, 
AclPermissionType.ALLOW))
+
+  def AclTopicWrite(tptopicresource : ResourcePattern = topicResource) = new 
AclBinding(tptopicresource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.WRITE, 
AclPermissionType.ALLOW))
+  def AclTopicCreate(tptopicresource : ResourcePattern = topicResource) = new 
AclBinding(tptopicresource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.CREATE, 
AclPermissionType.ALLOW))
+  def AclTopicDescribe(tptopicresource : ResourcePattern = topicResource) = 
new AclBinding(tptopicresource,
+    new AccessControlEntry(clientPrincipal.toString, "*", 
AclOperation.DESCRIBE, AclPermissionType.ALLOW))
+  def AclTopicRead(tptopicresource : ResourcePattern = topicResource) = new 
AclBinding(tptopicresource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, 
AclPermissionType.ALLOW))
+  def AclGroupRead = new AclBinding(groupResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, 
AclPermissionType.ALLOW))
+
+  def AclWildcardTopicWrite = new AclBinding(wildcardTopicResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.WRITE, 
AclPermissionType.ALLOW))
+  def AclWildcardTopicCreate = new AclBinding(wildcardTopicResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.CREATE, 
AclPermissionType.ALLOW))
+  def AclWildcardTopicDescribe = new AclBinding(wildcardTopicResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", 
AclOperation.DESCRIBE, AclPermissionType.ALLOW))
+  def AclWildcardTopicRead = new AclBinding(wildcardTopicResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, 
AclPermissionType.ALLOW))
+  def AclWildcardGroupRead = new AclBinding(wildcardGroupResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, 
AclPermissionType.ALLOW))
+
+  def AclPrefixedTopicWrite = new AclBinding(prefixedTopicResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.WRITE, 
AclPermissionType.ALLOW))
+  def AclPrefixedTopicCreate = new AclBinding(prefixedTopicResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.CREATE, 
AclPermissionType.ALLOW))
+  def AclPrefixedTopicDescribe = new AclBinding(prefixedTopicResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", 
AclOperation.DESCRIBE, AclPermissionType.ALLOW))
+  def AclPrefixedTopicRead = new AclBinding(prefixedTopicResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, 
AclPermissionType.ALLOW))
+  def AclPrefixedGroupRead = new AclBinding(prefixedGroupResource,
+    new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, 
AclPermissionType.ALLOW))
+
+

Review Comment:
   Extra line



##########
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala:
##########
@@ -170,6 +187,19 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
     admin
   }
 
+

Review Comment:
   Extra line



##########
core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala:
##########
@@ -292,24 +255,35 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
   }
 
   private def setWildcardResourceAcls(): Unit = {
-    AclCommand.main(produceConsumeWildcardAclArgs)
+    val superuserAdminClient = createSuperuserAdminClient()
+    superuserAdminClient.createAcls(List(AclWildcardTopicWrite, 
AclWildcardTopicCreate, AclWildcardTopicDescribe, 
AclWildcardTopicRead).asJava).values
+    superuserAdminClient.createAcls(List(AclWildcardGroupRead).asJava).values
+
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ 
TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, 
s.dataPlaneRequestProcessor.authorizer.get, wildcardTopicResource)
+      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ 
TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, 
wildcardTopicResource)
       TestUtils.waitAndVerifyAcls(GroupReadAcl, 
s.dataPlaneRequestProcessor.authorizer.get, wildcardGroupResource)
     }
   }
 
   private def setPrefixedResourceAcls(): Unit = {
-    AclCommand.main(produceConsumePrefixedAclsArgs)
+    val superuserAdminClient = createSuperuserAdminClient()
+    superuserAdminClient.createAcls(List(AclPrefixedTopicWrite, 
AclPrefixedTopicCreate, AclPrefixedTopicDescribe, 
AclPrefixedTopicRead).asJava).values
+    superuserAdminClient.createAcls(List(AclPrefixedGroupRead).asJava).values
+
     servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ 
TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, 
prefixedTopicResource)
       TestUtils.waitAndVerifyAcls(GroupReadAcl, 
s.dataPlaneRequestProcessor.authorizer.get, prefixedGroupResource)
     }
   }
 
   private def setReadAndWriteAcls(tp: TopicPartition): Unit = {
-    AclCommand.main(produceAclArgs(tp.topic))
-    AclCommand.main(consumeAclArgs(tp.topic))
+    val tptopicresource = new ResourcePattern(TOPIC, tp.topic, LITERAL)
+    val superuserAdminClient = createSuperuserAdminClient()
+
+    superuserAdminClient.createAcls(List(AclTopicWrite(tptopicresource), 
AclTopicCreate(tptopicresource), 
AclTopicDescribe(tptopicresource)).asJava).values
+    
superuserAdminClient.createAcls(List(AclTopicRead(tptopicresource)).asJava).values

Review Comment:
   s/tptopicresource/topicResource/g



##########
core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala:
##########
@@ -76,6 +77,15 @@ class PlaintextEndToEndAuthorizationTest extends 
EndToEndAuthorizationTest {
     super.setUp(testInfo)
   }
 
+  /*
+   * The principal used for all authenticated connections to listernerName is 
always clientPrincipal.

Review Comment:
   s/listernerName/listenerName



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to