Repository: kafka Updated Branches: refs/heads/trunk 267952460 -> f9642e2a9
KAFKA-3052; Broker properties get logged twice if acl enabled Fix it by making it possible to pass the `doLog` parameter to `AbstractConfig`. As explained in the code comments, this means that we can continue to benefit from ZK default settings as specified in `KafkaConfig` without having to duplicate code. Also: * Removed unused private methods from `KafkaConfig` * Removed `case` modifier from `KafkaConfig` so that `hashCode`, `equals` and `toString` from `AbstractConfig` are used. * Made `props` a `val` and added `apply` method to `KafkaConfig` to remain binary compatible. * Call authorizer.close even if an exception is thrown during `configure`. Author: Ismael Juma <[email protected]> Reviewers: Guozhang Wang Closes #725 from ijuma/kafka-3052-broker-properties-get-logged-twice-if-acl-enabled Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f9642e2a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f9642e2a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f9642e2a Branch: refs/heads/trunk Commit: f9642e2a9878faff81366dbc885a206727bd7c7b Parents: 2679524 Author: Ismael Juma <[email protected]> Authored: Mon Jan 4 08:51:30 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Mon Jan 4 08:51:30 2016 -0800 ---------------------------------------------------------------------- .../src/main/scala/kafka/admin/AclCommand.scala | 6 ++- .../security/auth/SimpleAclAuthorizer.scala | 15 ++++--- .../main/scala/kafka/server/KafkaConfig.scala | 45 ++++++-------------- .../scala/unit/kafka/admin/AclCommandTest.scala | 2 +- 4 files changed, 28 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f9642e2a/core/src/main/scala/kafka/admin/AclCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 505be5a..841b278 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -67,8 +67,10 @@ object AclCommand { val authorizerClass = opts.options.valueOf(opts.authorizerOpt) val authZ = CoreUtils.createObject[Authorizer](authorizerClass) - authZ.configure(authorizerProperties.asJava) - try f(authZ) + try { + authZ.configure(authorizerProperties.asJava) + f(authZ) + } finally CoreUtils.swallow(authZ.close()) } http://git-wip-us.apache.org/repos/asf/kafka/blob/f9642e2a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index d0d226c..780bdf3 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -80,18 +80,21 @@ class SimpleAclAuthorizer extends Authorizer with Logging { override def configure(javaConfigs: util.Map[String, _]) { val configs = javaConfigs.asScala val props = new java.util.Properties() - configs foreach { case (key, value) => props.put(key, value.toString) } - val kafkaConfig = KafkaConfig.fromProps(props) + configs.foreach { case (key, value) => props.put(key, value.toString) } superUsers = configs.get(SimpleAclAuthorizer.SuperUsersProp).collect { case str: String if str.nonEmpty => str.split(";").map(s => KafkaPrincipal.fromString(s.trim)).toSet }.getOrElse(Set.empty[KafkaPrincipal]) - shouldAllowEveryoneIfNoAclIsFound = configs.get(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).map(_.toString.toBoolean).getOrElse(false) + shouldAllowEveryoneIfNoAclIsFound = configs.get(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) - val zkUrl = configs.getOrElse(SimpleAclAuthorizer.ZkUrlProp, kafkaConfig.zkConnect).toString - val zkConnectionTimeoutMs = configs.getOrElse(SimpleAclAuthorizer.ZkConnectionTimeOutProp, kafkaConfig.zkConnectionTimeoutMs).toString.toInt - val zkSessionTimeOutMs = configs.getOrElse(SimpleAclAuthorizer.ZkSessionTimeOutProp, kafkaConfig.zkSessionTimeoutMs).toString.toInt + // Use `KafkaConfig` in order to get the default ZK config values if not present in `javaConfigs`. Note that this + // means that `KafkaConfig.zkConnect` must always be set by the user (even if `SimpleAclAuthorizer.ZkUrlProp` is also + // set). + val kafkaConfig = KafkaConfig.fromProps(props, doLog = false) + val zkUrl = configs.get(SimpleAclAuthorizer.ZkUrlProp).map(_.toString).getOrElse(kafkaConfig.zkConnect) + val zkConnectionTimeoutMs = configs.get(SimpleAclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs) + val zkSessionTimeOutMs = configs.get(SimpleAclAuthorizer.ZkSessionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs) zkUtils = ZkUtils(zkUrl, zkConnectionTimeoutMs, http://git-wip-us.apache.org/repos/asf/kafka/blob/f9642e2a/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 856742f..9556799 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -17,7 +17,6 @@ package kafka.server -import java.util import java.util.Properties import kafka.api.ApiVersion @@ -32,7 +31,6 @@ import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SslConfigs} import org.apache.kafka.common.metrics.MetricsReporter import org.apache.kafka.common.protocol.SecurityProtocol -import org.apache.kafka.common.security.auth.PrincipalBuilder import scala.collection.{Map, immutable} @@ -687,19 +685,27 @@ object KafkaConfig { require(names.contains(name), "Unknown configuration \"%s\".".format(name)) } - def fromProps(props: Properties): KafkaConfig = { - KafkaConfig(props) - } + def fromProps(props: Properties): KafkaConfig = + fromProps(props, true) + + def fromProps(props: Properties, doLog: Boolean): KafkaConfig = + new KafkaConfig(props, doLog) - def fromProps(defaults: Properties, overrides: Properties): KafkaConfig = { + def fromProps(defaults: Properties, overrides: Properties): KafkaConfig = + fromProps(defaults, overrides, true) + + def fromProps(defaults: Properties, overrides: Properties, doLog: Boolean): KafkaConfig = { val props = new Properties() props.putAll(defaults) props.putAll(overrides) - fromProps(props) + fromProps(props, doLog) } + + def apply(props: java.util.Map[_, _]): KafkaConfig = new KafkaConfig(props, true) + } -case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(KafkaConfig.configDef, props) { +class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends AbstractConfig(KafkaConfig.configDef, props, doLog) { /** ********* Zookeeper Configuration ***********/ val zkConnect: String = getString(KafkaConfig.ZkConnectProp) @@ -916,29 +922,6 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka } } - private def getMetricClasses(metricClasses: java.util.List[String]): java.util.List[MetricsReporter] = { - - val reporterList = new util.ArrayList[MetricsReporter]() - val iterator = metricClasses.iterator() - - while (iterator.hasNext) { - val reporterName = iterator.next() - if (!reporterName.isEmpty) { - val reporter: MetricsReporter = CoreUtils.createObject[MetricsReporter](reporterName) - reporter.configure(originals) - reporterList.add(reporter) - } - } - - reporterList - - } - - - private def getPrincipalBuilderClass(principalBuilderClass: String): PrincipalBuilder = { - CoreUtils.createObject[PrincipalBuilder](principalBuilderClass) - } - validateValues() private def validateValues() { http://git-wip-us.apache.org/repos/asf/kafka/blob/f9642e2a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala index 65393d8..9802811 100644 --- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala @@ -131,7 +131,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { } def withAuthorizer(props: Properties)(f: Authorizer => Unit) { - val kafkaConfig = KafkaConfig.fromProps(props) + val kafkaConfig = KafkaConfig.fromProps(props, doLog = false) val authZ = new SimpleAclAuthorizer try { authZ.configure(kafkaConfig.originals)
