Repository: kafka Updated Branches: refs/heads/0.10.0 9ef243310 -> ef676c15c
KAFKA-3475; Introduce our own `MiniKdc` This also fixes KAFKA-3453 and KAFKA-2866. Author: Ismael Juma <[email protected]> Reviewers: Gwen Shapira Closes #1155 from ijuma/kafka-3475-introduce-our-minikdc (cherry picked from commit 78d91dcd8805d850038df52718380a6f956abad7) Signed-off-by: Gwen Shapira <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef676c15 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef676c15 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef676c15 Branch: refs/heads/0.10.0 Commit: ef676c15c54840d4b8a8ecb54a879b86b2547ea1 Parents: 9ef2433 Author: Ismael Juma <[email protected]> Authored: Wed Mar 30 19:30:34 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Wed Mar 30 19:30:49 2016 -0700 ---------------------------------------------------------------------- build.gradle | 15 +- .../org/apache/kafka/common/utils/Utils.java | 2 +- core/src/main/scala/kafka/utils/CoreUtils.scala | 15 +- core/src/test/resources/minikdc-krb5.conf | 25 ++ core/src/test/resources/minikdc.ldiff | 47 ++ .../scala/integration/kafka/api/SaslSetup.scala | 10 +- .../scala/kafka/security/minikdc/MiniKdc.scala | 433 +++++++++++++++++++ .../integration/KafkaServerTestHarness.scala | 12 +- gradle/dependencies.gradle | 12 +- tests/kafkatest/services/security/minikdc.py | 12 +- 10 files changed, 557 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ef676c15/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 13a8b4e..d6f82a4 100644 --- a/build.gradle +++ b/build.gradle @@ -338,7 +338,17 @@ project(':core') { testCompile project(':clients').sourceSets.test.output testCompile libs.bcpkix testCompile libs.easymock - testCompile libs.hadoopMiniKdc + testCompile(libs.apacheda) { + exclude 
group: 'xml-apis', module: 'xml-apis' + } + testCompile libs.apachedsCoreApi + testCompile libs.apachedsInterceptorKerberos + testCompile libs.apachedsProtocolShared + testCompile libs.apachedsProtocolKerberos + testCompile libs.apachedsProtocolLdap + testCompile libs.apachedsLdifPartition + testCompile libs.apachedsMavibotPartition + testCompile libs.apachedsJdbmPartition testCompile libs.junit testCompile libs.scalaTest } @@ -368,6 +378,9 @@ project(':core') { duplicatesStrategy 'exclude' } + systemTestLibs { + dependsOn testJar + } task genProtocolErrorDocs(type: JavaExec) { classpath = sourceSets.main.runtimeClasspath http://git-wip-us.apache.org/repos/asf/kafka/blob/ef676c15/clients/src/main/java/org/apache/kafka/common/utils/Utils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 0167548..2a98822 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -476,7 +476,7 @@ public class Utils { * @param daemon Should the thread block JVM shutdown? 
* @return The unstarted thread */ - public static Thread newThread(String name, Runnable runnable, Boolean daemon) { + public static Thread newThread(String name, Runnable runnable, boolean daemon) { Thread thread = new Thread(runnable, name); thread.setDaemon(daemon); thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { http://git-wip-us.apache.org/repos/asf/kafka/blob/ef676c15/core/src/main/scala/kafka/utils/CoreUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala index fe2bebf..5b6c59f 100755 --- a/core/src/main/scala/kafka/utils/CoreUtils.scala +++ b/core/src/main/scala/kafka/utils/CoreUtils.scala @@ -57,13 +57,14 @@ object CoreUtils extends Logging { } /** - * Create a daemon thread - * @param name The name of the thread - * @param fun The function to execute in the thread - * @return The unstarted thread - */ - def daemonThread(name: String, fun: => Unit): Thread = - Utils.daemonThread(name, runnable(fun)) + * Create a thread + * @param name The name of the thread + * @param daemon Whether the thread should block JVM shutdown + * @param fun The function to execute in the thread + * @return The unstarted thread + */ + def newThread(name: String, daemon: Boolean)(fun: => Unit): Thread = + Utils.newThread(name, runnable(fun), daemon) /** * Do the given action and log any exceptions thrown without rethrowing them http://git-wip-us.apache.org/repos/asf/kafka/blob/ef676c15/core/src/test/resources/minikdc-krb5.conf ---------------------------------------------------------------------- diff --git a/core/src/test/resources/minikdc-krb5.conf b/core/src/test/resources/minikdc-krb5.conf new file mode 100644 index 0000000..0603404 --- /dev/null +++ b/core/src/test/resources/minikdc-krb5.conf @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license 
agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +[libdefaults] +default_realm = {0} +udp_preference_limit = 1 + +[realms] +{0} = '{' + kdc = {1}:{2} +'}' http://git-wip-us.apache.org/repos/asf/kafka/blob/ef676c15/core/src/test/resources/minikdc.ldiff ---------------------------------------------------------------------- diff --git a/core/src/test/resources/minikdc.ldiff b/core/src/test/resources/minikdc.ldiff new file mode 100644 index 0000000..75e4dfd --- /dev/null +++ b/core/src/test/resources/minikdc.ldiff @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +dn: ou=users,dc=${0},dc=${1} +objectClass: organizationalUnit +objectClass: top +ou: users + +dn: uid=krbtgt,ou=users,dc=${0},dc=${1} +objectClass: top +objectClass: person +objectClass: inetOrgPerson +objectClass: krb5principal +objectClass: krb5kdcentry +cn: KDC Service +sn: Service +uid: krbtgt +userPassword: secret +krb5PrincipalName: krbtgt/${2}.${3}@${2}.${3} +krb5KeyVersionNumber: 0 + +dn: uid=ldap,ou=users,dc=${0},dc=${1} +objectClass: top +objectClass: person +objectClass: inetOrgPerson +objectClass: krb5principal +objectClass: krb5kdcentry +cn: LDAP +sn: Service +uid: ldap +userPassword: secret +krb5PrincipalName: ldap/${4}@${2}.${3} +krb5KeyVersionNumber: 0 http://git-wip-us.apache.org/repos/asf/kafka/blob/ef676c15/core/src/test/scala/integration/kafka/api/SaslSetup.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala index c36b288..8255e6a 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -17,11 +17,11 @@ package kafka.api -import java.io.{File} +import java.io.File import javax.security.auth.login.Configuration -import kafka.utils.{JaasTestUtils,TestUtils} -import org.apache.hadoop.minikdc.MiniKdc +import kafka.security.minikdc.MiniKdc +import kafka.utils.{JaasTestUtils, TestUtils} import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.kerberos.LoginManager @@ -39,8 +39,8 @@ case object Both extends SaslSetupMode * currently to setup a keytab and jaas files. 
*/ trait SaslSetup { - private val workDir = new File(System.getProperty("test.dir", "build/tmp/test-workDir")) - private val kdcConf = MiniKdc.createConf() + private val workDir = TestUtils.tempDir() + private val kdcConf = MiniKdc.createConfig private val kdc = new MiniKdc(kdcConf, workDir) def startSasl(mode: SaslSetupMode = Both) { http://git-wip-us.apache.org/repos/asf/kafka/blob/ef676c15/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala new file mode 100644 index 0000000..14807bc --- /dev/null +++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala @@ -0,0 +1,433 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.security.minikdc + +import java.io._ +import java.net.InetSocketAddress +import java.nio.charset.StandardCharsets +import java.nio.file.Files +import java.text.MessageFormat +import java.util.{Locale, Properties, UUID} + +import kafka.utils.{CoreUtils, Logging} + +import scala.collection.JavaConverters._ +import org.apache.commons.io.IOUtils +import org.apache.commons.lang.text.StrSubstitutor +import org.apache.directory.api.ldap.model.entry.{DefaultEntry, Entry} +import org.apache.directory.api.ldap.model.ldif.LdifReader +import org.apache.directory.api.ldap.model.name.Dn +import org.apache.directory.api.ldap.schema.extractor.impl.DefaultSchemaLdifExtractor +import org.apache.directory.api.ldap.schema.loader.LdifSchemaLoader +import org.apache.directory.api.ldap.schema.manager.impl.DefaultSchemaManager +import org.apache.directory.server.constants.ServerDNConstants +import org.apache.directory.server.core.DefaultDirectoryService +import org.apache.directory.server.core.api.{CacheService, DirectoryService, InstanceLayout} +import org.apache.directory.server.core.api.schema.SchemaPartition +import org.apache.directory.server.core.kerberos.KeyDerivationInterceptor +import org.apache.directory.server.core.partition.impl.btree.jdbm.{JdbmIndex, JdbmPartition} +import org.apache.directory.server.core.partition.ldif.LdifPartition +import org.apache.directory.server.kerberos.KerberosConfig +import org.apache.directory.server.kerberos.kdc.KdcServer +import org.apache.directory.server.kerberos.shared.crypto.encryption.KerberosKeyFactory +import org.apache.directory.server.kerberos.shared.keytab.{Keytab, KeytabEntry} +import org.apache.directory.server.protocol.shared.transport.{TcpTransport, UdpTransport} +import org.apache.directory.server.xdbm.Index +import org.apache.directory.shared.kerberos.KerberosTime +import org.apache.kafka.common.utils.Utils + +/** + * Mini KDC based on Apache Directory Server that can be embedded in tests or used from command 
line as a standalone + * KDC. + * + * MiniKdc sets 2 System properties when started and unsets them when stopped: + * + * - java.security.krb5.conf: set to the MiniKDC realm/host/port + * - sun.security.krb5.debug: set to the debug value provided in the configuration + * + * As a result of this, multiple MiniKdc instances should not be started concurrently in the same JVM. + * + * MiniKdc default configuration values are: + * + * - org.name=EXAMPLE (used to create the REALM) + * - org.domain=COM (used to create the REALM) + * - kdc.bind.address=localhost + * - kdc.port=0 (ephemeral port) + * - instance=DefaultKrbServer + * - max.ticket.lifetime=86400000 (1 day) + * - max.renewable.lifetime=604800000 (7 days) + * - transport=TCP + * - debug=false + * + * The generated krb5.conf forces TCP connections. + * + * Acknowledgements: this class is derived from the MiniKdc class in the hadoop-minikdc project (git commit + * d8d8ed35f00b15ee0f2f8aaf3fe7f7b42141286b). + * + * @constructor creates a new MiniKdc instance. + * @param config the MiniKdc configuration + * @param workDir the working directory which will contain krb5.conf, Apache DS files and any other files needed by + * MiniKdc. + * @throws Exception thrown if the MiniKdc could not be created.
+ */ +class MiniKdc(config: Properties, workDir: File) extends Logging { + + if (!config.keySet.containsAll(MiniKdc.RequiredProperties.asJava)) { + val missingProperties = MiniKdc.RequiredProperties.filterNot(config.keySet.asScala) + throw new IllegalArgumentException(s"Missing configuration properties: $missingProperties") + } + + info("Configuration:") + info("---------------------------------------------------------------") + config.asScala.foreach { case (key, value) => + info(s"\t$key: $value") + } + info("---------------------------------------------------------------") + + private val orgName = config.getProperty(MiniKdc.OrgName) + private val orgDomain = config.getProperty(MiniKdc.OrgDomain) + private val dnString = s"dc=$orgName,dc=$orgDomain" + private val realm = s"${orgName.toUpperCase(Locale.ENGLISH)}.${orgDomain.toUpperCase(Locale.ENGLISH)}" + private val krb5conf = new File(workDir, "krb5.conf") + + private var _port = config.getProperty(MiniKdc.KdcPort).toInt + private var ds: DirectoryService = null + private var kdc: KdcServer = null + + def port: Int = _port + + def host: String = config.getProperty(MiniKdc.KdcBindAddress) + + def start() { + if (kdc != null) + throw new RuntimeException("KDC already started") + initDirectoryService() + initKdcServer() + initJvmKerberosConfig() + } + + private def initDirectoryService() { + ds = new DefaultDirectoryService + ds.setInstanceLayout(new InstanceLayout(workDir)) + ds.setCacheService(new CacheService) + + // first load the schema + val instanceLayout = ds.getInstanceLayout + val schemaPartitionDirectory = new File(instanceLayout.getPartitionsDirectory, "schema") + val extractor = new DefaultSchemaLdifExtractor(instanceLayout.getPartitionsDirectory) + extractor.extractOrCopy + + val loader = new LdifSchemaLoader(schemaPartitionDirectory) + val schemaManager = new DefaultSchemaManager(loader) + schemaManager.loadAllEnabled() + ds.setSchemaManager(schemaManager) + // Init the LdifPartition with schema + 
val schemaLdifPartition = new LdifPartition(schemaManager, ds.getDnFactory) + schemaLdifPartition.setPartitionPath(schemaPartitionDirectory.toURI) + + // The schema partition + val schemaPartition = new SchemaPartition(schemaManager) + schemaPartition.setWrappedPartition(schemaLdifPartition) + ds.setSchemaPartition(schemaPartition) + + val systemPartition = new JdbmPartition(ds.getSchemaManager, ds.getDnFactory) + systemPartition.setId("system") + systemPartition.setPartitionPath(new File(ds.getInstanceLayout.getPartitionsDirectory, systemPartition.getId).toURI) + systemPartition.setSuffixDn(new Dn(ServerDNConstants.SYSTEM_DN)) + systemPartition.setSchemaManager(ds.getSchemaManager) + ds.setSystemPartition(systemPartition) + + ds.getChangeLog.setEnabled(false) + ds.setDenormalizeOpAttrsEnabled(true) + ds.addLast(new KeyDerivationInterceptor) + + // create one partition + val orgName = config.getProperty(MiniKdc.OrgName).toLowerCase(Locale.ENGLISH) + val orgDomain = config.getProperty(MiniKdc.OrgDomain).toLowerCase(Locale.ENGLISH) + val partition = new JdbmPartition(ds.getSchemaManager, ds.getDnFactory) + partition.setId(orgName) + partition.setPartitionPath(new File(ds.getInstanceLayout.getPartitionsDirectory, orgName).toURI) + val dn = new Dn(dnString) + partition.setSuffixDn(dn) + ds.addPartition(partition) + + // indexes + val indexedAttributes = Set[Index[_, String]]( + new JdbmIndex[Entry]("objectClass", false), + new JdbmIndex[Entry]("dc", false), + new JdbmIndex[Entry]("ou", false) + ).asJava + partition.setIndexedAttributes(indexedAttributes) + + // And start the ds + ds.setInstanceId(config.getProperty(MiniKdc.Instance)) + ds.startup() + + // context entry, after ds.startup() + val entry = ds.newEntry(dn) + entry.add("objectClass", "top", "domain") + entry.add("dc", orgName) + ds.getAdminSession.add(entry) + } + + private def initKdcServer() { + + def addInitialEntriesToDirectoryService(bindAddress: String) { + val map = Map ( + "0" -> 
orgName.toLowerCase(Locale.ENGLISH), + "1" -> orgDomain.toLowerCase(Locale.ENGLISH), + "2" -> orgName.toUpperCase(Locale.ENGLISH), + "3" -> orgDomain.toUpperCase(Locale.ENGLISH), + "4" -> bindAddress + ) + val inputStream = MiniKdc.getResourceAsStream("minikdc.ldiff") + try addEntriesToDirectoryService(StrSubstitutor.replace(IOUtils.toString(inputStream), map.asJava)) + finally CoreUtils.swallow(inputStream.close()) + } + + val bindAddress = config.getProperty(MiniKdc.KdcBindAddress) + addInitialEntriesToDirectoryService(bindAddress) + + val kerberosConfig = new KerberosConfig + kerberosConfig.setMaximumRenewableLifetime(config.getProperty(MiniKdc.MaxRenewableLifetime).toLong) + kerberosConfig.setMaximumTicketLifetime(config.getProperty(MiniKdc.MaxTicketLifetime).toLong) + kerberosConfig.setSearchBaseDn(dnString) + kerberosConfig.setPaEncTimestampRequired(false) + kdc = new KdcServer(kerberosConfig) + kdc.setDirectoryService(ds) + + // transport + val transport = config.getProperty(MiniKdc.Transport) + val absTransport = transport.trim match { + case "TCP" => new TcpTransport(bindAddress, port, 3, 50) + case "UDP" => new UdpTransport(port) + case _ => throw new IllegalArgumentException(s"Invalid transport: $transport") + } + kdc.addTransports(absTransport) + kdc.setServiceName(config.getProperty(MiniKdc.Instance)) + kdc.start() + + // if using ephemeral port, update port number for binding + if (port == 0) + _port = absTransport.getAcceptor.getLocalAddress.asInstanceOf[InetSocketAddress].getPort + + info(s"MiniKdc listening at port: $port") + } + + private def initJvmKerberosConfig(): Unit = { + writeKrb5Conf() + System.setProperty(MiniKdc.JavaSecurityKrb5Conf, krb5conf.getAbsolutePath) + System.setProperty(MiniKdc.SunSecurityKrb5Debug, config.getProperty(MiniKdc.Debug, "false")) + info(s"MiniKdc setting JVM krb5.conf to: ${krb5conf.getAbsolutePath}") + refreshJvmKerberosConfig() + } + + private def writeKrb5Conf() { + val stringBuilder = new StringBuilder + val 
reader = new BufferedReader( + new InputStreamReader(MiniKdc.getResourceAsStream("minikdc-krb5.conf"), StandardCharsets.UTF_8)) + try { + var line: String = null + while ({line = reader.readLine(); line != null}) { + stringBuilder.append(line).append("{3}") + } + } finally CoreUtils.swallow(reader.close()) + val output = MessageFormat.format(stringBuilder.toString, realm, host, port.toString, System.lineSeparator()) + Files.write(krb5conf.toPath, output.getBytes(StandardCharsets.UTF_8)) + } + + private def refreshJvmKerberosConfig(): Unit = { + val klass = + if (System.getProperty("java.vendor").contains("IBM")) + Class.forName("com.ibm.security.krb5.internal.Config") + else + Class.forName("sun.security.krb5.Config") + klass.getMethod("refresh").invoke(klass) + } + + def stop() { + if (kdc != null) { + System.clearProperty(MiniKdc.JavaSecurityKrb5Conf) + System.clearProperty(MiniKdc.SunSecurityKrb5Debug) + kdc.stop() + try ds.shutdown() + catch { + case ex: Exception => error("Could not shutdown ApacheDS properly", ex) + } + } + } + + /** + * Creates a principal in the KDC with the specified user and password. + * + * An exception will be thrown if the principal cannot be created. + * + * @param principal principal name, do not include the domain. + * @param password password. + */ + private def createPrincipal(principal: String, password: String) { + val ldifContent = s""" + |dn: uid=$principal,ou=users,dc=${orgName.toLowerCase(Locale.ENGLISH)},dc=${orgDomain.toLowerCase(Locale.ENGLISH)} + |objectClass: top + |objectClass: person + |objectClass: inetOrgPerson + |objectClass: krb5principal + |objectClass: krb5kdcentry + |cn: $principal + |sn: $principal + |uid: $principal + |userPassword: $password + |krb5PrincipalName: ${principal}@${realm} + |krb5KeyVersionNumber: 0""".stripMargin + addEntriesToDirectoryService(ldifContent) + } + + /** + * Creates multiple principals in the KDC and adds them to a keytab file. 
+ * + * An exception will be thrown if the principal cannot be created. + * + * @param keytabFile keytab file to add the created principals + * @param principals principals to add to the KDC, do not include the domain. + */ + def createPrincipal(keytabFile: File, principals: String*) { + val generatedPassword = UUID.randomUUID.toString + val keytab = new Keytab + val entries = principals.flatMap { principal => + createPrincipal(principal, generatedPassword) + val principalWithRealm = s"${principal}@${realm}" + val timestamp = new KerberosTime + KerberosKeyFactory.getKerberosKeys(principalWithRealm, generatedPassword).asScala.values.map { encryptionKey => + val keyVersion = encryptionKey.getKeyVersion.toByte + new KeytabEntry(principalWithRealm, 1, timestamp, keyVersion, encryptionKey) + } + } + keytab.setEntries(entries.asJava) + keytab.write(keytabFile) + } + + private def addEntriesToDirectoryService(ldifContent: String): Unit = { + val reader = new LdifReader(new StringReader(ldifContent)) + try { + for (ldifEntry <- reader.asScala) + ds.getAdminSession.add(new DefaultEntry(ds.getSchemaManager, ldifEntry.getEntry)) + } finally CoreUtils.swallow(reader.close()) + } + +} + +object MiniKdc { + + val JavaSecurityKrb5Conf = "java.security.krb5.conf" + val SunSecurityKrb5Debug = "sun.security.krb5.debug" + + def main(args: Array[String]) { + args match { + case Array(workDirPath, configPath, keytabPath, principals@ _*) if principals.nonEmpty => + val workDir = new File(workDirPath) + if (!workDir.exists) + throw new RuntimeException(s"Specified work directory does not exist: ${workDir.getAbsolutePath}") + val config = createConfig + val configFile = new File(configPath) + if (!configFile.exists) + throw new RuntimeException(s"Specified configuration does not exist: ${configFile.getAbsolutePath}") + + val userConfig = Utils.loadProps(configFile.getAbsolutePath) + userConfig.asScala.foreach { case (key, value) => + config.put(key, value) + } + val keytabFile = new 
File(keytabPath).getAbsoluteFile + start(workDir, config, keytabFile, principals) + case _ => + println("Arguments: <WORKDIR> <MINIKDCPROPERTIES> <KEYTABFILE> [<PRINCIPALS>]+") + sys.exit(1) + } + } + + private def start(workDir: File, config: Properties, keytabFile: File, principals: Seq[String]) { + val miniKdc = new MiniKdc(config, workDir) + miniKdc.start() + miniKdc.createPrincipal(keytabFile, principals: _*) + val infoMessage = s""" + | + |Standalone MiniKdc Running + |--------------------------------------------------- + | Realm : ${miniKdc.realm} + | Running at : ${miniKdc.host}:${miniKdc.port} + | krb5conf : ${miniKdc.krb5conf} + | + | created keytab : $keytabFile + | with principals : ${principals.mkString(", ")} + | + |Hit <CTRL-C> or kill <PID> to stop it + |--------------------------------------------------- + | + """.stripMargin + println(infoMessage) + Runtime.getRuntime.addShutdownHook(CoreUtils.newThread("minikdc-shutdown-hook", daemon = false) { + miniKdc.stop() + }) + } + + val OrgName = "org.name" + val OrgDomain = "org.domain" + val KdcBindAddress = "kdc.bind.address" + val KdcPort = "kdc.port" + val Instance = "instance" + val MaxTicketLifetime = "max.ticket.lifetime" + val MaxRenewableLifetime = "max.renewable.lifetime" + val Transport = "transport" + val Debug = "debug" + + private val RequiredProperties = Set(OrgName, OrgDomain, KdcBindAddress, KdcPort, Instance, Transport, + MaxTicketLifetime, MaxRenewableLifetime) + + private val DefaultConfig = Map( + KdcBindAddress -> "localhost", + KdcPort -> "0", + Instance -> "DefaultKrbServer", + OrgName -> "Example", + OrgDomain -> "COM", + Transport -> "TCP", + MaxTicketLifetime -> "86400000", + MaxRenewableLifetime -> "604800000", + Debug -> "false" + ) + + /** + * Convenience method that returns MiniKdc default configuration. + * + * The returned configuration is a copy, it can be customized before using + * it to create a MiniKdc. 
+ */ + def createConfig: Properties = { + val properties = new Properties + DefaultConfig.foreach { case (k, v) => properties.setProperty(k, v) } + properties + } + + @throws[IOException] + def getResourceAsStream(resourceName: String): InputStream = { + val cl = Option(Thread.currentThread.getContextClassLoader).getOrElse(classOf[MiniKdc].getClassLoader) + Option(cl.getResourceAsStream(resourceName)).getOrElse { + throw new IOException(s"Can not read resource file `$resourceName`") + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/ef676c15/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 676772f..2ca64f2 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -77,17 +77,15 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness { // The following method does nothing by default, but // if the test case requires setting up a cluster ACL, // then it needs to be implemented. 
- setClusterAcl match { - case Some(f) => - f() - case None => // Nothing to do - } + setClusterAcl.foreach(_.apply) } @After override def tearDown() { - servers.foreach(_.shutdown()) - servers.foreach(server => CoreUtils.delete(server.config.logDirs)) + if (servers != null) { + servers.foreach(_.shutdown()) + servers.foreach(server => CoreUtils.delete(server.config.logDirs)) + } super.tearDown } http://git-wip-us.apache.org/repos/asf/kafka/blob/ef676c15/gradle/dependencies.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 47158d6..6ed317a 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -23,6 +23,8 @@ ext { } versions += [ + apacheda: "1.0.0-M33", + apacheds: "2.0.0-M21", argparse4j: "0.5.0", bcpkix: "1.54", hadoop: "2.7.2", @@ -65,8 +67,16 @@ versions["baseScala"] = versions.scala.substring(0, versions.scala.lastIndexOf(" libs += [ argparse4j: "net.sourceforge.argparse4j:argparse4j:$versions.argparse4j", + apacheda: "org.apache.directory.api:api-all:$versions.apacheda", + apachedsCoreApi: "org.apache.directory.server:apacheds-core-api:$versions.apacheds", + apachedsInterceptorKerberos: "org.apache.directory.server:apacheds-interceptor-kerberos:$versions.apacheds", + apachedsProtocolShared: "org.apache.directory.server:apacheds-protocol-shared:$versions.apacheds", + apachedsProtocolKerberos: "org.apache.directory.server:apacheds-protocol-kerberos:$versions.apacheds", + apachedsProtocolLdap: "org.apache.directory.server:apacheds-protocol-ldap:$versions.apacheds", + apachedsLdifPartition: "org.apache.directory.server:apacheds-ldif-partition:$versions.apacheds", + apachedsMavibotPartition: "org.apache.directory.server:apacheds-mavibot-partition:$versions.apacheds", + apachedsJdbmPartition: "org.apache.directory.server:apacheds-jdbm-partition:$versions.apacheds", bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix", - hadoopMiniKdc: 
"org.apache.hadoop:hadoop-minikdc:$versions.hadoop", easymock: "org.easymock:easymock:$versions.easymock", jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson", jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson", http://git-wip-us.apache.org/repos/asf/kafka/blob/ef676c15/tests/kafkatest/services/security/minikdc.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/security/minikdc.py b/tests/kafkatest/services/security/minikdc.py index 3b3a5f1..b376e26 100644 --- a/tests/kafkatest/services/security/minikdc.py +++ b/tests/kafkatest/services/security/minikdc.py @@ -67,10 +67,9 @@ class MiniKdc(Service): principals = 'client ' + kafka_principals + self.extra_principals self.logger.info("Starting MiniKdc with principals " + principals) - lib_dir = "/opt/%s/core/build/dependant-testlibs" % kafka_dir(node) - kdc_jars = node.account.ssh_capture("ls " + lib_dir) - classpath = ":".join([os.path.join(lib_dir, jar.strip()) for jar in kdc_jars]) - cmd = "CLASSPATH=%s /opt/%s/bin/kafka-run-class.sh org.apache.hadoop.minikdc.MiniKdc %s %s %s %s 1>> %s 2>> %s &" % (classpath, kafka_dir(node), MiniKdc.WORK_DIR, MiniKdc.PROPS_FILE, MiniKdc.KEYTAB_FILE, principals, MiniKdc.LOG_FILE, MiniKdc.LOG_FILE) + jar_paths = self.core_jar_paths(node, "dependant-testlibs") + self.core_jar_paths(node, "libs") + classpath = ":".join(jar_paths) + cmd = "CLASSPATH=%s /opt/%s/bin/kafka-run-class.sh kafka.security.minikdc.MiniKdc %s %s %s %s 1>> %s 2>> %s &" % (classpath, kafka_dir(node), MiniKdc.WORK_DIR, MiniKdc.PROPS_FILE, MiniKdc.KEYTAB_FILE, principals, MiniKdc.LOG_FILE, MiniKdc.LOG_FILE) self.logger.debug("Attempting to start MiniKdc on %s with command: %s" % (str(node.account), cmd)) with node.account.monitor_log(MiniKdc.LOG_FILE) as monitor: node.account.ssh(cmd) @@ -82,6 +81,11 @@ class MiniKdc(Service): #KDC is set to bind openly (via 0.0.0.0). 
Change krb5.conf to hold the specific KDC address self.replace_in_file(MiniKdc.LOCAL_KRB5CONF_FILE, '0.0.0.0', node.account.hostname) + def core_jar_paths(self, node, lib_dir_name): + lib_dir = "/opt/%s/core/build/%s" % (kafka_dir(node), lib_dir_name) + jars = node.account.ssh_capture("ls " + lib_dir) + return [os.path.join(lib_dir, jar.strip()) for jar in jars] + def stop_node(self, node): self.logger.info("Stopping %s on %s" % (type(self).__name__, node.account.hostname)) node.account.kill_process("apacheds", allow_fail=False)
