KAFKA-2844; Separate keytabs for sasl tests Use a different keytab for server and client in SASL tests
Also: * Improve approach used to build the JAAS files programmatically * Delete stale `kafka_jaas.conf` file * Move `FourLetterWords` to its own file, add `Zk` prefix and clean-up its usage Author: Ismael Juma <[email protected]> Reviewers: Harsha Chintalapani, Gwen Shapira Closes #533 from ijuma/separate-keytabs-for-sasl-tests Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c588a72a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c588a72a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c588a72a Branch: refs/heads/0.10.0 Commit: c588a72ad21f313d0c0ced11f083eca18fab84a1 Parents: b5de412 Author: Ismael Juma <[email protected]> Authored: Fri Apr 1 15:25:35 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Tue Apr 5 17:08:53 2016 -0700 ---------------------------------------------------------------------- core/src/test/resources/kafka_jaas.conf | 29 ---- .../scala/integration/kafka/api/SaslSetup.scala | 39 ++--- .../security/auth/ZkAuthorizationTest.scala | 47 ++---- .../scala/unit/kafka/utils/JaasTestUtils.scala | 156 ++++++++++++------- .../scala/unit/kafka/zk/ZKEphemeralTest.scala | 66 ++++---- .../scala/unit/kafka/zk/ZkFourLetterWords.scala | 47 ++++++ .../unit/kafka/zk/ZooKeeperTestHarness.scala | 48 ++---- 7 files changed, 217 insertions(+), 215 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c588a72a/core/src/test/resources/kafka_jaas.conf ---------------------------------------------------------------------- diff --git a/core/src/test/resources/kafka_jaas.conf b/core/src/test/resources/kafka_jaas.conf deleted file mode 100644 index b097e26..0000000 --- a/core/src/test/resources/kafka_jaas.conf +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -KafkaClient { - com.sun.security.auth.module.Krb5LoginModule required debug=true - useKeyTab=true - storeKey=true - serviceName="kafka" - keyTab="$keytab-location" - principal="[email protected]"; -}; - -KafkaServer { - com.sun.security.auth.module.Krb5LoginModule required debug=true - useKeyTab=true - storeKey=true - serviceName="kafka" - keyTab="$keytab-location" - principal="kafka/[email protected]"; -}; http://git-wip-us.apache.org/repos/asf/kafka/blob/c588a72a/core/src/test/scala/integration/kafka/api/SaslSetup.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala index 8255e6a..967cae1 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -35,8 +35,7 @@ case object KafkaSasl extends SaslSetupMode case object Both extends SaslSetupMode /* - * Trait used in SaslTestHarness and EndToEndAuthorizationTest - * currently to setup a keytab and jaas files. + * Trait used in SaslTestHarness and EndToEndAuthorizationTest to setup keytab and jaas files. */ trait SaslSetup { private val workDir = TestUtils.tempDir() @@ -46,34 +45,26 @@ trait SaslSetup { def startSasl(mode: SaslSetupMode = Both) { // Important if tests leak consumers, producers or brokers LoginManager.closeAll() - val keytabFile = createKeytabAndSetConfiguration(mode) + val (serverKeytabFile, clientKeytabFile) = createKeytabsAndSetConfiguration(mode) kdc.start() - kdc.createPrincipal(keytabFile, "client", "kafka/localhost") + kdc.createPrincipal(serverKeytabFile, "kafka/localhost") + kdc.createPrincipal(clientKeytabFile, "client") if (mode == Both || mode == ZkSasl) System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider") } - protected def createKeytabAndSetConfiguration(mode: SaslSetupMode): File = { - val (keytabFile, jaasFile) = createKeytabAndJaasFiles(mode) + protected def createKeytabsAndSetConfiguration(mode: SaslSetupMode): (File, File) = { + val serverKeytabFile = TestUtils.tempFile() + val clientKeytabFile = TestUtils.tempFile() + val jaasFile = mode match { + case ZkSasl => JaasTestUtils.writeZkFile() + case KafkaSasl => JaasTestUtils.writeKafkaFile(serverKeytabFile, clientKeytabFile) + case Both => JaasTestUtils.writeZkAndKafkaFiles(serverKeytabFile, clientKeytabFile) + } // This will cause a reload of the Configuration singleton when `getConfiguration` is called Configuration.setConfiguration(null) - System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile.getAbsolutePath) - keytabFile - } - - private def createKeytabAndJaasFiles(mode: SaslSetupMode): (File, File) = { - val keytabFile = TestUtils.tempFile() - val jaasFileName: String = mode match { - case ZkSasl => - JaasTestUtils.genZkFile - case KafkaSasl => - JaasTestUtils.genKafkaFile(keytabFile.getAbsolutePath) - case _ => - JaasTestUtils.genZkAndKafkaFile(keytabFile.getAbsolutePath) - } - val jaasFile = new File(jaasFileName) - - (keytabFile, jaasFile) + System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile) + (serverKeytabFile, clientKeytabFile) } def closeSasl() { @@ -81,7 +72,7 @@ trait SaslSetup { // Important if tests leak consumers, producers or brokers LoginManager.closeAll() System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) - System.clearProperty("zookeeper.authProvider.1"); + System.clearProperty("zookeeper.authProvider.1") Configuration.setConfiguration(null) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c588a72a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala index 6a533b3..ab5324c 100644 --- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala @@ -22,17 +22,17 @@ import kafka.utils.{Logging, ZkUtils} import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.KafkaException import org.apache.kafka.common.security.JaasUtils -import org.apache.zookeeper.data.{ACL, Stat} +import org.apache.zookeeper.data.{ACL} import org.junit.Assert._ -import org.junit.{After, Before, BeforeClass, Test} +import org.junit.{After, Before, Test} import scala.collection.JavaConverters._ import scala.util.{Try, Success, Failure} import javax.security.auth.login.Configuration +class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { + val jaasFile = kafka.utils.JaasTestUtils.writeZkFile + val authProvider = "zookeeper.authProvider.1" -class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{ - val jaasFile: String = kafka.utils.JaasTestUtils.genZkFile - val authProvider: String = "zookeeper.authProvider.1" @Before override def setUp() { Configuration.setConfiguration(null) @@ -65,12 +65,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{ JaasUtils.isZkSecurityEnabled() fail("Should have thrown an exception") } catch { - case e: KafkaException => { - // Expected - } - case e: Exception => { - fail(e.toString) - } + case e: KafkaException => // Expected } } @@ -241,10 +236,10 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{ case false => list.size == 1 } isListSizeCorrect && list.asScala.forall( - secure match { - case true => isAclSecure - case false => isAclUnsecure - }) + secure match { + case true => isAclSecure + case false => isAclUnsecure + }) } /** @@ -255,15 +250,9 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{ private def isAclSecure(acl: ACL): Boolean = { info(s"ACL $acl") acl.getPerms match { - case 1 => { - acl.getId.getScheme.equals("world") - } - case 31 => { - acl.getId.getScheme.equals("sasl") - } - case _: Int => { - false - } + case 1 => acl.getId.getScheme.equals("world") + case 31 => acl.getId.getScheme.equals("sasl") + case _ => false } } @@ -273,12 +262,8 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{ private def isAclUnsecure(acl: ACL): Boolean = { info(s"ACL $acl") acl.getPerms match { - case 31 => { - acl.getId.getScheme.equals("world") - } - case _: Int => { - false - } + case 31 => acl.getId.getScheme.equals("world") + case _ => false } } @@ -323,7 +308,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{ case "/" => result // For all other paths, try to delete it case path => - try{ + try { zkUtils.deletePath(path) Failure(new Exception(s"Have been able to delete $path")) } catch { http://git-wip-us.apache.org/repos/asf/kafka/blob/c588a72a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala index cf08830..a14cd3f 100644 --- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala @@ -16,72 +16,110 @@ */ package kafka.utils +import java.io.{File, BufferedWriter, FileWriter} object JaasTestUtils { - // ZooKeeper vals - val zkServerContextName = "Server" - val zkClientContextName = "Client" - val userSuperPasswd = "adminpasswd" - val user = "fpj" - val userPasswd = "fpjsecret" - val zkModule = "org.apache.zookeeper.server.auth.DigestLoginModule" - //Kafka vals - val kafkaServerContextName = "KafkaServer" - val kafkaClientContextName = "KafkaClient" - val kafkaServerPrincipal = "[email protected]" - val kafkaClientPrincipal = "kafka/[email protected]" - val kafkaModule = "com.sun.security.auth.module.Krb5LoginModule" - - def genZkFile: String = { - val jaasFile = java.io.File.createTempFile("jaas", ".conf") - val jaasOutputStream = new java.io.FileOutputStream(jaasFile) - writeZkToOutputStream(jaasOutputStream) - jaasOutputStream.close() - jaasFile.deleteOnExit() + + case class Krb5LoginModule(contextName: String, + useKeyTab: Boolean, + storeKey: Boolean, + keyTab: String, + principal: String, + debug: Boolean, + serviceName: Option[String]) { + def toJaasSection: JaasSection = { + JaasSection( + contextName, + "com.sun.security.auth.module.Krb5LoginModule", + debug = debug, + entries = Map( + "useKeyTab" -> useKeyTab.toString, + "storeKey" -> storeKey.toString, + "keyTab" -> keyTab, + "principal" -> principal + ) ++ serviceName.map(s => Map("serviceName" -> s)).getOrElse(Map.empty) + ) + } + } + + case class JaasSection(contextName: String, + moduleName: String, + debug: Boolean, + entries: Map[String, String]) { + override def toString: String = { + s"""|$contextName { + | $moduleName required + | debug=$debug + | ${entries.map { case (k, v) => s"""$k="$v"""" }.mkString("", "\n| ", ";")} + |}; + |""".stripMargin + } + } + + private val ZkServerContextName = "Server" + private val ZkClientContextName = "Client" + private val ZkUserSuperPasswd = "adminpasswd" + private val ZkUser = "fpj" + private val ZkUserPassword = "fpjsecret" + private val ZkModule = "org.apache.zookeeper.server.auth.DigestLoginModule" + + private val KafkaServerContextName = "KafkaServer" + private val KafkaServerPrincipal = "kafka/[email protected]" + private val KafkaClientContextName = "KafkaClient" + private val KafkaClientPrincipal = "[email protected]" + + def writeZkFile(): String = { + val jaasFile = TestUtils.tempFile() + writeToFile(jaasFile, zkSections) jaasFile.getCanonicalPath } - - def genKafkaFile(keytabLocation: String): String = { - val jaasFile = java.io.File.createTempFile("jaas", ".conf") - val jaasOutputStream = new java.io.FileOutputStream(jaasFile) - writeKafkaToOutputStream(jaasOutputStream, keytabLocation) - jaasOutputStream.close() - jaasFile.deleteOnExit() + + def writeKafkaFile(serverKeyTabLocation: File, clientKeyTabLocation: File): String = { + val jaasFile = TestUtils.tempFile() + writeToFile(jaasFile, kafkaSections(serverKeyTabLocation, clientKeyTabLocation)) jaasFile.getCanonicalPath } - - def genZkAndKafkaFile(keytabLocation: String): String = { - val jaasFile = java.io.File.createTempFile("jaas", ".conf") - val jaasOutputStream = new java.io.FileOutputStream(jaasFile) - writeKafkaToOutputStream(jaasOutputStream, keytabLocation) - jaasOutputStream.write("\n\n".getBytes) - writeZkToOutputStream(jaasOutputStream) - jaasOutputStream.close() - jaasFile.deleteOnExit() + + def writeZkAndKafkaFiles(serverKeyTabLocation: File, clientKeyTabLocation: File): String = { + val jaasFile = TestUtils.tempFile() + writeToFile(jaasFile, kafkaSections(serverKeyTabLocation, clientKeyTabLocation) ++ zkSections) jaasFile.getCanonicalPath } - - private def writeZkToOutputStream(jaasOutputStream: java.io.FileOutputStream) { - jaasOutputStream.write(s"$zkServerContextName {\n\t$zkModule required\n".getBytes) - jaasOutputStream.write(s"""\tuser_super="$userSuperPasswd"\n""".getBytes) - jaasOutputStream.write(s"""\tuser_$user="$userPasswd";\n};\n\n""".getBytes) - jaasOutputStream.write(s"""$zkClientContextName {\n\t$zkModule required\n""".getBytes) - jaasOutputStream.write(s"""\tusername="$user"\n""".getBytes) - jaasOutputStream.write(s"""\tpassword="$userPasswd";\n};""".getBytes) + + private def zkSections: Seq[JaasSection] = Seq( + JaasSection(ZkServerContextName, ZkModule, false, Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)), + JaasSection(ZkClientContextName, ZkModule, false, Map("username" -> ZkUser, "password" -> ZkUserPassword)) + ) + + private def kafkaSections(serverKeytabLocation: File, clientKeytabLocation: File): Seq[JaasSection] = { + Seq( + Krb5LoginModule( + KafkaServerContextName, + useKeyTab = true, + storeKey = true, + keyTab = serverKeytabLocation.getAbsolutePath, + principal = KafkaServerPrincipal, + debug = true, + serviceName = Some("kafka")), + Krb5LoginModule( + KafkaClientContextName, + useKeyTab = true, + storeKey = true, + keyTab = clientKeytabLocation.getAbsolutePath, + principal = KafkaClientPrincipal, + debug = true, + serviceName = Some("kafka") + ) + ).map(_.toJaasSection) } - - private def writeKafkaToOutputStream(jaasOutputStream: java.io.FileOutputStream, keytabLocation: String) { - jaasOutputStream.write(s"$kafkaClientContextName {\n\t$kafkaModule required debug=true\n".getBytes) - jaasOutputStream.write(s"\tuseKeyTab=true\n".getBytes) - jaasOutputStream.write(s"\tstoreKey=true\n".getBytes) - jaasOutputStream.write(s"""\tserviceName="kafka"\n""".getBytes) - jaasOutputStream.write(s"""\tkeyTab="$keytabLocation"\n""".getBytes) - jaasOutputStream.write(s"""\tprincipal="$kafkaServerPrincipal";\n};\n\n""".getBytes) - jaasOutputStream.write(s"""$kafkaServerContextName {\n\t$kafkaModule required debug=true\n""".getBytes) - jaasOutputStream.write(s"\tuseKeyTab=true\n".getBytes) - jaasOutputStream.write(s"\tstoreKey=true\n".getBytes) - jaasOutputStream.write(s"""\tserviceName="kafka"\n""".getBytes) - jaasOutputStream.write(s"""\tkeyTab="$keytabLocation"\n""".getBytes) - jaasOutputStream.write(s"""\tprincipal="$kafkaClientPrincipal";\n};""".getBytes) + + private def jaasSectionsToString(jaasSections: Seq[JaasSection]): String = + jaasSections.mkString + + private def writeToFile(file: File, jaasSections: Seq[JaasSection]) { + val writer = new BufferedWriter(new FileWriter(file)) + try writer.write(jaasSectionsToString(jaasSections)) + finally writer.close() } -} \ No newline at end of file + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c588a72a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala index 32c7a5d..c2c25ed 100644 --- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala @@ -17,10 +17,11 @@ package kafka.zk -import java.util.ArrayList -import java.util.Collection +import java.lang.Iterable import javax.security.auth.login.Configuration +import scala.collection.JavaConverters._ + import kafka.consumer.ConsumerConfig import kafka.utils.ZkUtils import kafka.utils.ZKCheckedEphemeral @@ -30,26 +31,24 @@ import org.apache.zookeeper.CreateMode import org.apache.zookeeper.WatchedEvent import org.apache.zookeeper.Watcher import org.apache.zookeeper.ZooDefs.Ids -import org.I0Itec.zkclient.exception.{ZkException,ZkNodeExistsException} +import org.I0Itec.zkclient.exception.ZkNodeExistsException import org.junit.{After, Before, Test, Assert} -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; -import org.junit.runner.RunWith; +import org.junit.runners.Parameterized +import org.junit.runners.Parameterized.Parameters +import org.junit.runner.RunWith object ZKEphemeralTest { + @Parameters - def enableSecurityOptions: Collection[Array[java.lang.Boolean]] = { - val list = new ArrayList[Array[java.lang.Boolean]]() - list.add(Array(true)) - list.add(Array(false)) - list - } + def enableSecurityOptions: Iterable[Array[java.lang.Boolean]] = + Seq[Array[java.lang.Boolean]](Array(true), Array(false)).asJava + } @RunWith(value = classOf[Parameterized]) class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness { - val jaasFile: String = kafka.utils.JaasTestUtils.genZkFile - val authProvider: String = "zookeeper.authProvider.1" + val jaasFile = kafka.utils.JaasTestUtils.writeZkFile() + val authProvider = "zookeeper.authProvider.1" var zkSessionTimeoutMs = 1000 @Before @@ -103,17 +102,14 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness { */ @Test def testZkWatchedEphemeral = { - var path = "/zwe-test" - testCreation(path) - path = "/zwe-test-parent/zwe-test" - testCreation(path) + testCreation("/zwe-test") + testCreation("/zwe-test-parent/zwe-test") } private def testCreation(path: String) { val zk = zkUtils.zkConnection.getZookeeper val zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled()) var created = false - var counter = 10 zk.exists(path, new Watcher() { def process(event: WatchedEvent) { @@ -140,19 +136,19 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness { //Creates a second session val (zkClient2, zkConnection2) = ZkUtils.createZkClientAndConnection(zkConnect, zkSessionTimeoutMs, zkConnectionTimeout) val zk2 = zkConnection2.getZookeeper - var zwe = new ZKCheckedEphemeral(path, "", zk2, JaasUtils.isZkSecurityEnabled()) + val zwe = new ZKCheckedEphemeral(path, "", zk2, JaasUtils.isZkSecurityEnabled()) // Creates znode for path in the first session zk1.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) //Bootstraps the ZKWatchedEphemeral object - var gotException = false; - try { - zwe.create() - } catch { - case e: ZkNodeExistsException => - gotException = true - } + val gotException = + try { + zwe.create() + false + } catch { + case e: ZkNodeExistsException => true + } Assert.assertTrue(gotException) zkClient2.close() } @@ -168,15 +164,15 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness { // Creates znode for path in the first session zk.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) - var zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled()) + val zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled()) //Bootstraps the ZKWatchedEphemeral object - var gotException = false; - try { - zwe.create() - } catch { - case e: ZkNodeExistsException => - gotException = true - } + val gotException = + try { + zwe.create() + false + } catch { + case e: ZkNodeExistsException => true + } Assert.assertFalse(gotException) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c588a72a/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala b/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala new file mode 100644 index 0000000..6eaee70 --- /dev/null +++ b/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.zk + +import java.io.IOException +import java.net.{SocketTimeoutException, Socket, InetAddress, InetSocketAddress} + +/** + * ZooKeeper responds to a small set of commands. Each command is composed of four letters. You issue the commands to + * ZooKeeper via telnet or nc, at the client port. + * + * Three of the more interesting commands: "stat" gives some general information about the server and connected + * clients, while "srvr" and "cons" give extended details on server and connections respectively. + */ +object ZkFourLetterWords { + def sendStat(host: String, port: Int, timeout: Int) { + val hostAddress = + if (host != null) new InetSocketAddress(host, port) + else new InetSocketAddress(InetAddress.getByName(null), port) + val sock = new Socket() + try { + sock.connect(hostAddress, timeout) + val outStream = sock.getOutputStream + outStream.write("stat".getBytes) + outStream.flush() + } catch { + case e: SocketTimeoutException => throw new IOException("Exception while sending 4lw") + } finally { + sock.close + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c588a72a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index d618ba6..95f4e35 100755 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -17,40 +17,12 @@ package kafka.zk -import java.io._ -import java.net._ import javax.security.auth.login.Configuration -import org.I0Itec.zkclient.{ZkClient, ZkConnection} import kafka.utils.{ZkUtils, Logging, CoreUtils} import org.junit.{After, Before} import org.scalatest.junit.JUnitSuite import org.apache.kafka.common.security.JaasUtils -object FourLetterWords { - def sendStat(host: String, port: Int, timeout: Int) { - val hostAddress = if (host != null) - new InetSocketAddress(host, port) - else - new InetSocketAddress(InetAddress.getByName(null), port) - val sock = new Socket() - var reader: BufferedReader = null - sock.connect(hostAddress, timeout) - try { - val outstream = sock.getOutputStream - outstream.write("stat".getBytes) - outstream.flush - } catch { - case e: SocketTimeoutException => { - throw new IOException("Exception while sending 4lw") - } - } finally { - sock.close - if (reader != null) - reader.close - } - } -} - trait ZooKeeperTestHarness extends JUnitSuite with Logging { var zookeeper: EmbeddedZookeeper = null var zkPort: Int = -1 @@ -73,18 +45,20 @@ trait ZooKeeperTestHarness extends JUnitSuite with Logging { CoreUtils.swallow(zkUtils.close()) if (zookeeper != null) CoreUtils.swallow(zookeeper.shutdown()) - - var isDown = false - while(!isDown) { + + def isDown(): Boolean = { try { - FourLetterWords.sendStat("127.0.0.1", zkPort, 3000) - } catch { - case _: Throwable => { - info("Server is down") - isDown = true - } + ZkFourLetterWords.sendStat("127.0.0.1", zkPort, 3000) + false + } catch { case _: Throwable => + debug("Server is down") + true } } + + Iterator.continually(isDown()).exists(identity) + Configuration.setConfiguration(null) } + }
