Repository: kafka Updated Branches: refs/heads/trunk ab6f848ba -> 070ec0fc5
MINOR: Revert EmbeddedZooKeeper rename Even though this class is internal, it's widely used by other projects and it's better to avoid breaking them until we have a publicly supported test library. Author: Ismael Juma <[email protected]> Reviewers: Rajini Sivaram <[email protected]> Closes #4138 from ijuma/revert-embedded-zookeeper-rename Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/070ec0fc Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/070ec0fc Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/070ec0fc Branch: refs/heads/trunk Commit: 070ec0fc586958f260f71558171abfd0d447290d Parents: ab6f848 Author: Ismael Juma <[email protected]> Authored: Thu Oct 26 14:23:00 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Thu Oct 26 14:23:00 2017 +0100 ---------------------------------------------------------------------- .../scala/unit/kafka/zk/EmbeddedZookeeper.scala | 21 +++++++++++++++----- .../unit/kafka/zk/ZooKeeperTestHarness.scala | 4 ++-- .../integration/utils/EmbeddedKafkaCluster.java | 6 +++--- 3 files changed, 21 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/070ec0fc/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala index 36d477c..adc8d05 100755 --- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala +++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala @@ -25,19 +25,30 @@ import java.net.InetSocketAddress import kafka.utils.CoreUtils import org.apache.kafka.common.utils.Utils -class EmbeddedZooKeeper() { +/** + * ZooKeeperServer wrapper that starts the server with temporary directories during construction and deletes + * the directories when `shutdown()` is called. + * + * This is an internal class and it's subject to change. We recommend that you implement your own simple wrapper + * if you need similar functionality. + */ +// This should be named EmbeddedZooKeeper for consistency with other classes, but since this is widely used by other +// projects (even though it's internal), we keep the name as it is until we have a publicly supported test library for +// others to use. +class EmbeddedZookeeper() { + val snapshotDir = TestUtils.tempDir() val logDir = TestUtils.tempDir() val tickTime = 500 - val zooKeeperServer = new ZooKeeperServer(snapshotDir, logDir, tickTime) + val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime) val factory = new NIOServerCnxnFactory() private val addr = new InetSocketAddress("127.0.0.1", TestUtils.RandomPort) factory.configure(addr, 0) - factory.startup(zooKeeperServer) - val port = zooKeeperServer.getClientPort() + factory.startup(zookeeper) + val port = zookeeper.getClientPort def shutdown() { - CoreUtils.swallow(zooKeeperServer.shutdown()) + CoreUtils.swallow(zookeeper.shutdown()) CoreUtils.swallow(factory.shutdown()) def isDown(): Boolean = { http://git-wip-us.apache.org/repos/asf/kafka/blob/070ec0fc/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index 6bedba3..0a7e631 100755 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -41,14 +41,14 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging { protected val zkAclsEnabled: Option[Boolean] = None var zkUtils: ZkUtils = null - var zookeeper: EmbeddedZooKeeper = null + var zookeeper: EmbeddedZookeeper = null def zkPort: Int = zookeeper.port def zkConnect: String = s"127.0.0.1:$zkPort" @Before def setUp() { - zookeeper = new EmbeddedZooKeeper() + zookeeper = new EmbeddedZookeeper() zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled())) } http://git-wip-us.apache.org/repos/asf/kafka/blob/070ec0fc/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 0145827..367e489 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -20,7 +20,7 @@ import kafka.server.KafkaConfig$; import kafka.server.KafkaServer; import kafka.utils.MockTime; import kafka.utils.ZkUtils; -import kafka.zk.EmbeddedZooKeeper; +import kafka.zk.EmbeddedZookeeper; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.security.JaasUtils; @@ -47,7 +47,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected private static final int TOPIC_CREATION_TIMEOUT = 30000; private static final int TOPIC_DELETION_TIMEOUT = 30000; - private EmbeddedZooKeeper zookeeper = null; + private EmbeddedZookeeper zookeeper = null; private final KafkaEmbedded[] brokers; private ZkUtils zkUtils = null; @@ -84,7 +84,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { public void start() throws IOException, InterruptedException { log.debug("Initiating embedded Kafka cluster startup"); log.debug("Starting a ZooKeeper instance"); - zookeeper = new EmbeddedZooKeeper(); + zookeeper = new EmbeddedZookeeper(); log.debug("ZooKeeper instance is running at {}", zKConnectString()); zkUtils = ZkUtils.apply(
