soarez commented on code in PR #12843: URL: https://github.com/apache/kafka/pull/12843#discussion_r1020949539
########## core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala: ########## @@ -96,96 +90,53 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas def clientPrincipal: KafkaPrincipal def kafkaPrincipal: KafkaPrincipal - // Arguments to AclCommand to set ACLs. - def clusterActionArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--cluster", - s"--operation=ClusterAction", - s"--allow-principal=$kafkaPrincipal") - // necessary to create SCRAM credentials via the admin client using the broker's credentials - // without this we would need to create the SCRAM credentials via ZooKeeper - def clusterAlterArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--cluster", - s"--operation=Alter", - s"--allow-principal=$kafkaPrincipal") - def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$wildcard", - s"--operation=Read", - s"--allow-principal=$kafkaPrincipal") - def produceAclArgs(topic: String): Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topic", - s"--producer", - s"--allow-principal=$clientPrincipal") - def describeAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topic", - s"--operation=Describe", - s"--allow-principal=$clientPrincipal") - def deleteDescribeAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--remove", - s"--force", - s"--topic=$topic", - s"--operation=Describe", - s"--allow-principal=$clientPrincipal") - def deleteWriteAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--remove", - s"--force", - s"--topic=$topic", - s"--operation=Write", - s"--allow-principal=$clientPrincipal") - def 
consumeAclArgs(topic: String): Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topic", - s"--group=$group", - s"--consumer", - s"--allow-principal=$clientPrincipal") - def groupAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--group=$group", - s"--operation=Read", - s"--allow-principal=$clientPrincipal") - def produceConsumeWildcardAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$wildcard", - s"--group=$wildcard", - s"--consumer", - s"--producer", - s"--allow-principal=$clientPrincipal") - def produceConsumePrefixedAclsArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topicPrefix", - s"--group=$groupPrefix", - s"--resource-pattern-type=prefixed", - s"--consumer", - s"--producer", - s"--allow-principal=$clientPrincipal") - def ClusterActionAndClusterAlterAcls = Set(new AccessControlEntry(kafkaPrincipal.toString, WildcardHost, CLUSTER_ACTION, ALLOW), new AccessControlEntry(kafkaPrincipal.toString, WildcardHost, ALTER, ALLOW)) - def TopicBrokerReadAcl = Set(new AccessControlEntry(kafkaPrincipal.toString, WildcardHost, READ, ALLOW)) def GroupReadAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, READ, ALLOW)) def TopicReadAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, READ, ALLOW)) def TopicWriteAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, WRITE, ALLOW)) def TopicDescribeAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, DESCRIBE, ALLOW)) def TopicCreateAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, CREATE, ALLOW)) - // The next two configuration parameters enable ZooKeeper secure ACLs - // and sets the Kafka authorizer, both necessary to enable security. 
- this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") - this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, authorizerClass.getName) + + def AclClusterAction = new AclBinding(clusterResource, + new AccessControlEntry(kafkaPrincipal.toString, "*", AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW)) + def AclAlter = new AclBinding(clusterResource, + new AccessControlEntry(kafkaPrincipal.toString, "*", AclOperation.ALTER, AclPermissionType.ALLOW)) + + def AclTopicWrite(tptopicresource : ResourcePattern = topicResource) = new AclBinding(tptopicresource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.ALLOW)) + def AclTopicCreate(tptopicresource : ResourcePattern = topicResource) = new AclBinding(tptopicresource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.CREATE, AclPermissionType.ALLOW)) + def AclTopicDescribe(tptopicresource : ResourcePattern = topicResource) = new AclBinding(tptopicresource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) + def AclTopicRead(tptopicresource : ResourcePattern = topicResource) = new AclBinding(tptopicresource, Review Comment: s/tptopicresource/topicResource/g ########## core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala: ########## @@ -96,96 +90,53 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas def clientPrincipal: KafkaPrincipal def kafkaPrincipal: KafkaPrincipal - // Arguments to AclCommand to set ACLs. 
- def clusterActionArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--cluster", - s"--operation=ClusterAction", - s"--allow-principal=$kafkaPrincipal") - // necessary to create SCRAM credentials via the admin client using the broker's credentials - // without this we would need to create the SCRAM credentials via ZooKeeper - def clusterAlterArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--cluster", - s"--operation=Alter", - s"--allow-principal=$kafkaPrincipal") - def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$wildcard", - s"--operation=Read", - s"--allow-principal=$kafkaPrincipal") - def produceAclArgs(topic: String): Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topic", - s"--producer", - s"--allow-principal=$clientPrincipal") - def describeAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topic", - s"--operation=Describe", - s"--allow-principal=$clientPrincipal") - def deleteDescribeAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--remove", - s"--force", - s"--topic=$topic", - s"--operation=Describe", - s"--allow-principal=$clientPrincipal") - def deleteWriteAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--remove", - s"--force", - s"--topic=$topic", - s"--operation=Write", - s"--allow-principal=$clientPrincipal") - def consumeAclArgs(topic: String): Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topic", - s"--group=$group", - s"--consumer", - s"--allow-principal=$clientPrincipal") - def groupAclArgs: Array[String] = Array("--authorizer-properties", - 
s"zookeeper.connect=$zkConnect", - s"--add", - s"--group=$group", - s"--operation=Read", - s"--allow-principal=$clientPrincipal") - def produceConsumeWildcardAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$wildcard", - s"--group=$wildcard", - s"--consumer", - s"--producer", - s"--allow-principal=$clientPrincipal") - def produceConsumePrefixedAclsArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topicPrefix", - s"--group=$groupPrefix", - s"--resource-pattern-type=prefixed", - s"--consumer", - s"--producer", - s"--allow-principal=$clientPrincipal") - def ClusterActionAndClusterAlterAcls = Set(new AccessControlEntry(kafkaPrincipal.toString, WildcardHost, CLUSTER_ACTION, ALLOW), new AccessControlEntry(kafkaPrincipal.toString, WildcardHost, ALTER, ALLOW)) - def TopicBrokerReadAcl = Set(new AccessControlEntry(kafkaPrincipal.toString, WildcardHost, READ, ALLOW)) def GroupReadAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, READ, ALLOW)) def TopicReadAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, READ, ALLOW)) def TopicWriteAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, WRITE, ALLOW)) def TopicDescribeAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, DESCRIBE, ALLOW)) def TopicCreateAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, CREATE, ALLOW)) - // The next two configuration parameters enable ZooKeeper secure ACLs - // and sets the Kafka authorizer, both necessary to enable security. 
- this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") - this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, authorizerClass.getName) + + def AclClusterAction = new AclBinding(clusterResource, + new AccessControlEntry(kafkaPrincipal.toString, "*", AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW)) + def AclAlter = new AclBinding(clusterResource, + new AccessControlEntry(kafkaPrincipal.toString, "*", AclOperation.ALTER, AclPermissionType.ALLOW)) + + def AclTopicWrite(tptopicresource : ResourcePattern = topicResource) = new AclBinding(tptopicresource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.ALLOW)) + def AclTopicCreate(tptopicresource : ResourcePattern = topicResource) = new AclBinding(tptopicresource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.CREATE, AclPermissionType.ALLOW)) + def AclTopicDescribe(tptopicresource : ResourcePattern = topicResource) = new AclBinding(tptopicresource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) + def AclTopicRead(tptopicresource : ResourcePattern = topicResource) = new AclBinding(tptopicresource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW)) + def AclGroupRead = new AclBinding(groupResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW)) + + def AclWildcardTopicWrite = new AclBinding(wildcardTopicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.ALLOW)) + def AclWildcardTopicCreate = new AclBinding(wildcardTopicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.CREATE, AclPermissionType.ALLOW)) + def AclWildcardTopicDescribe = new AclBinding(wildcardTopicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) + 
def AclWildcardTopicRead = new AclBinding(wildcardTopicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW)) + def AclWildcardGroupRead = new AclBinding(wildcardGroupResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW)) + + def AclPrefixedTopicWrite = new AclBinding(prefixedTopicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.ALLOW)) + def AclPrefixedTopicCreate = new AclBinding(prefixedTopicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.CREATE, AclPermissionType.ALLOW)) + def AclPrefixedTopicDescribe = new AclBinding(prefixedTopicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) + def AclPrefixedTopicRead = new AclBinding(prefixedTopicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW)) + def AclPrefixedGroupRead = new AclBinding(prefixedGroupResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW)) + + Review Comment: Extra line ########## core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala: ########## @@ -170,6 +187,19 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { admin } + Review Comment: Extra line ########## core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala: ########## @@ -292,24 +255,35 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } private def setWildcardResourceAcls(): Unit = { - AclCommand.main(produceConsumeWildcardAclArgs) + val superuserAdminClient = createSuperuserAdminClient() + superuserAdminClient.createAcls(List(AclWildcardTopicWrite, AclWildcardTopicCreate, AclWildcardTopicDescribe, AclWildcardTopicRead).asJava).values + 
superuserAdminClient.createAcls(List(AclWildcardGroupRead).asJava).values + servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardTopicResource) + TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardTopicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardGroupResource) } } private def setPrefixedResourceAcls(): Unit = { - AclCommand.main(produceConsumePrefixedAclsArgs) + val superuserAdminClient = createSuperuserAdminClient() + superuserAdminClient.createAcls(List(AclPrefixedTopicWrite, AclPrefixedTopicCreate, AclPrefixedTopicDescribe, AclPrefixedTopicRead).asJava).values + superuserAdminClient.createAcls(List(AclPrefixedGroupRead).asJava).values + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedTopicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedGroupResource) } } private def setReadAndWriteAcls(tp: TopicPartition): Unit = { - AclCommand.main(produceAclArgs(tp.topic)) - AclCommand.main(consumeAclArgs(tp.topic)) + val tptopicresource = new ResourcePattern(TOPIC, tp.topic, LITERAL) + val superuserAdminClient = createSuperuserAdminClient() + + superuserAdminClient.createAcls(List(AclTopicWrite(tptopicresource), AclTopicCreate(tptopicresource), AclTopicDescribe(tptopicresource)).asJava).values + superuserAdminClient.createAcls(List(AclTopicRead(tptopicresource)).asJava).values Review Comment: s/tptopicresource/topicResource/g ########## core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala: ########## @@ -76,6 +77,15 @@ class PlaintextEndToEndAuthorizationTest extends 
EndToEndAuthorizationTest { super.setUp(testInfo) } + /* + * The principal used for all authenticated connections to listernerName is always clientPrincipal. Review Comment: s/listernerName/listenerName -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org