Repository: kafka Updated Branches: refs/heads/0.9.0 2f51ab23a -> 81c89e91f
KAFKA-2863; Add a `close()` method to `Authorizer` Author: Ismael Juma <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #568 from ijuma/kafka-2863-authorizer-close (cherry picked from commit b1d17e7ef0e901234d95d7825f6862d2aaead76f) Signed-off-by: Jun Rao <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/81c89e91 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/81c89e91 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/81c89e91 Branch: refs/heads/0.9.0 Commit: 81c89e91f9dc5d6d5086d5b6dcd1eae2941718c2 Parents: 2f51ab2 Author: Ismael Juma <[email protected]> Authored: Fri Nov 20 10:43:25 2015 -0800 Committer: Jun Rao <[email protected]> Committed: Fri Nov 20 10:43:33 2015 -0800 ---------------------------------------------------------------------- .../src/main/scala/kafka/admin/AclCommand.scala | 70 +++++++++++--------- .../scala/kafka/security/auth/Authorizer.scala | 6 ++ .../security/auth/SimpleAclAuthorizer.scala | 6 +- .../main/scala/kafka/server/KafkaServer.scala | 10 +-- .../scala/unit/kafka/admin/AclCommandTest.scala | 21 ++++-- .../security/auth/SimpleAclAuthorizerTest.scala | 7 +- 6 files changed, 73 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/81c89e91/core/src/main/scala/kafka/admin/AclCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 6ec0cf8..505be5a 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -58,7 +58,7 @@ object AclCommand { } } - def getAuthorizer(opts: AclCommandOptions): Authorizer = { + def withAuthorizer(opts: AclCommandOptions)(f: Authorizer => Unit) { var authorizerProperties = Map.empty[String, Any] if (opts.options.has(opts.authorizerPropertiesOpt)) { val props = opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala.map(_.split("=")) @@ -66,55 +66,59 @@ object AclCommand { } val authorizerClass = opts.options.valueOf(opts.authorizerOpt) - val authZ: Authorizer = CoreUtils.createObject(authorizerClass) + val authZ = CoreUtils.createObject[Authorizer](authorizerClass) authZ.configure(authorizerProperties.asJava) - authZ + try f(authZ) + finally CoreUtils.swallow(authZ.close()) } private def addAcl(opts: AclCommandOptions) { - val authZ: Authorizer = getAuthorizer(opts) - val resourceToAcl = getResourceToAcls(opts) + withAuthorizer(opts) { authorizer => + val resourceToAcl = getResourceToAcls(opts) - if (resourceToAcl.values.exists(_.isEmpty)) - CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add acls.") + if (resourceToAcl.values.exists(_.isEmpty)) + CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add acls.") - for ((resource, acls) <- resourceToAcl) { - val acls = resourceToAcl(resource) - println(s"Adding following acls for resource: $resource $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") - authZ.addAcls(acls, resource) - } + for ((resource, acls) <- resourceToAcl) { + val acls = resourceToAcl(resource) + println(s"Adding following acls for resource: $resource $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") + authorizer.addAcls(acls, resource) + } - listAcl(opts) + listAcl(opts) + } } private def removeAcl(opts: AclCommandOptions) { - val authZ: Authorizer = getAuthorizer(opts) - val resourceToAcl = getResourceToAcls(opts) - - for ((resource, acls) <- resourceToAcl) { - if (acls.isEmpty) { - if (confirmAction(s"Are you sure you want to delete all acls for resource: $resource y/n?")) - authZ.removeAcls(resource) - } else { - if (confirmAction(s"Are you sure you want to remove acls: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource $resource y/n?")) - authZ.removeAcls(acls, resource) + withAuthorizer(opts) { authorizer => + val resourceToAcl = getResourceToAcls(opts) + + for ((resource, acls) <- resourceToAcl) { + if (acls.isEmpty) { + if (confirmAction(s"Are you sure you want to delete all acls for resource: $resource y/n?")) + authorizer.removeAcls(resource) + } else { + if (confirmAction(s"Are you sure you want to remove acls: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource $resource y/n?")) + authorizer.removeAcls(acls, resource) + } } - } - listAcl(opts) + listAcl(opts) + } } private def listAcl(opts: AclCommandOptions) { - val authZ = getAuthorizer(opts) - val resources = getResource(opts, dieIfNoResourceFound = false) + withAuthorizer(opts) { authorizer => + val resources = getResource(opts, dieIfNoResourceFound = false) - val resourceToAcls = if(resources.isEmpty) - authZ.getAcls() - else - resources.map(resource => (resource -> authZ.getAcls(resource))) + val resourceToAcls = if (resources.isEmpty) + authorizer.getAcls() + else + resources.map(resource => (resource -> authorizer.getAcls(resource))) - for ((resource, acls) <- resourceToAcls) - println(s"Following is list of acls for resource: $resource $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") + for ((resource, acls) <- resourceToAcls) + println(s"Following is list of acls for resource: $resource $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") + } } private def getResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = { http://git-wip-us.apache.org/repos/asf/kafka/blob/81c89e91/core/src/main/scala/kafka/security/auth/Authorizer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/Authorizer.scala b/core/src/main/scala/kafka/security/auth/Authorizer.scala index 939ed12..4c708b2 100644 --- a/core/src/main/scala/kafka/security/auth/Authorizer.scala +++ b/core/src/main/scala/kafka/security/auth/Authorizer.scala @@ -81,5 +81,11 @@ trait Authorizer extends Configurable { * gets the map of resource to acls for all resources. */ def getAcls(): Map[Resource, Set[Acl]] + + /** + * Closes this instance. + */ + def close(): Unit + } http://git-wip-us.apache.org/repos/asf/kafka/blob/81c89e91/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index cae8f2a..d0d226c 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -109,7 +109,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging { } override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = { - val principal: KafkaPrincipal = session.principal + val principal = session.principal val host = session.clientAddress.getHostAddress val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource)) @@ -226,6 +226,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging { aclCache.toMap } + def close() { + if (zkUtils != null) zkUtils.close() + } + private def loadCache() { var acls = Set.empty[Acl] val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath) http://git-wip-us.apache.org/repos/asf/kafka/blob/81c89e91/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index e8ea204..9eedbe2 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -109,6 +109,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr val brokerState: BrokerState = new BrokerState var apis: KafkaApis = null + var authorizer: Option[Authorizer] = None var socketServer: SocketServer = null var requestHandlerPool: KafkaRequestHandlerPool = null @@ -191,12 +192,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr consumerCoordinator.startup() /* Get the authorizer and initialize it if one is specified.*/ - val authorizer: Option[Authorizer] = if (config.authorizerClassName != null && !config.authorizerClassName.isEmpty) { - val authZ: Authorizer = CoreUtils.createObject(config.authorizerClassName) + authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName => + val authZ = CoreUtils.createObject[Authorizer](authorizerClassName) authZ.configure(config.originals()) - Option(authZ) - } else { - None + authZ } /* start processing requests */ @@ -533,6 +532,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr CoreUtils.swallow(kafkaScheduler.shutdown()) if(apis != null) CoreUtils.swallow(apis.close()) + CoreUtils.swallow(authorizer.foreach(_.close())) if(replicaManager != null) CoreUtils.swallow(replicaManager.shutdown()) if(logManager != null) http://git-wip-us.apache.org/repos/asf/kafka/blob/81c89e91/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala index 0bb950d..65393d8 100644 --- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala @@ -78,7 +78,9 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1) AclCommand.main(args ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add") for (resource <- resources) { - TestUtils.waitAndVerifyAcls(acls, getAuthorizer(brokerProps), resource) + withAuthorizer(brokerProps) { authorizer => + TestUtils.waitAndVerifyAcls(acls, authorizer, resource) + } } testRemove(resources, resourceCmd, args, brokerProps) @@ -97,7 +99,9 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { AclCommand.main(args ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ "--add") for ((resources, acls) <- resourcesToAcls) { for (resource <- resources) { - TestUtils.waitAndVerifyAcls(acls, getAuthorizer(brokerProps), resource) + withAuthorizer(brokerProps) { authorizer => + TestUtils.waitAndVerifyAcls(acls, authorizer, resource) + } } } testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand, args, brokerProps) @@ -108,7 +112,9 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { for (resource <- resources) { Console.withIn(new StringReader(s"y${AclCommand.Newline}" * resources.size)) { AclCommand.main(args ++ resourceCmd :+ "--remove") - TestUtils.waitAndVerifyAcls(Set.empty[Acl], getAuthorizer(brokerProps), resource) + withAuthorizer(brokerProps) { authorizer => + TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, resource) + } } } } @@ -124,11 +130,12 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { Users.foldLeft(cmd) ((cmd, user) => cmd ++ Array(principalCmd, user.toString)) } - def getAuthorizer(props: Properties): Authorizer = { + def withAuthorizer(props: Properties)(f: Authorizer => Unit) { val kafkaConfig = KafkaConfig.fromProps(props) val authZ = new SimpleAclAuthorizer - authZ.configure(kafkaConfig.originals) - - authZ + try { + authZ.configure(kafkaConfig.originals) + f(authZ) + } finally authZ.close() } } http://git-wip-us.apache.org/repos/asf/kafka/blob/81c89e91/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala index a4f61df..efcf930 100644 --- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala @@ -26,7 +26,7 @@ import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.security.auth.KafkaPrincipal import org.junit.Assert._ -import org.junit.{Before, Test} +import org.junit.{After, Before, Test} class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { @@ -51,6 +51,11 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { resource = new Resource(Topic, UUID.randomUUID().toString) } + @After + override def tearDown(): Unit = { + simpleAclAuthorizer.close() + } + @Test def testTopicAcl() { val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
