Repository: kafka Updated Branches: refs/heads/trunk 15b93a410 -> 6adaffd8e
http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala index 36db917..18361c1 100644 --- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -47,19 +47,16 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with private val brokerZk = 0 - private val ports = TestUtils.choosePorts(2) - private val portZk = ports(0) - @Before override def setUp() { super.setUp() - val propsZk = TestUtils.createBrokerConfig(brokerZk, portZk) + val propsZk = TestUtils.createBrokerConfig(brokerZk, zkConnect) val logDirZkPath = propsZk.getProperty("log.dir") logDirZk = new File(logDirZkPath) config = KafkaConfig.fromProps(propsZk) server = TestUtils.createServer(config) - simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64 * 1024, "") + simpleConsumerZk = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64 * 1024, "") } @After @@ -94,7 +91,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout") props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n") - props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config))) + props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromServers(Seq(server))) props.put("log4j.logger.kafka.log4j", "INFO, KAFKA") try { @@ -129,7 +126,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout") props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n") - props.put("log4j.appender.KAFKA.BrokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config))) + props.put("log4j.appender.KAFKA.BrokerList", TestUtils.getBrokerListStrFromServers(Seq(server))) props.put("log4j.appender.KAFKA.Topic", "test-topic") props.put("log4j.appender.KAFKA.RequiredNumAcks", "1") props.put("log4j.appender.KAFKA.SyncSend", "true") http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index 0f58ad8..247a6e9 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -36,18 +36,15 @@ import scala.util.matching.Regex import org.scalatest.junit.JUnit3Suite class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging { - val zookeeperConnect = TestZKUtils.zookeeperConnect val numNodes = 2 val numParts = 2 val topic = "topic1" val overridingProps = new Properties() - overridingProps.put(KafkaConfig.ZkConnectProp, zookeeperConnect) overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString) - val configs = - for (props <- TestUtils.createBrokerConfigs(numNodes, enableDeleteTopic = true)) - yield KafkaConfig.fromProps(props, overridingProps) + def generateConfigs() = + TestUtils.createBrokerConfigs(numNodes, zkConnect, enableDeleteTopic=true).map(KafkaConfig.fromProps(_, overridingProps)) val nMessages = 2 @@ -80,7 +77,7 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging { } def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = { - val sentMessages1 = sendMessages(configs, topic, producerId, nMessages, "batch1", NoCompressionCodec, 1) + val sentMessages1 = sendMessages(servers, topic, producerId, nMessages, "batch1", NoCompressionCodec, 1) // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 0af23ab..79a806c 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -36,7 +36,7 @@ class SocketServerTest extends JUnitSuite { val server: SocketServer = new SocketServer(0, host = null, - port = kafka.utils.TestUtils.choosePort, + port = 0, numProcessorThreads = 1, maxQueuedRequests = 50, sendBufferSize = 300000, @@ -73,7 +73,7 @@ class SocketServerTest extends JUnitSuite { channel.sendResponse(new RequestChannel.Response(request.processor, request, send)) } - def connect(s:SocketServer = server) = new Socket("localhost", s.port) + def connect(s:SocketServer = server) = new Socket("localhost", s.boundPort) @After def cleanup() { @@ -162,7 +162,7 @@ class SocketServerTest extends JUnitSuite { val overrides: Map[String, Int] = Map("localhost" -> overrideNum) val overrideServer: SocketServer = new SocketServer(0, host = null, - port = kafka.utils.TestUtils.choosePort, + port = 0, numProcessorThreads = 1, maxQueuedRequests = 50, sendBufferSize = 300000, http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index be90c5b..d2ab683 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -36,8 +36,10 @@ import scala.collection.mutable.ArrayBuffer import kafka.utils._ class AsyncProducerTest extends JUnit3Suite { - val props = createBrokerConfigs(1) - val configs = props.map(p => KafkaConfig.fromProps(p)) + // One of the few cases we can just set a fixed port because the producer is mocked out here since this uses mocks + val props = Seq(createBrokerConfig(1, "127.0.0.1:1", port=65534)) + val configs = props.map(KafkaConfig.fromProps) + val brokerList = configs.map(c => org.apache.kafka.common.utils.Utils.formatAddress(c.hostName, c.port)).mkString(",") override def setUp() { super.setUp() @@ -61,7 +63,7 @@ class AsyncProducerTest extends JUnit3Suite { val props = new Properties() props.put("serializer.class", "kafka.serializer.StringEncoder") - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("metadata.broker.list", brokerList) props.put("producer.type", "async") props.put("queue.buffering.max.messages", "10") props.put("batch.num.messages", "1") @@ -86,7 +88,7 @@ class AsyncProducerTest extends JUnit3Suite { def testProduceAfterClosed() { val produceData = getProduceData(10) val producer = createProducer[String, String]( - getBrokerListStrFromConfigs(configs), + brokerList, encoder = classOf[StringEncoder].getName) producer.close @@ -162,7 +164,7 @@ class AsyncProducerTest extends JUnit3Suite { producerDataList.append(new KeyedMessage[Int,Message]("topic2", key = 4, message = new Message("msg5".getBytes))) val props = new Properties() - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("metadata.broker.list", brokerList) val broker1 = new Broker(0, "localhost", 9092) val broker2 = new Broker(1, "localhost", 9093) @@ -212,7 +214,7 @@ class AsyncProducerTest extends JUnit3Suite { def testSerializeEvents() { val produceData = TestUtils.getMsgStrings(5).map(m => new KeyedMessage[String,String]("topic1",m)) val props = new Properties() - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("metadata.broker.list", brokerList) val config = new ProducerConfig(props) // form expected partitions metadata val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092) @@ -244,7 +246,7 @@ class AsyncProducerTest extends JUnit3Suite { val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]] producerDataList.append(new KeyedMessage[String,Message]("topic1", "key1", new Message("msg1".getBytes))) val props = new Properties() - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("metadata.broker.list", brokerList) val config = new ProducerConfig(props) // form expected partitions metadata @@ -274,7 +276,7 @@ class AsyncProducerTest extends JUnit3Suite { @Test def testNoBroker() { val props = new Properties() - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("metadata.broker.list", brokerList) val config = new ProducerConfig(props) // create topic metadata with 0 partitions @@ -308,7 +310,7 @@ class AsyncProducerTest extends JUnit3Suite { // no need to retry since the send will always fail props.put("message.send.max.retries", "0") val producer= createProducer[String, String]( - brokerList = getBrokerListStrFromConfigs(configs), + brokerList = brokerList, encoder = classOf[DefaultEncoder].getName, keyEncoder = classOf[DefaultEncoder].getName, producerProps = props) @@ -326,7 +328,7 @@ class AsyncProducerTest extends JUnit3Suite { @Test def testRandomPartitioner() { val props = new Properties() - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("metadata.broker.list", brokerList) val config = new ProducerConfig(props) // create topic metadata with 0 partitions @@ -364,7 +366,7 @@ class AsyncProducerTest extends JUnit3Suite { @Test def testFailedSendRetryLogic() { val props = new Properties() - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("metadata.broker.list", brokerList) props.put("request.required.acks", "1") props.put("serializer.class", classOf[StringEncoder].getName.toString) props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index d2f3851..a7ca142 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -17,7 +17,6 @@ package kafka.producer -import org.apache.kafka.common.config.ConfigException import org.scalatest.TestFailedException import org.scalatest.junit.JUnit3Suite import kafka.consumer.SimpleConsumer @@ -40,8 +39,6 @@ import kafka.serializer.StringEncoder class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ private val brokerId1 = 0 private val brokerId2 = 1 - private val ports = TestUtils.choosePorts(2) - private val (port1, port2) = (ports(0), ports(1)) private var server1: KafkaServer = null private var server2: KafkaServer = null private var consumer1: SimpleConsumer = null @@ -49,26 +46,36 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler]) private var servers = List.empty[KafkaServer] - private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false) - props1.put("num.partitions", "4") - private val config1 = KafkaConfig.fromProps(props1) - private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false) - props2.put("num.partitions", "4") - private val config2 = KafkaConfig.fromProps(props2) + // Creation of consumers is deferred until they are actually needed. This allows us to kill brokers that use random + // ports and then get a consumer instance that will be pointed at the correct port + def getConsumer1() = { + if (consumer1 == null) + consumer1 = new SimpleConsumer("localhost", server1.boundPort(), 1000000, 64*1024, "") + consumer1 + } + + def getConsumer2() = { + if (consumer2 == null) + consumer2 = new SimpleConsumer("localhost", server2.boundPort(), 100, 64*1024, "") + consumer2 + } override def setUp() { super.setUp() // set up 2 brokers with 4 partitions each + val props1 = TestUtils.createBrokerConfig(brokerId1, zkConnect, false) + props1.put("num.partitions", "4") + val config1 = KafkaConfig.fromProps(props1) + val props2 = TestUtils.createBrokerConfig(brokerId2, zkConnect, false) + props2.put("num.partitions", "4") + val config2 = KafkaConfig.fromProps(props2) server1 = TestUtils.createServer(config1) server2 = TestUtils.createServer(config2) servers = List(server1,server2) val props = new Properties() props.put("host", "localhost") - props.put("port", port1.toString) - - consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024, "") - consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024, "") + props.put("port", server1.boundPort().toString) // temporarily set request handler logger to a higher level requestHandlerLogger.setLevel(Level.FATAL) @@ -115,7 +122,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ } val producer2 = TestUtils.createProducer[String, String]( - brokerList = "localhost:80," + TestUtils.getBrokerListStrFromConfigs(Seq(config1)), + brokerList = "localhost:80," + TestUtils.getBrokerListStrFromServers(Seq(server1)), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName) @@ -128,7 +135,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ } val producer3 = TestUtils.createProducer[String, String]( - brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)), + brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName) @@ -151,7 +158,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = servers) val producer1 = TestUtils.createProducer[String, String]( - brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)), + brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName, partitioner = classOf[StaticPartitioner].getName, @@ -166,10 +173,10 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val leader = leaderOpt.get val messageSet = if(leader == server1.config.brokerId) { - val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) response1.messageSet("new-topic", 0).iterator.toBuffer }else { - val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + val response2 = getConsumer2().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) response2.messageSet("new-topic", 0).iterator.toBuffer } assertEquals("Should have fetched 2 messages", 2, messageSet.size) @@ -184,7 +191,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ try { val producer2 = TestUtils.createProducer[String, String]( - brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)), + brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName, partitioner = classOf[StaticPartitioner].getName, @@ -214,7 +221,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ servers = servers) val producer = TestUtils.createProducer[String, String]( - brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)), + brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName, partitioner = classOf[StaticPartitioner].getName, @@ -248,7 +255,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ try { // cross check if broker 1 got the messages - val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) val messageSet1 = response1.messageSet(topic, 0).iterator assertTrue("Message set should have 1 message", messageSet1.hasNext) assertEquals(new Message(bytes = "test1".getBytes, key = "test".getBytes), messageSet1.next.message) @@ -268,7 +275,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props.put("message.send.max.retries", "0") props.put("client.id","ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout") val producer = TestUtils.createProducer[String, String]( - brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)), + brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName, partitioner = classOf[StaticPartitioner].getName, @@ -283,7 +290,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ // this message should be assigned to partition 0 whose leader is on broker 0 producer.send(new KeyedMessage[String, String](topic, "test", "test")) // cross check if brokers got the messages - val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) val messageSet1 = response1.messageSet("new-topic", 0).iterator assertTrue("Message set should have 1 message", messageSet1.hasNext) assertEquals(new Message("test".getBytes), messageSet1.next.message) @@ -315,7 +322,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ @Test def testSendNullMessage() { val producer = TestUtils.createProducer[String, String]( - brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)), + brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName, partitioner = classOf[StaticPartitioner].getName) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 812df59..7d6f655 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -33,13 +33,12 @@ import kafka.common.{TopicAndPartition, ErrorMapping} class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { private val messageBytes = new Array[Byte](2) // turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool. - val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, false).head)) - val zookeeperConnect = TestZKUtils.zookeeperConnect + def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head)) @Test def testReachableServer() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort) val producer = new SyncProducer(new SyncProducerConfig(props)) val firstStart = SystemTime.milliseconds @@ -74,7 +73,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testEmptyProduceRequest() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort) val correlationId = 0 val clientId = SyncProducerConfig.DefaultClientId @@ -91,7 +90,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testMessageSizeTooLarge() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort) val producer = new SyncProducer(new SyncProducerConfig(props)) TestUtils.createTopic(zkClient, "test", numPartitions = 1, replicationFactor = 1, servers = servers) @@ -119,7 +118,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { def testMessageSizeTooLargeWithAckZero() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort) props.put("request.required.acks", "0") val producer = new SyncProducer(new SyncProducerConfig(props)) @@ -145,7 +144,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testProduceCorrectlyReceivesResponse() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort) val producer = new SyncProducer(new SyncProducerConfig(props)) val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) @@ -191,7 +190,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val timeoutMs = 500 val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort) val producer = new SyncProducer(new SyncProducerConfig(props)) val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) @@ -217,7 +216,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testProduceRequestWithNoResponse() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort) val correlationId = 0 val clientId = SyncProducerConfig.DefaultClientId val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs @@ -233,7 +232,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val topicName = "minisrtest" val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort) props.put("request.required.acks", "-1") val producer = new SyncProducer(new SyncProducerConfig(props)) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala index 296e2b5..b011240 100644 --- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala @@ -30,7 +30,7 @@ class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness { override def setUp() { super.setUp() - val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort()) + val props = TestUtils.createBrokerConfig(brokerId, zkConnect) props.put("advertised.host.name", advertisedHostName) props.put("advertised.port", advertisedPort.toString) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 93182ae..7877f6c 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -26,8 +26,7 @@ import kafka.admin.{AdminOperationException, AdminUtils} import org.scalatest.junit.JUnit3Suite class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { - - override val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.choosePort))) + def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) @Test def testConfigChange() { http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 0bdbc2f..142e28e 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean class HighwatermarkPersistenceTest extends JUnit3Suite { - val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps) + val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps) val topic = "foo" val logManagers = configs map { config => TestUtils.createLogManager( http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index c1d168a..90529fa 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -39,7 +39,7 @@ class IsrExpirationTest extends JUnit3Suite { val overridingProps = new Properties() overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString) overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString) - val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps(_, overridingProps)) + val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps(_, overridingProps)) val topic = "foo" val time = new MockTime http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 7f47e6f..852fa3b 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -29,7 +29,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testLogRetentionTimeHoursProvided() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.put("log.retention.hours", "1") val cfg = KafkaConfig.fromProps(props) @@ -39,7 +39,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testLogRetentionTimeMinutesProvided() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.put("log.retention.minutes", "30") val cfg = KafkaConfig.fromProps(props) @@ -49,7 +49,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testLogRetentionTimeMsProvided() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.put("log.retention.ms", "1800000") val cfg = KafkaConfig.fromProps(props) @@ -59,7 +59,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testLogRetentionTimeNoConfigProvided() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) val cfg = KafkaConfig.fromProps(props) assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRetentionTimeMillis) @@ -68,7 +68,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testLogRetentionTimeBothMinutesAndHoursProvided() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.put("log.retention.minutes", "30") props.put("log.retention.hours", "1") @@ -79,7 +79,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testLogRetentionTimeBothMinutesAndMsProvided() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.put("log.retention.ms", "1800000") props.put("log.retention.minutes", "10") @@ -93,7 +93,7 @@ class KafkaConfigTest extends JUnit3Suite { val port = 9999 val hostName = "fake-host" - val props = TestUtils.createBrokerConfig(0, port) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = port) props.put("host.name", hostName) val serverConfig = KafkaConfig.fromProps(props) @@ -108,7 +108,7 @@ class KafkaConfigTest extends JUnit3Suite { val advertisedHostName = "routable-host" val advertisedPort = 1234 - val props = TestUtils.createBrokerConfig(0, port) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = port) props.put("advertised.host.name", advertisedHostName) props.put("advertised.port", advertisedPort.toString) @@ -120,7 +120,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testUncleanLeaderElectionDefault() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) val serverConfig = KafkaConfig.fromProps(props) assertEquals(serverConfig.uncleanLeaderElectionEnable, true) @@ -128,7 +128,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testUncleanElectionDisabled() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.put("unclean.leader.election.enable", String.valueOf(false)) val serverConfig = KafkaConfig.fromProps(props) @@ -137,7 +137,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testUncleanElectionEnabled() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.put("unclean.leader.election.enable", String.valueOf(true)) val serverConfig = KafkaConfig.fromProps(props) @@ -146,7 +146,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testUncleanElectionInvalid() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.put("unclean.leader.election.enable", "invalid") intercept[ConfigException] { @@ -156,7 +156,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testLogRollTimeMsProvided() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.put("log.roll.ms", "1800000") val cfg = KafkaConfig.fromProps(props) @@ -166,7 +166,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testLogRollTimeBothMsAndHoursProvided() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.put("log.roll.ms", "1800000") props.put("log.roll.hours", "1") @@ -177,7 +177,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testLogRollTimeNoConfigProvided() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) val cfg = KafkaConfig.fromProps(props) assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRollTimeMillis ) @@ -186,7 +186,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testDefaultCompressionType() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) val serverConfig = KafkaConfig.fromProps(props) assertEquals(serverConfig.compressionType, "producer") @@ -194,7 +194,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testValidCompressionType() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.put("compression.type", "gzip") val serverConfig = KafkaConfig.fromProps(props) @@ -203,7 +203,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testInvalidCompressionType() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.put("compression.type", "abc") intercept[IllegalArgumentException] { KafkaConfig.fromProps(props) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index f252805..3d4258f 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -31,17 +31,16 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val brokerId1 = 0 val brokerId2 = 1 - val port1 = TestUtils.choosePort() - val port2 = TestUtils.choosePort() - - val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, false) - val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, false) var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] var staleControllerEpochDetected = false override def setUp() { super.setUp() + + val configProps1 = TestUtils.createBrokerConfig(brokerId1, zkConnect, enableControlledShutdown = false) + val configProps2 = TestUtils.createBrokerConfig(brokerId2, zkConnect, enableControlledShutdown = false) + // start both servers val server1 = TestUtils.createServer(KafkaConfig.fromProps(configProps1)) val server2 = TestUtils.createServer(KafkaConfig.fromProps(configProps2)) @@ -117,8 +116,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // start another controller val controllerId = 2 - val controllerConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort())) - val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port)) + val controllerConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, zkConnect)) + val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort())) val controllerContext = new ControllerContext(zkClient, 6000) controllerContext.liveBrokers = brokers.toSet val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 8c9f9e7..496bf0d 100644 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -39,19 +39,18 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { var topicLogDir: File = null var server: KafkaServer = null var logSize: Int = 100 - val brokerPort: Int = 9099 var simpleConsumer: SimpleConsumer = null var time: Time = new MockTime() @Before override def setUp() { super.setUp() - val config: Properties = createBrokerConfig(1, brokerPort) + val config: Properties = createBrokerConfig(1) val logDirPath = config.getProperty("log.dir") logDir = new File(logDirPath) time = new MockTime() server = TestUtils.createServer(KafkaConfig.fromProps(config), time) - simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "") + simpleConsumer = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64*1024, "") } @After @@ -194,10 +193,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(Seq(0L), consumerOffsets) } - private def createBrokerConfig(nodeId: Int, port: Int): Properties = { + private def createBrokerConfig(nodeId: Int): Properties = { val props = new Properties props.put("broker.id", nodeId.toString) - props.put("port", port.toString) + props.put("port", TestUtils.RandomPort.toString()) props.put("log.dir", getLogDir.getAbsolutePath) props.put("log.flush.interval.messages", "1") props.put("enable.zookeeper", "false") http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index de255e3..92e49df 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -19,8 +19,7 @@ package kafka.server import java.util.Properties import kafka.utils.TestUtils._ -import kafka.utils.IntEncoder -import kafka.utils.{Utils, TestUtils} +import kafka.utils.{IntEncoder, Utils, TestUtils} import kafka.zk.ZooKeeperTestHarness import kafka.common._ import kafka.producer.{KeyedMessage, Producer} @@ -43,38 +42,48 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString) overridingProps.put(KafkaConfig.ReplicaFetchMinBytesProp, replicaFetchMinBytes.toString) - val configs = TestUtils.createBrokerConfigs(2, false).map(KafkaConfig.fromProps(_, overridingProps)) + var configs: Seq[KafkaConfig] = null val topic = "new-topic" val partitionId = 0 var server1: KafkaServer = null var server2: KafkaServer = null - val configProps1 = configs.head - val configProps2 = configs.last + def configProps1 = configs.head + def configProps2 = configs.last val message = "hello" var producer: Producer[Int, String] = null - var hwFile1: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename)) - var hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename)) + def hwFile1: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename)) + def hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename)) var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] + // Some tests restart the brokers then produce more data. But since test brokers use random ports, we need + // to use a new producer that knows the new ports + def updateProducer() = { + if (producer != null) + producer.close() + producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromServers(servers), + encoder = classOf[StringEncoder].getName, + keyEncoder = classOf[IntEncoder].getName) + } + override def setUp() { super.setUp() + configs = TestUtils.createBrokerConfigs(2, zkConnect, false).map(KafkaConfig.fromProps(_, overridingProps)) + // start both servers server1 = TestUtils.createServer(configProps1) server2 = TestUtils.createServer(configProps2) - servers ++= List(server1, server2) + servers = List(server1, server2) // create topic with 1 partition, 2 replicas, one on each broker createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers) // create the producer - producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(configs), - encoder = classOf[StringEncoder].getName, - keyEncoder = classOf[IntEncoder].getName) + updateProducer() } override def tearDown() { @@ -121,6 +130,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { // bring the preferred replica back server1.startup() + // Update producer with new server settings + updateProducer() leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId) assertTrue("Leader must remain on broker 1, in case of zookeeper session expiration it can move to broker 0", @@ -132,6 +143,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)) server2.startup() + updateProducer() leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = leader) assertTrue("Leader must remain on broker 0, in case of zookeeper session expiration it can move to broker 1", leader.isDefined && (leader.get == 0 || leader.get == 1)) @@ -181,6 +193,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)) server2.startup() + updateProducer() // check if leader moves to the other server leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = leader) assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1)) @@ -189,6 +202,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { // bring the preferred replica back server1.startup() + updateProducer() assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)) assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 7654275..a6bb690 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -37,7 +37,6 @@ import junit.framework.Assert._ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { val random: Random = new Random() - val brokerPort: Int = 9099 val group = "test-group" val retentionCheckInterval: Long = 100L var logDir: File = null @@ -50,14 +49,14 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { @Before override def setUp() { super.setUp() - val config: Properties = createBrokerConfig(1, brokerPort) + val config: Properties = createBrokerConfig(1, zkConnect) config.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") config.setProperty(KafkaConfig.OffsetsRetentionCheckIntervalMsProp, retentionCheckInterval.toString) val logDirPath = config.getProperty("log.dir") logDir = new File(logDirPath) time = new MockTime() server = TestUtils.createServer(KafkaConfig.fromProps(config), time) - simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "test-client") + simpleConsumer = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64*1024, "test-client") val consumerMetadataRequest = ConsumerMetadataRequest(group) Stream.continually { val consumerMetadataResponse = simpleConsumer.send(consumerMetadataRequest) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index 1e64faf..a67cc37 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -22,20 +22,19 @@ import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ import kafka.producer.KeyedMessage import kafka.serializer.StringEncoder -import kafka.utils.TestUtils -import junit.framework.Assert._ +import kafka.utils.{TestUtils} import kafka.common._ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { - val props = createBrokerConfigs(2,false) - val configs = props.map(p => KafkaConfig.fromProps(p)) var brokers: Seq[KafkaServer] = null val topic1 = "foo" val topic2 = "bar" override def setUp() { super.setUp() - brokers = configs.map(config => TestUtils.createServer(config)) + brokers = createBrokerConfigs(2, zkConnect, false) + .map(KafkaConfig.fromProps) + .map(config => TestUtils.createServer(config)) } override def tearDown() { @@ -54,7 +53,7 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { } // send test messages to leader - val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs), + val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(brokers), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName) val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++ testMessageList2.map(m => new KeyedMessage(topic2, m, m)) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 2849a5e..00d5933 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -38,7 +38,7 @@ class ReplicaManagerTest extends JUnit3Suite { @Test def testHighWaterMarkDirectoryMapping() { - val props = TestUtils.createBrokerConfig(1) + val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) val config = KafkaConfig.fromProps(props) val zkClient = EasyMock.createMock(classOf[ZkClient]) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) @@ -54,7 +54,7 @@ class ReplicaManagerTest extends JUnit3Suite { @Test def testHighwaterMarkRelativeDirectoryMapping() { - val props = TestUtils.createBrokerConfig(1) + val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) val config = KafkaConfig.fromProps(props) val zkClient = EasyMock.createMock(classOf[ZkClient]) @@ -71,7 +71,7 @@ class ReplicaManagerTest extends JUnit3Suite { @Test def testIllegalRequiredAcks() { - val props = TestUtils.createBrokerConfig(1) + val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) val config = KafkaConfig.fromProps(props) val zkClient = EasyMock.createMock(classOf[ZkClient]) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala index 96a8a5a..2bfaeb3 100644 --- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala @@ -16,6 +16,8 @@ */ package kafka.server +import java.util.Properties + import kafka.zk.ZooKeeperTestHarness import kafka.utils.{TestUtils, Utils} import org.junit.Test @@ -24,12 +26,19 @@ import junit.framework.Assert._ import java.io.File class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness { - var props1 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort) - var config1 = KafkaConfig.fromProps(props1) - var props2 = TestUtils.createBrokerConfig(0, TestUtils.choosePort) - var config2 = KafkaConfig.fromProps(props2) + var props1: Properties = null + var config1: KafkaConfig = null + var props2: Properties = null + var config2: KafkaConfig = null val brokerMetaPropsFile = "meta.properties" + override def setUp() { + super.setUp() + props1 = TestUtils.createBrokerConfig(-1, zkConnect) + config1 = KafkaConfig.fromProps(props1) + props2 = TestUtils.createBrokerConfig(0, zkConnect) + config2 = KafkaConfig.fromProps(props2) + } @Test def testAutoGenerateBrokerId() { @@ -51,7 +60,7 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness { // start the server with broker.id as part of config val server1 = new KafkaServer(config1) val server2 = new KafkaServer(config2) - val props3 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort) + val props3 = TestUtils.createBrokerConfig(-1, zkConnect) val config3 = KafkaConfig.fromProps(props3) val server3 = new KafkaServer(config3) server1.startup() http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 71317eb..a20321f 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -32,20 +32,23 @@ import org.scalatest.junit.JUnit3Suite import junit.framework.Assert._ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { - val port = TestUtils.choosePort - val props = TestUtils.createBrokerConfig(0, port) - val config = KafkaConfig.fromProps(props) - + var config: KafkaConfig = null val host = "localhost" val topic = "test" val sent1 = List("hello", "there") val sent2 = List("more", "messages") + override def setUp(): Unit = { + super.setUp() + val props = TestUtils.createBrokerConfig(0, zkConnect) + config = KafkaConfig.fromProps(props) + } + @Test def testCleanShutdown() { var server = new KafkaServer(config) server.startup() - var producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(Seq(config)), + var producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromServers(Seq(server)), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[IntEncoder].getName) @@ -71,10 +74,10 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { // wait for the broker to receive the update metadata request after startup TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0) - producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(Seq(config)), + producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromServers(Seq(server)), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[IntEncoder].getName) - val consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "") + val consumer = new SimpleConsumer(host, server.boundPort(), 1000000, 64*1024, "") var fetchedMessage: ByteBufferMessageSet = null while(fetchedMessage == null || fetchedMessage.validBytes == 0) { @@ -103,7 +106,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testCleanShutdownWithDeleteTopicEnabled() { - val newProps = TestUtils.createBrokerConfig(0, port) + val newProps = TestUtils.createBrokerConfig(0, zkConnect) newProps.setProperty("delete.topic.enable", "true") val newConfig = KafkaConfig.fromProps(newProps) val server = new KafkaServer(newConfig) @@ -116,7 +119,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testCleanShutdownAfterFailedStartup() { - val newProps = TestUtils.createBrokerConfig(0, port) + val newProps = TestUtils.createBrokerConfig(0, zkConnect) newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535") val newConfig = KafkaConfig.fromProps(newProps) val server = new KafkaServer(newConfig) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala index 60021ef..661ddd5 100644 --- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala @@ -30,7 +30,7 @@ class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness { def testBrokerCreatesZKChroot { val brokerId = 0 val zookeeperChroot = "/kafka-chroot-for-unittest" - val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort()) + val props = TestUtils.createBrokerConfig(brokerId, zkConnect) val zooKeeperConnect = props.get("zookeeper.connect") props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot) val server = TestUtils.createServer(KafkaConfig.fromProps(props)) @@ -47,11 +47,11 @@ class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness { // This shouldn't affect the existing broker registration. val brokerId = 0 - val props1 = TestUtils.createBrokerConfig(brokerId) + val props1 = TestUtils.createBrokerConfig(brokerId, zkConnect) val server1 = TestUtils.createServer(KafkaConfig.fromProps(props1)) val brokerRegistration = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 - val props2 = TestUtils.createBrokerConfig(brokerId) + val props2 = TestUtils.createBrokerConfig(brokerId, zkConnect) try { TestUtils.createServer(KafkaConfig.fromProps(props2)) fail("Registering a broker with a conflicting id should fail") http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index 519888e..09a0961 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -43,7 +43,7 @@ class SimpleFetchTest extends JUnit3Suite { overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString) overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString) - val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps(_, overridingProps)) + val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps(_, overridingProps)) // set the replica manager with the partition val time = new MockTime http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala index 2edc814..c96c0ff 100644 --- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala @@ -53,7 +53,7 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testUpdateLeaderAndIsr() { - val configs = TestUtils.createBrokerConfigs(1).map(KafkaConfig.fromProps) + val configs = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps) val log = EasyMock.createMock(classOf[kafka.log.Log]) EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes() EasyMock.expect(log) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index bb4daad..f451825 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -65,23 +65,13 @@ object TestUtils extends Logging { val seededRandom = new Random(192348092834L) val random = new Random() - /** - * Choose a number of random available ports - */ - def choosePorts(count: Int): List[Int] = { - val sockets = - for(i <- 0 until count) - yield new ServerSocket(0) - val socketList = sockets.toList - val ports = socketList.map(_.getLocalPort) - socketList.map(_.close) - ports - } + /* 0 gives a random port; you can then retrieve the assigned port from the Socket object. */ + val RandomPort = 0 - /** - * Choose an available port - */ - def choosePort(): Int = choosePorts(1).head + /** Port to use for unit tests that mock/don't require a real ZK server. */ + val MockZkPort = 1 + /** Zookeeper connection string to use for unit tests that mock/don't require a real ZK server. */ + val MockZkConnect = "127.0.0.1:" + MockZkPort /** * Create a temporary directory @@ -141,28 +131,29 @@ object TestUtils extends Logging { * Create a test config for the given node id */ def createBrokerConfigs(numConfigs: Int, + zkConnect: String, enableControlledShutdown: Boolean = true, - enableDeleteTopic: Boolean = false): List[Properties] = { - for((port, node) <- choosePorts(numConfigs).zipWithIndex) - yield createBrokerConfig(node, port, enableControlledShutdown, enableDeleteTopic) + enableDeleteTopic: Boolean = false): Seq[Properties] = { + (0 until numConfigs).map(node => createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic)) } - def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = { - configs.map(c => formatAddress(c.hostName, c.port)).mkString(",") + def getBrokerListStrFromServers(servers: Seq[KafkaServer]): String = { + servers.map(s => formatAddress(s.config.hostName, s.boundPort())).mkString(",") } /** * Create a test config for the given node id */ - def createBrokerConfig(nodeId: Int, port: Int = choosePort(), + def createBrokerConfig(nodeId: Int, zkConnect: String, enableControlledShutdown: Boolean = true, - enableDeleteTopic: Boolean = false): Properties = { + enableDeleteTopic: Boolean = false, + port: Int = RandomPort): Properties = { val props = new Properties if (nodeId >= 0) props.put("broker.id", nodeId.toString) props.put("host.name", "localhost") props.put("port", port.toString) props.put("log.dir", TestUtils.tempDir().getAbsolutePath) - props.put("zookeeper.connect", TestZKUtils.zookeeperConnect) + props.put("zookeeper.connect", zkConnect) props.put("replica.socket.timeout.ms", "1500") props.put("controller.socket.timeout.ms", "1500") props.put("controlled.shutdown.enable", enableControlledShutdown.toString) @@ -756,7 +747,7 @@ object TestUtils extends Logging { brokerState = new BrokerState()) } - def sendMessagesToPartition(configs: Seq[KafkaConfig], + def sendMessagesToPartition(servers: Seq[KafkaServer], topic: String, partition: Int, numMessages: Int, @@ -765,7 +756,7 @@ object TestUtils extends Logging { val props = new Properties() props.put("compression.codec", compression.codec.toString) val producer: Producer[Int, String] = - createProducer(TestUtils.getBrokerListStrFromConfigs(configs), + createProducer(TestUtils.getBrokerListStrFromServers(servers), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[IntEncoder].getName, partitioner = classOf[FixedValuePartitioner].getName, @@ -778,7 +769,7 @@ object TestUtils extends Logging { ms.toList } - def sendMessages(configs: Seq[KafkaConfig], + def sendMessages(servers: Seq[KafkaServer], topic: String, producerId: String, messagesPerNode: Int, @@ -790,7 +781,7 @@ object TestUtils extends Logging { props.put("compression.codec", compression.codec.toString) props.put("client.id", producerId) val producer: Producer[Int, String] = - createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs), + createProducer(brokerList = TestUtils.getBrokerListStrFromServers(servers), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[IntEncoder].getName, partitioner = classOf[FixedValuePartitioner].getName, @@ -848,10 +839,6 @@ object TestUtils extends Logging { } -object TestZKUtils { - val zookeeperConnect = "127.0.0.1:" + TestUtils.choosePort() -} - class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] { override def toBytes(n: Int) = n.toString.getBytes } http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala index 3151561..1d87506 100644 --- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala +++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala @@ -24,14 +24,16 @@ import java.net.InetSocketAddress import kafka.utils.Utils import org.apache.kafka.common.utils.Utils.getPort -class EmbeddedZookeeper(val connectString: String) { +class EmbeddedZookeeper() { val snapshotDir = TestUtils.tempDir() val logDir = TestUtils.tempDir() val tickTime = 500 val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime) val factory = new NIOServerCnxnFactory() - factory.configure(new InetSocketAddress("127.0.0.1", getPort(connectString)), 0) + private val addr = new InetSocketAddress("127.0.0.1", TestUtils.RandomPort) + factory.configure(addr, 0) factory.startup(zookeeper) + val port = zookeeper.getClientPort() def shutdown() { Utils.swallow(zookeeper.shutdown()) http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala index 9897b2f..1bc45b1 100644 --- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala @@ -29,7 +29,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness { val path: String = "/some_dir" val zkSessionTimeoutMs = 1000 - val zkConnectWithInvalidRoot: String = zkConnect + "/ghost" + def zkConnectWithInvalidRoot: String = zkConnect + "/ghost" def testCreatePersistentPathThrowsException { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index 67d9c4b..fedefb5 100644 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -19,19 +19,22 @@ package kafka.zk import org.scalatest.junit.JUnit3Suite import org.I0Itec.zkclient.ZkClient -import kafka.utils.{ZKStringSerializer, TestZKUtils, Utils} +import kafka.utils.{ZKStringSerializer, Utils} trait ZooKeeperTestHarness extends JUnit3Suite { - val zkConnect: String = TestZKUtils.zookeeperConnect + var zkPort: Int = -1 var zookeeper: EmbeddedZookeeper = null var zkClient: ZkClient = null val zkConnectionTimeout = 6000 val zkSessionTimeout = 6000 + def zkConnect: String = "127.0.0.1:" + zkPort + override def setUp() { super.setUp - zookeeper = new EmbeddedZookeeper(zkConnect) - zkClient = new ZkClient(zookeeper.connectString, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) + zookeeper = new EmbeddedZookeeper() + zkPort = zookeeper.port + zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) } override def tearDown() {