KAFKA-1501 Let the OS choose the port in unit tests to avoid collisions. Patch by Ewen CP, reviewed by Guozhang and me.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6adaffd8 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6adaffd8 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6adaffd8 Branch: refs/heads/trunk Commit: 6adaffd8ea4bc1f40bd5cf5268c30eb2df1868ab Parents: 15b93a4 Author: Ewen Cheslack-Postava <m...@ewencp.org> Authored: Sat Apr 4 14:26:38 2015 -0700 Committer: Jay Kreps <jay.kr...@gmail.com> Committed: Sat Apr 4 14:26:38 2015 -0700 ---------------------------------------------------------------------- .../kafka/common/network/SelectorTest.java | 9 +- .../java/org/apache/kafka/test/TestUtils.java | 28 ---- .../main/scala/kafka/network/SocketServer.scala | 12 +- .../main/scala/kafka/server/KafkaServer.scala | 5 +- .../kafka/api/ConsumerBounceTest.scala | 154 +++++++++++++++++ .../integration/kafka/api/ConsumerTest.scala | 83 ---------- .../kafka/api/FixedPortTestUtils.scala | 55 +++++++ .../kafka/api/IntegrationTestHarness.scala | 13 +- .../kafka/api/ProducerBounceTest.scala | 164 +++++++++++++++++++ .../kafka/api/ProducerCompressionTest.scala | 14 +- .../kafka/api/ProducerFailureHandlingTest.scala | 89 ++-------- .../kafka/api/ProducerSendTest.scala | 12 +- .../unit/kafka/admin/AddPartitionsTest.scala | 27 +-- .../test/scala/unit/kafka/admin/AdminTest.scala | 16 +- .../kafka/admin/DeleteConsumerGroupTest.scala | 4 +- .../unit/kafka/admin/DeleteTopicTest.scala | 6 +- .../kafka/consumer/ConsumerIteratorTest.scala | 29 ++-- .../ZookeeperConsumerConnectorTest.scala | 47 +++--- .../kafka/integration/AutoOffsetResetTest.scala | 4 +- .../unit/kafka/integration/FetcherTest.scala | 28 ++-- .../integration/KafkaServerTestHarness.scala | 24 ++- .../kafka/integration/PrimitiveApiTest.scala | 12 +- .../ProducerConsumerTestHarness.scala | 5 +- .../kafka/integration/RollingBounceTest.scala | 29 +--- .../kafka/integration/TopicMetadataTest.scala | 7 +- .../integration/UncleanLeaderElectionTest.scala | 25 +-- .../ZookeeperConsumerConnectorTest.scala | 11 +- .../src/test/scala/unit/kafka/log/LogTest.scala | 2 +- .../kafka/log4j/KafkaLog4jAppenderTest.scala | 11 +- .../scala/unit/kafka/metrics/MetricsTest.scala | 9 +- .../unit/kafka/network/SocketServerTest.scala | 6 +- .../unit/kafka/producer/AsyncProducerTest.scala | 24 +-- .../unit/kafka/producer/ProducerTest.scala | 55 ++++--- .../unit/kafka/producer/SyncProducerTest.scala | 19 +-- .../unit/kafka/server/AdvertiseBrokerTest.scala | 2 +- .../kafka/server/DynamicConfigChangeTest.scala | 3 +- .../server/HighwatermarkPersistenceTest.scala | 2 +- .../unit/kafka/server/ISRExpirationTest.scala | 2 +- .../unit/kafka/server/KafkaConfigTest.scala | 36 ++-- .../unit/kafka/server/LeaderElectionTest.scala | 13 +- .../scala/unit/kafka/server/LogOffsetTest.scala | 9 +- .../unit/kafka/server/LogRecoveryTest.scala | 36 ++-- .../unit/kafka/server/OffsetCommitTest.scala | 5 +- .../unit/kafka/server/ReplicaFetchTest.scala | 11 +- .../unit/kafka/server/ReplicaManagerTest.scala | 6 +- .../server/ServerGenerateBrokerIdTest.scala | 19 ++- .../unit/kafka/server/ServerShutdownTest.scala | 21 ++- .../unit/kafka/server/ServerStartupTest.scala | 6 +- .../unit/kafka/server/SimpleFetchTest.scala | 2 +- .../unit/kafka/utils/ReplicationUtilsTest.scala | 2 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 51 +++--- .../scala/unit/kafka/zk/EmbeddedZookeeper.scala | 6 +- .../test/scala/unit/kafka/zk/ZKPathTest.scala | 2 +- .../unit/kafka/zk/ZooKeeperTestHarness.scala | 11 +- 54 files changed, 732 insertions(+), 551 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 0d030bc..d5b306b 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -131,9 +131,12 @@ public class SelectorTest { @Test public void testConnectionRefused() throws Exception { int node = 0; - selector.connect(node, new InetSocketAddress("localhost", TestUtils.choosePort()), BUFFER_SIZE, BUFFER_SIZE); + ServerSocket nonListeningSocket = new ServerSocket(0); + int nonListeningPort = nonListeningSocket.getLocalPort(); + selector.connect(node, new InetSocketAddress("localhost", nonListeningPort), BUFFER_SIZE, BUFFER_SIZE); while (selector.disconnected().contains(node)) selector.poll(1000L); + nonListeningSocket.close(); } /** @@ -271,8 +274,8 @@ public class SelectorTest { private final List<Socket> sockets; public EchoServer() throws Exception { - this.port = TestUtils.choosePort(); - this.serverSocket = new ServerSocket(port); + this.serverSocket = new ServerSocket(0); + this.port = this.serverSocket.getLocalPort(); this.threads = Collections.synchronizedList(new ArrayList<Thread>()); this.sockets = Collections.synchronizedList(new ArrayList<Socket>()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/clients/src/test/java/org/apache/kafka/test/TestUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 20dba7b..ccf3a5f 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -19,8 +19,6 @@ package org.apache.kafka.test; import static java.util.Arrays.asList; import java.io.File; -import java.io.IOException; -import java.net.ServerSocket; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -60,32 +58,6 @@ public class TestUtils { } /** - * Choose a number of random available ports - */ - public static int[] choosePorts(int count) { - try { - ServerSocket[] sockets = new ServerSocket[count]; - int[] ports = new int[count]; - for (int i = 0; i < count; i++) { - sockets[i] = new ServerSocket(0); - ports[i] = sockets[i].getLocalPort(); - } - for (int i = 0; i < count; i++) - sockets[i].close(); - return ports; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - /** - * Choose an available port - */ - public static int choosePort() { - return choosePorts(1)[0]; - } - - /** * Generate an array of random bytes * * @param size The size of the array http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 76ce41a..07ce58e 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -39,7 +39,7 @@ import com.yammer.metrics.core.{Gauge, Meter} */ class SocketServer(val brokerId: Int, val host: String, - val port: Int, + private val port: Int, val numProcessorThreads: Int, val maxQueuedRequests: Int, val sendBufferSize: Int, @@ -72,7 +72,7 @@ class SocketServer(val brokerId: Int, requestChannel, quotas, connectionsMaxIdleMs) - Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start() + Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), false).start() } newGauge("ResponsesBeingSent", new Gauge[Int] { @@ -100,6 +100,12 @@ class SocketServer(val brokerId: Int, processor.shutdown() info("Shutdown completed") } + + def boundPort(): Int = { + if (acceptor == null) + throw new KafkaException("Tried to check server's port before server was started") + acceptor.serverChannel.socket().getLocalPort + } } /** @@ -197,7 +203,7 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ * Thread that accepts and configures new connections. There is only need for one of these */ private[kafka] class Acceptor(val host: String, - val port: Int, + private val port: Int, private val processors: Array[Processor], val sendBufferSize: Int, val recvBufferSize: Int, http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 4db3384..10ea77a 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -157,7 +157,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg topicConfigManager.startup() /* tell everyone we are alive */ - kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient) + val advertisedPort = if (config.advertisedPort != 0) config.advertisedPort else socketServer.boundPort() + kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, advertisedPort, config.zkSessionTimeoutMs, zkClient) kafkaHealthcheck.startup() /* register broker metrics */ @@ -357,6 +358,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg def getLogManager(): LogManager = logManager + def boundPort(): Int = socketServer.boundPort() + private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes, segmentMs = config.logRollTimeMillis, http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala new file mode 100644 index 0000000..35f4f46 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -0,0 +1,154 @@ +/** + * Copyright 2015 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package kafka.api + +import kafka.server.KafkaConfig +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.CommitType +import org.apache.kafka.common.TopicPartition + +import kafka.utils.{ShutdownableThread, TestUtils, Logging} + +import org.junit.Assert._ + +import scala.collection.JavaConversions._ + +/** + * Integration tests for the new consumer that cover basic usage as well as server failures + */ +class ConsumerBounceTest extends IntegrationTestHarness with Logging { + + val producerCount = 1 + val consumerCount = 2 + val serverCount = 3 + + val topic = "topic" + val part = 0 + val tp = new TopicPartition(topic, part) + + // configure the servers and clients + this.serverConfig.setProperty("controlled.shutdown.enable", "false") // speed up shutdown + this.serverConfig.setProperty("offsets.topic.replication.factor", "3") // don't want to lose offset + this.serverConfig.setProperty("offsets.topic.num.partitions", "1") + this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all") + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test") + this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString) + this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + + override def generateConfigs() = { + FixedPortTestUtils.createBrokerConfigs(serverCount, zkConnect,enableControlledShutdown = false) + .map(KafkaConfig.fromProps(_, serverConfig)) + } + + override def setUp() { + super.setUp() + + // create the test topic with all the brokers as replicas + TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers) + } + + def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(5) + + /* + * 1. Produce a bunch of messages + * 2. Then consume the messages while killing and restarting brokers at random + */ + def consumeWithBrokerFailures(numIters: Int) { + val numRecords = 1000 + sendRecords(numRecords) + this.producers.map(_.close) + + var consumed = 0 + val consumer = this.consumers(0) + consumer.subscribe(topic) + + val scheduler = new BounceBrokerScheduler(numIters) + scheduler.start() + + while (scheduler.isRunning.get()) { + for (record <- consumer.poll(100)) { + assertEquals(consumed.toLong, record.offset()) + consumed += 1 + } + consumer.commit(CommitType.SYNC) + + if (consumed == numRecords) { + consumer.seekToBeginning() + consumed = 0 + } + } + scheduler.shutdown() + } + + def testSeekAndCommitWithBrokerFailures() = seekAndCommitWithBrokerFailures(5) + + def seekAndCommitWithBrokerFailures(numIters: Int) { + val numRecords = 1000 + sendRecords(numRecords) + this.producers.map(_.close) + + val consumer = this.consumers(0) + consumer.subscribe(tp) + consumer.seek(tp, 0) + + val scheduler = new BounceBrokerScheduler(numIters) + scheduler.start() + + while(scheduler.isRunning.get()) { + val coin = TestUtils.random.nextInt(3) + if (coin == 0) { + info("Seeking to end of log") + consumer.seekToEnd() + assertEquals(numRecords.toLong, consumer.position(tp)) + } else if (coin == 1) { + val pos = TestUtils.random.nextInt(numRecords).toLong + info("Seeking to " + pos) + consumer.seek(tp, pos) + assertEquals(pos, consumer.position(tp)) + } else if (coin == 2) { + info("Committing offset.") + consumer.commit(CommitType.SYNC) + assertEquals(consumer.position(tp), consumer.committed(tp)) + } + } + } + + private class BounceBrokerScheduler(val numIters: Int) extends ShutdownableThread("daemon-bounce-broker", false) + { + var iter: Int = 0 + + override def doWork(): Unit = { + killRandomBroker() + Thread.sleep(500) + restartDeadBrokers() + + iter += 1 + if (iter == numIters) + initiateShutdown() + else + Thread.sleep(500) + } + } + + private def sendRecords(numRecords: Int) { + val futures = (0 until numRecords).map { i => + this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes)) + } + futures.map(_.get) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/integration/kafka/api/ConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index c82bdaa..ffbdf5d 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -146,72 +146,6 @@ class ConsumerTest extends IntegrationTestHarness with Logging { assertNull(this.consumers(0).partitionsFor("non-exist-topic")) } - def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(5) - - /* - * 1. Produce a bunch of messages - * 2. Then consume the messages while killing and restarting brokers at random - */ - def consumeWithBrokerFailures(numIters: Int) { - val numRecords = 1000 - sendRecords(numRecords) - this.producers.map(_.close) - - var consumed = 0 - val consumer = this.consumers(0) - consumer.subscribe(topic) - - val scheduler = new BounceBrokerScheduler(numIters) - scheduler.start() - - while (scheduler.isRunning.get()) { - for (record <- consumer.poll(100)) { - assertEquals(consumed.toLong, record.offset()) - consumed += 1 - } - consumer.commit(CommitType.SYNC) - - if (consumed == numRecords) { - consumer.seekToBeginning() - consumed = 0 - } - } - scheduler.shutdown() - } - - def testSeekAndCommitWithBrokerFailures() = seekAndCommitWithBrokerFailures(5) - - def seekAndCommitWithBrokerFailures(numIters: Int) { - val numRecords = 1000 - sendRecords(numRecords) - this.producers.map(_.close) - - val consumer = this.consumers(0) - consumer.subscribe(tp) - consumer.seek(tp, 0) - - val scheduler = new BounceBrokerScheduler(numIters) - scheduler.start() - - while(scheduler.isRunning.get()) { - val coin = TestUtils.random.nextInt(3) - if (coin == 0) { - info("Seeking to end of log") - consumer.seekToEnd() - assertEquals(numRecords.toLong, consumer.position(tp)) - } else if (coin == 1) { - val pos = TestUtils.random.nextInt(numRecords).toLong - info("Seeking to " + pos) - consumer.seek(tp, pos) - assertEquals(pos, consumer.position(tp)) - } else if (coin == 2) { - info("Committing offset.") - consumer.commit(CommitType.SYNC) - assertEquals(consumer.position(tp), consumer.committed(tp)) - } - } - } - def testPartitionReassignmentCallback() { val callback = new TestConsumerReassignmentCallback() this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "200"); // timeout quickly to avoid slow test @@ -255,23 +189,6 @@ class ConsumerTest extends IntegrationTestHarness with Logging { } } - private class BounceBrokerScheduler(val numIters: Int) extends ShutdownableThread("daemon-bounce-broker", false) - { - var iter: Int = 0 - - override def doWork(): Unit = { - killRandomBroker() - Thread.sleep(500) - restartDeadBrokers() - - iter += 1 - if (iter == numIters) - initiateShutdown() - else - Thread.sleep(500) - } - } - private def sendRecords(numRecords: Int) { val futures = (0 until numRecords).map { i => this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes)) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala new file mode 100644 index 0000000..1d31a43 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala @@ -0,0 +1,55 @@ +/** + * Copyright 2015 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package kafka.api + +import java.io.IOException +import java.net.ServerSocket +import java.util.Properties + +import kafka.utils.TestUtils + +/** + * DO NOT USE THESE UTILITIES UNLESS YOU ABSOLUTELY MUST + * + * These are utilities for selecting fixed (preselected), ephemeral ports to use with tests. This is not a reliable way + * of testing on most machines because you can easily run into port conflicts. If you're using this class, you're almost + * certainly doing something wrong unless you can prove that your test **cannot function** properly without it. + */ +object FixedPortTestUtils { + def choosePorts(count: Int): Seq[Int] = { + try { + val sockets = (0 until count).map(i => new ServerSocket(0)) + val ports = sockets.map(_.getLocalPort()) + sockets.foreach(_.close()) + ports + } catch { + case e: IOException => { + throw new RuntimeException(e) + } + } + } + + def createBrokerConfigs(numConfigs: Int, + zkConnect: String, + enableControlledShutdown: Boolean = true, + enableDeleteTopic: Boolean = false): Seq[Properties] = { + val ports = FixedPortTestUtils.choosePorts(numConfigs) + (0 until numConfigs) + .map(node => TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, ports(node))) + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 5b7e366..02d2627 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -38,15 +38,16 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { lazy val producerConfig = new Properties lazy val consumerConfig = new Properties lazy val serverConfig = new Properties - override lazy val configs = { - val cfgs = TestUtils.createBrokerConfigs(serverCount) + + var consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() + var producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() + + override def generateConfigs() = { + val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect) cfgs.map(_.putAll(serverConfig)) cfgs.map(KafkaConfig.fromProps) } - - var consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() - var producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() - + override def setUp() { super.setUp() producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala new file mode 100644 index 0000000..c9d16bb --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala @@ -0,0 +1,164 @@ +/** + * Copyright 2015 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package kafka.api + +import java.util.Properties + +import kafka.consumer.SimpleConsumer +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.{ShutdownableThread, TestUtils} +import org.apache.kafka.clients.producer._ +import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback +import org.junit.Assert._ +import org.junit.Test + +class ProducerBounceTest extends KafkaServerTestHarness { + private val producerBufferSize = 30000 + private val serverMessageMaxBytes = producerBufferSize/2 + + val numServers = 2 + + val overridingProps = new Properties() + overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString) + overridingProps.put(KafkaConfig.MessageMaxBytesProp, serverMessageMaxBytes.toString) + // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic) + // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long + overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString) + + // This is the one of the few tests we currently allow to preallocate ports, despite the fact that this can result in transient + // failures due to ports getting reused. We can't use random ports because of bad behavior that can result from bouncing + // brokers too quickly when they get new, random ports. If we're not careful, the client can end up in a situation + // where metadata is not refreshed quickly enough, and by the time it's actually trying to, all the servers have + // been bounced and have new addresses. None of the bootstrap nodes or current metadata can get them connected to a + // running server. + // + // Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving + // a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems. + override def generateConfigs() = { + FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = false) + .map(KafkaConfig.fromProps(_, overridingProps)) + } + + private var consumer1: SimpleConsumer = null + private var consumer2: SimpleConsumer = null + + private var producer1: KafkaProducer[Array[Byte],Array[Byte]] = null + private var producer2: KafkaProducer[Array[Byte],Array[Byte]] = null + private var producer3: KafkaProducer[Array[Byte],Array[Byte]] = null + private var producer4: KafkaProducer[Array[Byte],Array[Byte]] = null + + private val topic1 = "topic-1" + private val topic2 = "topic-2" + + override def setUp() { + super.setUp() + + producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize) + producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize) + producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize) + } + + override def tearDown() { + if (producer1 != null) producer1.close + if (producer2 != null) producer2.close + if (producer3 != null) producer3.close + if (producer4 != null) producer4.close + + super.tearDown() + } + + /** + * With replication, producer should able able to find new leader after it detects broker failure + */ + @Test + def testBrokerFailure() { + val numPartitions = 3 + val leaders = TestUtils.createTopic(zkClient, topic1, numPartitions, numServers, servers) + assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined)) + + val scheduler = new ProducerScheduler() + scheduler.start + + // rolling bounce brokers + for (i <- 0 until numServers) { + for (server <- servers) { + server.shutdown() + server.awaitShutdown() + server.startup() + Thread.sleep(2000) + } + + // Make sure the producer do not see any exception + // in returned metadata due to broker failures + assertTrue(scheduler.failed == false) + + // Make sure the leader still exists after bouncing brokers + (0 until numPartitions).foreach(partition => TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition)) + } + + scheduler.shutdown + + // Make sure the producer do not see any exception + // when draining the left messages on shutdown + assertTrue(scheduler.failed == false) + + // double check that the leader info has been propagated after consecutive bounces + val newLeaders = (0 until numPartitions).map(i => TestUtils.waitUntilMetadataIsPropagated(servers, topic1, i)) + val fetchResponses = newLeaders.zipWithIndex.map { case (leader, partition) => + // Consumers must be instantiated after all the restarts since they use random ports each time they start up + val consumer = new SimpleConsumer("localhost", servers(leader).boundPort(), 100, 1024 * 1024, "") + val response = consumer.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition) + consumer.close + response + } + val messages = fetchResponses.flatMap(r => r.iterator.toList.map(_.message)) + val uniqueMessages = messages.toSet + val uniqueMessageSize = uniqueMessages.size + + assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize) + } + + private class ProducerScheduler extends ShutdownableThread("daemon-producer", false) + { + val numRecords = 1000 + var sent = 0 + var failed = false + + val producer = TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize, retries = 10) + + override def doWork(): Unit = { + val responses = + for (i <- sent+1 to sent+numRecords) + yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, i.toString.getBytes), + new ErrorLoggingCallback(topic1, null, null, true)) + val futures = responses.toList + + try { + futures.map(_.get) + sent += numRecords + } catch { + case e : Exception => failed = true + } + } + + override def shutdown(){ + super.shutdown() + producer.close + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index cae72f4..3a7ae8b 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -34,24 +34,22 @@ import kafka.message.Message import kafka.zk.ZooKeeperTestHarness import kafka.utils.{Utils, TestUtils} -import scala.Array - @RunWith(value = classOf[Parameterized]) class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooKeeperTestHarness { private val brokerId = 0 - private val port = TestUtils.choosePort private var server: KafkaServer = null - private val props = TestUtils.createBrokerConfig(brokerId, port) - private val config = KafkaConfig.fromProps(props) - private val topic = "topic" private val numRecords = 2000 @Before override def setUp() { super.setUp() + + val props = TestUtils.createBrokerConfig(brokerId, zkConnect) + val config = KafkaConfig.fromProps(props) + server = TestUtils.createServer(config) } @@ -71,14 +69,14 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK def testCompression() { val props = new Properties() - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config))) + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(Seq(server))) props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression) props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000") props.put(ProducerConfig.LINGER_MS_CONFIG, "200") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") var producer = new KafkaProducer[Array[Byte],Array[Byte]](props) - val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "") + val consumer = new SimpleConsumer("localhost", server.boundPort(), 100, 1024*1024, "") try { // create topic http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 7eb6d05..ee94011 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -20,7 +20,6 @@ package kafka.api import org.junit.Test import org.junit.Assert._ -import java.lang.Integer import java.util.{Properties, Random} import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException} @@ -28,7 +27,7 @@ import kafka.common.Topic import kafka.consumer.SimpleConsumer import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig -import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils} +import kafka.utils.{ShutdownableThread, TestUtils} import org.apache.kafka.common.KafkaException import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasException, NotEnoughReplicasAfterAppendException} @@ -42,16 +41,14 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { val numServers = 2 val overridingProps = new Properties() - overridingProps.put(KafkaConfig.ZkConnectProp, TestZKUtils.zookeeperConnect) overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString) overridingProps.put(KafkaConfig.MessageMaxBytesProp, serverMessageMaxBytes.toString) // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic) // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString) - val configs = - for (props <- TestUtils.createBrokerConfigs(numServers, false)) - yield KafkaConfig.fromProps(props, overridingProps) + def generateConfigs() = + TestUtils.createBrokerConfigs(numServers, zkConnect, false).map(KafkaConfig.fromProps(_, overridingProps)) private var consumer1: SimpleConsumer = null private var consumer2: SimpleConsumer = null @@ -67,19 +64,12 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { override def setUp() { super.setUp() - // TODO: we need to migrate to new consumers when 0.9 is final - consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "") - consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "") - producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize) producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize) producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize) } override def tearDown() { - consumer1.close - consumer2.close - if (producer1 != null) producer1.close if (producer2 != null) producer2.close if (producer3 != null) producer3.close @@ -94,7 +84,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { @Test def testTooLargeRecordWithAckZero() { // create topic - TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + TestUtils.createTopic(zkClient, topic1, 1, numServers, servers) // send a too-large record val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1)) @@ -107,7 +97,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { @Test def testTooLargeRecordWithAckOne() { // create topic - TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + TestUtils.createTopic(zkClient, topic1, 1, numServers, servers) // send a too-large record val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1)) @@ -141,7 +131,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { @Test def testWrongBrokerList() { // create topic - TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + TestUtils.createTopic(zkClient, topic1, 1, numServers, servers) // producer with incorrect broker list producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize) @@ -161,7 +151,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { @Test def testNoResponse() { // create topic - TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + TestUtils.createTopic(zkClient, topic1, 1, numServers, servers) // first send a message to make sure the metadata is refreshed val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes) @@ -202,7 +192,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { @Test def testInvalidPartition() { // create topic - TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + TestUtils.createTopic(zkClient, topic1, 1, numServers, servers) // create a record with incorrect partition id, send should fail val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, new Integer(1), "key".getBytes, "value".getBytes) @@ -223,7 +213,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { @Test def testSendAfterClosed() { // create topic - TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + TestUtils.createTopic(zkClient, topic1, 1, numServers, servers) val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes) @@ -248,59 +238,6 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { // re-close producer is fine } - /** - * With replication, producer should able able to find new leader after it detects broker failure - */ - @Test - def testBrokerFailure() { - // create topic - val leaders = TestUtils.createTopic(zkClient, topic1, 1, 2, servers) - val partition = 0 - assertTrue("Leader of partition 0 of the topic should exist", leaders(partition).isDefined) - - val scheduler = new ProducerScheduler() - scheduler.start - - // rolling bounce brokers - for (i <- 0 until 2) { - for (server <- servers) { - server.shutdown() - server.awaitShutdown() - server.startup() - - Thread.sleep(2000) - } - - // Make sure the producer do not see any exception - // in returned metadata due to broker failures - assertTrue(scheduler.failed == false) - - // Make sure the leader still exists after bouncing brokers - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition) - } - - scheduler.shutdown - - // Make sure the producer do not see any exception - // when draining the left messages on shutdown - assertTrue(scheduler.failed == false) - - // double check that the leader info has been propagated after consecutive bounces - val leader = TestUtils.waitUntilMetadataIsPropagated(servers, topic1, partition) - - val fetchResponse = if(leader == configs(0).brokerId) { - consumer1.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition) - } else { - consumer2.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition) - } - - val messages = fetchResponse.iterator.toList.map(_.message) - val uniqueMessages = messages.toSet - val uniqueMessageSize = uniqueMessages.size - - assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize) - } - @Test def testCannotSendToInternalTopic() { val thrown = intercept[ExecutionException] { @@ -313,9 +250,9 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { def testNotEnoughReplicas() { val topicName = "minisrtest" val topicProps = new Properties() - topicProps.put("min.insync.replicas","3") + topicProps.put("min.insync.replicas",(numServers+1).toString) - TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps) + TestUtils.createTopic(zkClient, topicName, 1, numServers, servers, topicProps) val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, "key".getBytes, "value".getBytes) try { @@ -333,9 +270,9 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { def testNotEnoughReplicasAfterBrokerShutdown() { val topicName = "minisrtest2" val topicProps = new Properties() - topicProps.put("min.insync.replicas","2") + topicProps.put("min.insync.replicas",numServers.toString) - TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps) + TestUtils.createTopic(zkClient, topicName, 1, numServers, servers,topicProps) val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, "key".getBytes, "value".getBytes) // this should work with all brokers up and running http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 3df4507..aba256d 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -25,7 +25,7 @@ import org.junit.Test import org.junit.Assert._ import kafka.server.KafkaConfig -import kafka.utils.{TestZKUtils, TestUtils} +import kafka.utils.TestUtils import kafka.consumer.SimpleConsumer import kafka.message.Message import kafka.integration.KafkaServerTestHarness @@ -39,12 +39,10 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { val numServers = 2 val overridingProps = new Properties() - overridingProps.put(KafkaConfig.ZkConnectProp, TestZKUtils.zookeeperConnect) overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString) - val configs = - for (props <- TestUtils.createBrokerConfigs(numServers, false)) - yield KafkaConfig.fromProps(props, overridingProps) + def generateConfigs() = + TestUtils.createBrokerConfigs(numServers, zkConnect, false).map(KafkaConfig.fromProps(_, overridingProps)) private var consumer1: SimpleConsumer = null private var consumer2: SimpleConsumer = null @@ -56,8 +54,8 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { super.setUp() // TODO: we need to migrate to new consumers when 0.9 is final - consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "") - consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "") + consumer1 = new SimpleConsumer("localhost", servers(0).boundPort(), 100, 1024*1024, "") + consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024*1024, "") } override def tearDown() { http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 8bc1785..99ac923 100644 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -27,21 +27,7 @@ import kafka.client.ClientUtils import kafka.server.{KafkaConfig, KafkaServer} class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { - val brokerId1 = 0 - val brokerId2 = 1 - val brokerId3 = 2 - val brokerId4 = 3 - - val port1 = TestUtils.choosePort() - val port2 = TestUtils.choosePort() - val port3 = TestUtils.choosePort() - val port4 = TestUtils.choosePort() - - val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, false) - val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, false) - val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3, false) - val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4, false) - + var configs: Seq[KafkaConfig] = null var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] var brokers: Seq[Broker] = Seq.empty[Broker] @@ -54,14 +40,11 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { override def setUp() { super.setUp() - // start all the servers - val server1 = TestUtils.createServer(KafkaConfig.fromProps(configProps1)) - val server2 = TestUtils.createServer(KafkaConfig.fromProps(configProps2)) - val server3 = TestUtils.createServer(KafkaConfig.fromProps(configProps3)) - val server4 = TestUtils.createServer(KafkaConfig.fromProps(configProps4)) - servers ++= List(server1, server2, server3, server4) - brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port)) + configs = (0 until 4).map(i => KafkaConfig.fromProps(TestUtils.createBrokerConfig(i, zkConnect, enableControlledShutdown = false))) + // start all the servers + servers = configs.map(c => TestUtils.createServer(c)) + brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.boundPort)) // create topics first createTopic(zkClient, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/admin/AdminTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index ee0b21e..0305f70 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -145,7 +145,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) + val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) // reassign partition 0 @@ -176,7 +176,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) + val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) // reassign partition 0 @@ -207,7 +207,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val expectedReplicaAssignment = Map(0 -> List(0, 1)) val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) + val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) // reassign partition 0 @@ -236,7 +236,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { def testReassigningNonExistingPartition() { val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) + val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) // reassign partition 0 val newReplicas = Seq(2, 3) val partitionToBeReassigned = 0 @@ -262,7 +262,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) reassignPartitionsCommand.reassignPartitions // create brokers - val servers = TestUtils.createBrokerConfigs(2, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) + val servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) // wait until reassignment completes TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkClient), @@ -298,7 +298,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val partition = 1 val preferredReplica = 0 // create brokers - val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(KafkaConfig.fromProps) + val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) @@ -318,7 +318,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val topic = "test" val partition = 1 // create brokers - val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(KafkaConfig.fromProps) + val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps) val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) // create the topic TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers) @@ -365,7 +365,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { def testTopicConfigChange() { val partitions = 3 val topic = "my-topic" - val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0))) + val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) def makeConfig(messageSize: Int, retentionMs: Long) = { var props = new Properties() http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala index 1baff0e..1913ad6 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala @@ -17,7 +17,7 @@ package kafka.admin import org.scalatest.junit.JUnit3Suite -import kafka.utils.{ZKGroupDirs, ZKGroupTopicDirs, ZkUtils, TestUtils} +import kafka.utils._ import kafka.server.KafkaConfig import org.junit.Test import kafka.consumer._ @@ -26,7 +26,7 @@ import kafka.integration.KafkaServerTestHarness class DeleteConsumerGroupTest extends JUnit3Suite with KafkaServerTestHarness { - val configs = TestUtils.createBrokerConfigs(3, false, true).map(KafkaConfig.fromProps) + def generateConfigs() = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps) @Test def testGroupWideDeleteInZK() { http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 6258983..61cc602 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -96,7 +96,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" val topicAndPartition = TopicAndPartition(topic, 0) - val brokerConfigs = TestUtils.createBrokerConfigs(4, false) + val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false) brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) // create brokers val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) @@ -224,7 +224,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { val topicAndPartition = TopicAndPartition(topicName, 0) val topic = topicAndPartition.topic - val brokerConfigs = TestUtils.createBrokerConfigs(3, false) + val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false) brokerConfigs(0).setProperty("delete.topic.enable", "true") brokerConfigs(0).setProperty("log.cleaner.enable","true") brokerConfigs(0).setProperty("log.cleanup.policy","compact") @@ -253,7 +253,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = { - val brokerConfigs = TestUtils.createBrokerConfigs(3, false) + val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false) brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true") ) createTestTopicAndCluster(topic,brokerConfigs) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index 995397b..bb25467 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -30,7 +30,6 @@ import kafka.utils.TestUtils._ import kafka.utils._ import org.junit.Test import kafka.serializer._ -import kafka.cluster.{Broker, Cluster} import org.scalatest.junit.JUnit3Suite import kafka.integration.KafkaServerTestHarness @@ -38,31 +37,27 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { val numNodes = 1 - val overridingProps = new Properties() - overridingProps.put(KafkaConfig.ZkConnectProp, TestZKUtils.zookeeperConnect) - - val configs = - for(props <- TestUtils.createBrokerConfigs(numNodes)) - yield KafkaConfig.fromProps(props, overridingProps) + def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps) val messages = new mutable.HashMap[Int, Seq[Message]] val topic = "topic" val group = "group1" val consumer0 = "consumer0" val consumedOffset = 5 - val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port))) val queue = new LinkedBlockingQueue[FetchedDataChunk] - val topicInfos = configs.map(c => new PartitionTopicInfo(topic, - 0, - queue, - new AtomicLong(consumedOffset), - new AtomicLong(0), - new AtomicInteger(0), - "")) - val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0)) + var topicInfos: Seq[PartitionTopicInfo] = null + + def consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0)) override def setUp() { - super.setUp + super.setUp() + topicInfos = configs.map(c => new PartitionTopicInfo(topic, + 0, + queue, + new AtomicLong(consumedOffset), + new AtomicLong(0), + new AtomicInteger(0), + "")) createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers) } http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index 155fd04..f3e76db 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -38,17 +38,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val RebalanceBackoffMs = 5000 var dirs : ZKGroupTopicDirs = null - val zookeeperConnect = TestZKUtils.zookeeperConnect val numNodes = 2 val numParts = 2 val topic = "topic1" val overridingProps = new Properties() - overridingProps.put(KafkaConfig.ZkConnectProp, zookeeperConnect) overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString) - val configs = - for (props <- TestUtils.createBrokerConfigs(numNodes)) - yield KafkaConfig.fromProps(props, overridingProps) + override def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect) + .map(KafkaConfig.fromProps(_, overridingProps)) val group = "group1" val consumer0 = "consumer0" @@ -93,8 +90,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar zkConsumerConnector0.shutdown // send some messages to each broker - val sentMessages1 = sendMessagesToPartition(configs, topic, 0, nMessages) ++ - sendMessagesToPartition(configs, topic, 1, nMessages) + val sentMessages1 = sendMessagesToPartition(servers, topic, 0, nMessages) ++ + sendMessagesToPartition(servers, topic, 1, nMessages) // wait to make sure the topic and partition have a leader for the successful case waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) @@ -127,8 +124,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) // send some messages to each broker - val sentMessages2 = sendMessagesToPartition(configs, topic, 0, nMessages) ++ - sendMessagesToPartition(configs, topic, 1, nMessages) + val sentMessages2 = sendMessagesToPartition(servers, topic, 0, nMessages) ++ + sendMessagesToPartition(servers, topic, 1, nMessages) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) @@ -148,8 +145,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true) val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]()) // send some messages to each broker - val sentMessages3 = sendMessagesToPartition(configs, topic, 0, nMessages) ++ - sendMessagesToPartition(configs, topic, 1, nMessages) + val sentMessages3 = sendMessagesToPartition(servers, topic, 0, nMessages) ++ + sendMessagesToPartition(servers, topic, 1, nMessages) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) @@ -182,8 +179,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.FATAL) // send some messages to each broker - val sentMessages1 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++ - sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec) + val sentMessages1 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec) ++ + sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) @@ -215,8 +212,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) // send some messages to each broker - val sentMessages2 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++ - sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec) + val sentMessages2 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec) ++ + sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) @@ -236,8 +233,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true) val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder()) // send some messages to each broker - val sentMessages3 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++ - sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec) + val sentMessages3 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec) ++ + sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) @@ -258,8 +255,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar def testCompressionSetConsumption() { // send some messages to each broker - val sentMessages = sendMessagesToPartition(configs, topic, 0, 200, DefaultCompressionCodec) ++ - sendMessagesToPartition(configs, topic, 1, 200, DefaultCompressionCodec) + val sentMessages = sendMessagesToPartition(servers, topic, 0, 200, DefaultCompressionCodec) ++ + sendMessagesToPartition(servers, topic, 1, 200, DefaultCompressionCodec) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) @@ -284,8 +281,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.FATAL) // send some messages to each broker - val sentMessages = sendMessagesToPartition(configs, topic, 0, nMessages, NoCompressionCodec) ++ - sendMessagesToPartition(configs, topic, 1, nMessages, NoCompressionCodec) + val sentMessages = sendMessagesToPartition(servers, topic, 0, nMessages, NoCompressionCodec) ++ + sendMessagesToPartition(servers, topic, 1, nMessages, NoCompressionCodec) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) @@ -319,13 +316,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar } def testLeaderSelectionForPartition() { - val zkClient = new ZkClient(zookeeperConnect, 6000, 30000, ZKStringSerializer) + val zkClient = new ZkClient(zkConnect, 6000, 30000, ZKStringSerializer) // create topic topic1 with 1 partition on broker 0 createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers) // send some messages to each broker - val sentMessages1 = sendMessages(configs, topic, "producer1", nMessages, "batch1", NoCompressionCodec, 1) + val sentMessages1 = sendMessages(servers, topic, "producer1", nMessages, "batch1", NoCompressionCodec, 1) // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) @@ -351,8 +348,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar def testConsumerRebalanceListener() { // Send messages to create topic - sendMessagesToPartition(configs, topic, 0, nMessages) - sendMessagesToPartition(configs, topic, 1, nMessages) + sendMessagesToPartition(servers, topic, 0, nMessages) + sendMessagesToPartition(servers, topic, 1, nMessages) val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala index ffa6c30..139dc9a 100644 --- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala +++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala @@ -31,7 +31,7 @@ import junit.framework.Assert._ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging { - val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0))) + def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) val topic = "test_topic" val group = "default_group" @@ -78,7 +78,7 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L TestUtils.createTopic(zkClient, topic, 1, 1, servers) val producer: Producer[String, Array[Byte]] = TestUtils.createProducer( - TestUtils.getBrokerListStrFromConfigs(configs), + TestUtils.getBrokerListStrFromServers(servers), keyEncoder = classOf[StringEncoder].getName) for(i <- 0 until numMessages) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 3093e45..ecb5a33 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -32,23 +32,13 @@ import kafka.utils.TestUtils._ import kafka.utils.TestUtils class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { - val numNodes = 1 - val configs = - for(props <- TestUtils.createBrokerConfigs(numNodes)) - yield KafkaConfig.fromProps(props) + def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps) + val messages = new mutable.HashMap[Int, Seq[Array[Byte]]] val topic = "topic" - val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port))) - val shutdown = ZookeeperConsumerConnector.shutdownCommand + val queue = new LinkedBlockingQueue[FetchedDataChunk] - val topicInfos = configs.map(c => new PartitionTopicInfo(topic, - 0, - queue, - new AtomicLong(0), - new AtomicLong(0), - new AtomicInteger(0), - "")) var fetcher: ConsumerFetcherManager = null @@ -56,8 +46,18 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { super.setUp createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers) + val cluster = new Cluster(servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort()))) + fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient) fetcher.stopConnections() + val topicInfos = configs.map(c => + new PartitionTopicInfo(topic, + 0, + queue, + new AtomicLong(0), + new AtomicLong(0), + new AtomicInteger(0), + "")) fetcher.startConnections(topicInfos, cluster) } @@ -83,7 +83,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { var count = 0 for(conf <- configs) { val producer: Producer[String, Array[Byte]] = TestUtils.createProducer( - TestUtils.getBrokerListStrFromConfigs(configs), + TestUtils.getBrokerListStrFromServers(servers), keyEncoder = classOf[StringEncoder].getName) val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray messages += conf.brokerId -> ms http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 062790f..28e3122 100644 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -24,28 +24,38 @@ import kafka.utils.{Utils, TestUtils} import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import kafka.common.KafkaException -import kafka.utils.TestUtils /** * A test harness that brings up some number of broker nodes */ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness { - - val configs: List[KafkaConfig] + var instanceConfigs: Seq[KafkaConfig] = null var servers: Buffer[KafkaServer] = null var brokerList: String = null var alive: Array[Boolean] = null - + + /** + * Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every + * test and should not reuse previous configurations unless they select their ports randomly when servers are started. + */ + def generateConfigs(): Seq[KafkaConfig] + + def configs: Seq[KafkaConfig] = { + if (instanceConfigs == null) + instanceConfigs = generateConfigs() + instanceConfigs + } + def serverForId(id: Int) = servers.find(s => s.config.brokerId == id) - def bootstrapUrl = configs.map(c => c.hostName + ":" + c.port).mkString(",") + def bootstrapUrl = servers.map(s => s.config.hostName + ":" + s.boundPort()).mkString(",") override def setUp() { super.setUp if(configs.size <= 0) - throw new KafkaException("Must suply at least one server config.") - brokerList = TestUtils.getBrokerListStrFromConfigs(configs) + throw new KafkaException("Must supply at least one server config.") servers = configs.map(TestUtils.createServer(_)).toBuffer + brokerList = TestUtils.getBrokerListStrFromServers(servers) alive = new Array[Boolean](servers.length) Arrays.fill(alive, true) } http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 30deaf4..f601d31 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -23,16 +23,13 @@ import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder} import kafka.server.{KafkaRequestHandler, KafkaConfig} import kafka.producer.{KeyedMessage, Producer} import org.apache.log4j.{Level, Logger} -import org.I0Itec.zkclient.ZkClient import kafka.zk.ZooKeeperTestHarness import org.scalatest.junit.JUnit3Suite import scala.collection._ -import kafka.admin.AdminUtils import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException} import kafka.utils.{StaticPartitioner, TestUtils, Utils} import kafka.serializer.StringEncoder import java.util.Properties -import TestUtils._ /** * End to end tests of the primitive apis against a local server @@ -40,10 +37,7 @@ import TestUtils._ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness { val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler]) - val port = TestUtils.choosePort() - val props = TestUtils.createBrokerConfig(0, port) - val config = KafkaConfig.fromProps(props) - val configs = List(config) + def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) def testFetchRequestCanProperlySerialize() { val request = new FetchRequestBuilder() @@ -97,7 +91,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with props.put("compression.codec", "gzip") val stringProducer1 = TestUtils.createProducer[String, String]( - TestUtils.getBrokerListStrFromConfigs(configs), + TestUtils.getBrokerListStrFromServers(servers), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName, partitioner = classOf[StaticPartitioner].getName, @@ -222,7 +216,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with props.put("request.required.acks", "0") val pipelinedProducer: Producer[String, String] = TestUtils.createProducer[String, String]( - TestUtils.getBrokerListStrFromConfigs(configs), + TestUtils.getBrokerListStrFromServers(servers), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName, partitioner = classOf[StaticPartitioner].getName, http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala index 108c2e7..4614a92 100644 --- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala @@ -24,18 +24,17 @@ import kafka.utils.{StaticPartitioner, TestUtils} import kafka.serializer.StringEncoder trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness { - val port: Int val host = "localhost" var producer: Producer[String, String] = null var consumer: SimpleConsumer = null override def setUp() { super.setUp - producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs), + producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(servers), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName, partitioner = classOf[StaticPartitioner].getName) - consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "") + consumer = new SimpleConsumer(host, servers(0).boundPort(), 1000000, 64*1024, "") } override def tearDown() { http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala index f74e716..130b205 100644 --- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala +++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala @@ -25,35 +25,18 @@ import kafka.utils.{Utils, TestUtils} import kafka.server.{KafkaConfig, KafkaServer} class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness { - val brokerId1 = 0 - val brokerId2 = 1 - val brokerId3 = 2 - val brokerId4 = 3 - - val port1 = TestUtils.choosePort() - val port2 = TestUtils.choosePort() - val port3 = TestUtils.choosePort() - val port4 = TestUtils.choosePort() - - // controlled.shutdown.enable is true by default - val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1) - val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2) - val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3) - val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4) - - var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] val partitionId = 0 + var servers: Seq[KafkaServer] = null override def setUp() { super.setUp() + // controlled.shutdown.enable is true by default + val configs = (0 until 4).map(i => TestUtils.createBrokerConfig(i, zkConnect)) + configs(3).put("controlled.shutdown.retry.backoff.ms", "100") + // start all the servers - val server1 = TestUtils.createServer(KafkaConfig.fromProps(configProps1)) - val server2 = TestUtils.createServer(KafkaConfig.fromProps(configProps2)) - val server3 = TestUtils.createServer(KafkaConfig.fromProps(configProps3)) - val server4 = TestUtils.createServer(KafkaConfig.fromProps(configProps4)) - - servers ++= List(server1, server2, server3, server4) + servers = configs.map(c => TestUtils.createServer(KafkaConfig.fromProps(c))) } override def tearDown() { http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index a671af4..56b1b8c 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -31,14 +31,15 @@ import kafka.common.ErrorMapping import kafka.client.ClientUtils class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { - val props = createBrokerConfigs(1) - val configs = props.map(p => KafkaConfig.fromProps(p)) private var server1: KafkaServer = null - val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port)) + var brokers: Seq[Broker] = null override def setUp() { super.setUp() + val props = createBrokerConfigs(1, zkConnect) + val configs = props.map(KafkaConfig.fromProps) server1 = TestUtils.createServer(configs.head) + brokers = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort())) } override def tearDown() { http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 8342cae..7c87b81 100644 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -29,7 +29,7 @@ import kafka.admin.AdminUtils import kafka.common.FailedToSendMessageException import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException} import kafka.producer.{KeyedMessage, Producer} -import kafka.serializer.{DefaultEncoder, StringEncoder} +import kafka.serializer.StringEncoder import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.Utils import kafka.utils.TestUtils._ @@ -39,20 +39,12 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val brokerId1 = 0 val brokerId2 = 1 - val port1 = choosePort() - val port2 = choosePort() - // controlled shutdown is needed for these tests, but we can trim the retry count and backoff interval to // reduce test execution time val enableControlledShutdown = true - val configProps1 = createBrokerConfig(brokerId1, port1) - val configProps2 = createBrokerConfig(brokerId2, port2) - for (configProps <- List(configProps1, configProps2)) { - configProps.put("controlled.shutdown.enable", String.valueOf(enableControlledShutdown)) - configProps.put("controlled.shutdown.max.retries", String.valueOf(1)) - configProps.put("controlled.shutdown.retry.backoff.ms", String.valueOf(1000)) - } + var configProps1: Properties = null + var configProps2: Properties = null var configs: Seq[KafkaConfig] = Seq.empty[KafkaConfig] var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] @@ -69,6 +61,15 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { override def setUp() { super.setUp() + configProps1 = createBrokerConfig(brokerId1, zkConnect) + configProps2 = createBrokerConfig(brokerId2, zkConnect) + + for (configProps <- List(configProps1, configProps2)) { + configProps.put("controlled.shutdown.enable", String.valueOf(enableControlledShutdown)) + configProps.put("controlled.shutdown.max.retries", String.valueOf(1)) + configProps.put("controlled.shutdown.retry.backoff.ms", String.valueOf(1000)) + } + // temporarily set loggers to a higher level so that tests run quietly kafkaApisLogger.setLevel(Level.FATAL) networkProcessorLogger.setLevel(Level.FATAL) @@ -254,7 +255,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { private def produceMessage(topic: String, message: String) = { val producer: Producer[String, Array[Byte]] = createProducer( - getBrokerListStrFromConfigs(configs), + getBrokerListStrFromServers(servers), keyEncoder = classOf[StringEncoder].getName) producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, message.getBytes)) producer.close() http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index 3d0fc9d..ad66bb2 100644 --- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -39,19 +39,14 @@ import junit.framework.Assert._ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging { - - val zookeeperConnect = zkConnect val numNodes = 2 val numParts = 2 val topic = "topic1" val overridingProps = new Properties() - overridingProps.put(KafkaConfig.ZkConnectProp, zookeeperConnect) overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString) - val configs = - for (props <- TestUtils.createBrokerConfigs(numNodes)) - yield KafkaConfig.fromProps(props, overridingProps) + def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps)) val group = "group1" val consumer1 = "consumer1" @@ -68,7 +63,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val sentMessages1 = sendMessages(nMessages, "batch1") // create a consumer - val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1)) + val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder()) @@ -93,7 +88,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar compressed: CompressionCodec): List[String] = { var messages: List[String] = Nil val producer: kafka.producer.Producer[Int, String] = - TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), + TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[IntEncoder].getName) val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 3c0599c..ac4c5b9 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -37,7 +37,7 @@ class LogTest extends JUnitSuite { @Before def setUp() { logDir = TestUtils.tempDir() - val props = TestUtils.createBrokerConfig(0, -1) + val props = TestUtils.createBrokerConfig(0, "127.0.0.1:1", port = -1) config = KafkaConfig.fromProps(props) }