Repository: kafka Updated Branches: refs/heads/trunk e11946b09 -> b905d4891
KAFKA-2949; Make EndToEndAuthorizationTest replicated. Author: Flavio Junqueira <[email protected]> Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]> Closes #631 from fpj/KAFKA-2949 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b905d489 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b905d489 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b905d489 Branch: refs/heads/trunk Commit: b905d489188768ba1c55226857db9713b9272918 Parents: e11946b Author: Flavio Junqueira <[email protected]> Authored: Sun Jan 3 15:25:59 2016 -0800 Committer: Jun Rao <[email protected]> Committed: Sun Jan 3 15:25:59 2016 -0800 ---------------------------------------------------------------------- .../kafka/api/EndToEndAuthorizationTest.scala | 40 +++++++++++++++----- .../kafka/api/IntegrationTestHarness.scala | 3 +- 2 files changed, 31 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b905d489/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 59cff14..f14149f 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -65,12 +65,15 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { override val serverCount = 3 override val setClusterAcl = Some(() => { AclCommand.main(clusterAclArgs) - TestUtils.waitAndVerifyAcls(ClusterActionAcl, servers.head.apis.authorizer.get, clusterResource) + servers.foreach( s => + TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, clusterResource) + ) } : Unit ) val numRecords = 1 val group = "group" val topic = "e2etopic" + val topicWildcard = "*" val part = 0 val tp = new TopicPartition(topic, part) val topicAndPartition = new TopicAndPartition(topic, part) @@ -93,6 +96,12 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { s"--cluster", s"--operation=ClusterAction", s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal") + def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--add", + s"--topic=$topicWildcard", + s"--operation=Read", + s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal") def produceAclArgs: Array[String] = Array("--authorizer-properties", s"zookeeper.connect=$zkConnect", s"--add", @@ -106,13 +115,14 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { s"--group=$group", s"--consumer", s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") - def groupAclArgs: Array[String] = Array("--authorizer-properties", + def groupAclArgs: Array[String] = Array("--authorizer-properties", s"zookeeper.connect=$zkConnect", s"--add", s"--group=$group", s"--operation=Read", s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") - def ClusterActionAcl:Set[Acl] = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, ClusterAction)) + def ClusterActionAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, ClusterAction)) + def TopicBrokerReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, Read)) def GroupReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Read)) def TopicReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Read)) def TopicWriteAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Write)) @@ -124,7 +134,7 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { // Some needed configuration for brokers, producers, and consumers this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") - this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1") + this.serverConfig.setProperty(KafkaConfig.MinInSyncReplicasProp, "3") this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group") /** @@ -139,8 +149,12 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { startSasl(Both) } super.setUp + AclCommand.main(topicBrokerReadAclArgs) + servers.foreach( s => + TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*")) + ) // create the test topic with all the brokers as replicas - TestUtils.createTopic(zkUtils, topic, 1, 1, this.servers) + TestUtils.createTopic(zkUtils, topic, 1, 3, this.servers) } /** @@ -159,8 +173,10 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { def testProduceConsume { AclCommand.main(produceAclArgs) AclCommand.main(consumeAclArgs) - TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl, servers.head.apis.authorizer.get, topicResource) - TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource) + servers.foreach(s => { + TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) + }) //Produce records debug("Starting to send records") sendRecords(numRecords, tp) @@ -195,8 +211,10 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { def testNoConsumeAcl { AclCommand.main(produceAclArgs) AclCommand.main(groupAclArgs) - TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, servers.head.apis.authorizer.get, topicResource) - TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource) + servers.foreach(s => { + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource) + }) //Produce records debug("Starting to send records") sendRecords(numRecords, tp) @@ -218,7 +236,9 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { @Test def testNoGroupAcl { AclCommand.main(produceAclArgs) - TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, servers.head.apis.authorizer.get, topicResource) + servers.foreach(s => + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) + ) //Produce records debug("Starting to send records") sendRecords(numRecords, tp) http://git-wip-us.apache.org/repos/asf/kafka/blob/b905d489/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 7aaa185..b4f31c4 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -63,8 +63,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer]) consumerConfig.putAll(consumerSecurityProps) for (i <- 0 until producerCount) - producers += TestUtils.createNewProducer(brokerList, - acks = 1, + producers += TestUtils.createNewProducer(brokerList, securityProtocol = this.securityProtocol, trustStoreFile = this.trustStoreFile, props = Some(producerConfig))
