Repository: kafka
Updated Branches:
  refs/heads/trunk 15b93a410 -> 6adaffd8e


http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala 
b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
index 36db917..18361c1 100644
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -47,19 +47,16 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with 
ZooKeeperTestHarness with
 
   private val brokerZk = 0
 
-  private val ports = TestUtils.choosePorts(2)
-  private val portZk = ports(0)
-
   @Before
   override def setUp() {
     super.setUp()
 
-    val propsZk = TestUtils.createBrokerConfig(brokerZk, portZk)
+    val propsZk = TestUtils.createBrokerConfig(brokerZk, zkConnect)
     val logDirZkPath = propsZk.getProperty("log.dir")
     logDirZk = new File(logDirZkPath)
     config = KafkaConfig.fromProps(propsZk)
     server = TestUtils.createServer(config)
-    simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64 * 
1024, "")
+    simpleConsumerZk = new SimpleConsumer("localhost", server.boundPort(), 
1000000, 64 * 1024, "")
   }
 
   @After
@@ -94,7 +91,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with 
ZooKeeperTestHarness with
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
     props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
     props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - 
%m%n")
-    props.put("log4j.appender.KAFKA.brokerList", 
TestUtils.getBrokerListStrFromConfigs(Seq(config)))
+    props.put("log4j.appender.KAFKA.brokerList", 
TestUtils.getBrokerListStrFromServers(Seq(server)))
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
     try {
@@ -129,7 +126,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with 
ZooKeeperTestHarness with
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
     props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
     props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - 
%m%n")
-    props.put("log4j.appender.KAFKA.BrokerList", 
TestUtils.getBrokerListStrFromConfigs(Seq(config)))
+    props.put("log4j.appender.KAFKA.BrokerList", 
TestUtils.getBrokerListStrFromServers(Seq(server)))
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.appender.KAFKA.RequiredNumAcks", "1")
     props.put("log4j.appender.KAFKA.SyncSend", "true")

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala 
b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 0f58ad8..247a6e9 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -36,18 +36,15 @@ import scala.util.matching.Regex
 import org.scalatest.junit.JUnit3Suite
 
 class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging 
{
-  val zookeeperConnect = TestZKUtils.zookeeperConnect
   val numNodes = 2
   val numParts = 2
   val topic = "topic1"
 
   val overridingProps = new Properties()
-  overridingProps.put(KafkaConfig.ZkConnectProp, zookeeperConnect)
   overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
 
-  val configs =
-    for (props <- TestUtils.createBrokerConfigs(numNodes, enableDeleteTopic = 
true))
-    yield KafkaConfig.fromProps(props, overridingProps)
+  def generateConfigs() =
+    TestUtils.createBrokerConfigs(numNodes, zkConnect, 
enableDeleteTopic=true).map(KafkaConfig.fromProps(_, overridingProps))
 
   val nMessages = 2
 
@@ -80,7 +77,7 @@ class MetricsTest extends JUnit3Suite with 
KafkaServerTestHarness with Logging {
   }
 
   def createAndShutdownStep(group: String, consumerId: String, producerId: 
String): Unit = {
-    val sentMessages1 = sendMessages(configs, topic, producerId, nMessages, 
"batch1", NoCompressionCodec, 1)
+    val sentMessages1 = sendMessages(servers, topic, producerId, nMessages, 
"batch1", NoCompressionCodec, 1)
     // create a consumer
     val consumerConfig1 = new 
ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, 
true)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 0af23ab..79a806c 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -36,7 +36,7 @@ class SocketServerTest extends JUnitSuite {
 
   val server: SocketServer = new SocketServer(0,
                                               host = null,
-                                              port = 
kafka.utils.TestUtils.choosePort,
+                                              port = 0,
                                               numProcessorThreads = 1,
                                               maxQueuedRequests = 50,
                                               sendBufferSize = 300000,
@@ -73,7 +73,7 @@ class SocketServerTest extends JUnitSuite {
     channel.sendResponse(new RequestChannel.Response(request.processor, 
request, send))
   }
 
-  def connect(s:SocketServer = server) = new Socket("localhost", s.port)
+  def connect(s:SocketServer = server) = new Socket("localhost", s.boundPort)
 
   @After
   def cleanup() {
@@ -162,7 +162,7 @@ class SocketServerTest extends JUnitSuite {
     val overrides: Map[String, Int] = Map("localhost" -> overrideNum)
     val overrideServer: SocketServer = new SocketServer(0,
                                                 host = null,
-                                                port = 
kafka.utils.TestUtils.choosePort,
+                                                port = 0,
                                                 numProcessorThreads = 1,
                                                 maxQueuedRequests = 50,
                                                 sendBufferSize = 300000,

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala 
b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index be90c5b..d2ab683 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -36,8 +36,10 @@ import scala.collection.mutable.ArrayBuffer
 import kafka.utils._
 
 class AsyncProducerTest extends JUnit3Suite {
-  val props = createBrokerConfigs(1)
-  val configs = props.map(p => KafkaConfig.fromProps(p))
+  // One of the few cases where we can just set a fixed port: the producer is 
mocked out here, so no real connection is ever opened
+  val props = Seq(createBrokerConfig(1, "127.0.0.1:1", port=65534))
+  val configs = props.map(KafkaConfig.fromProps)
+  val brokerList = configs.map(c => 
org.apache.kafka.common.utils.Utils.formatAddress(c.hostName, 
c.port)).mkString(",")
 
   override def setUp() {
     super.setUp()
@@ -61,7 +63,7 @@ class AsyncProducerTest extends JUnit3Suite {
 
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("metadata.broker.list", 
TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("metadata.broker.list", brokerList)
     props.put("producer.type", "async")
     props.put("queue.buffering.max.messages", "10")
     props.put("batch.num.messages", "1")
@@ -86,7 +88,7 @@ class AsyncProducerTest extends JUnit3Suite {
   def testProduceAfterClosed() {
     val produceData = getProduceData(10)
     val producer = createProducer[String, String](
-      getBrokerListStrFromConfigs(configs),
+      brokerList,
       encoder = classOf[StringEncoder].getName)
 
     producer.close
@@ -162,7 +164,7 @@ class AsyncProducerTest extends JUnit3Suite {
     producerDataList.append(new KeyedMessage[Int,Message]("topic2", key = 4, 
message = new Message("msg5".getBytes)))
 
     val props = new Properties()
-    props.put("metadata.broker.list", 
TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("metadata.broker.list", brokerList)
     val broker1 = new Broker(0, "localhost", 9092)
     val broker2 = new Broker(1, "localhost", 9093)
 
@@ -212,7 +214,7 @@ class AsyncProducerTest extends JUnit3Suite {
   def testSerializeEvents() {
     val produceData = TestUtils.getMsgStrings(5).map(m => new 
KeyedMessage[String,String]("topic1",m))
     val props = new Properties()
-    props.put("metadata.broker.list", 
TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("metadata.broker.list", brokerList)
     val config = new ProducerConfig(props)
     // form expected partitions metadata
     val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
@@ -244,7 +246,7 @@ class AsyncProducerTest extends JUnit3Suite {
     val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]]
     producerDataList.append(new KeyedMessage[String,Message]("topic1", "key1", 
new Message("msg1".getBytes)))
     val props = new Properties()
-    props.put("metadata.broker.list", 
TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("metadata.broker.list", brokerList)
     val config = new ProducerConfig(props)
 
     // form expected partitions metadata
@@ -274,7 +276,7 @@ class AsyncProducerTest extends JUnit3Suite {
   @Test
   def testNoBroker() {
     val props = new Properties()
-    props.put("metadata.broker.list", 
TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("metadata.broker.list", brokerList)
 
     val config = new ProducerConfig(props)
     // create topic metadata with 0 partitions
@@ -308,7 +310,7 @@ class AsyncProducerTest extends JUnit3Suite {
     // no need to retry since the send will always fail
     props.put("message.send.max.retries", "0")
     val producer= createProducer[String, String](
-      brokerList = getBrokerListStrFromConfigs(configs),
+      brokerList = brokerList,
       encoder = classOf[DefaultEncoder].getName,
       keyEncoder = classOf[DefaultEncoder].getName,
       producerProps = props)
@@ -326,7 +328,7 @@ class AsyncProducerTest extends JUnit3Suite {
   @Test
   def testRandomPartitioner() {
     val props = new Properties()
-    props.put("metadata.broker.list", 
TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("metadata.broker.list", brokerList)
     val config = new ProducerConfig(props)
 
     // create topic metadata with 0 partitions
@@ -364,7 +366,7 @@ class AsyncProducerTest extends JUnit3Suite {
   @Test
   def testFailedSendRetryLogic() {
     val props = new Properties()
-    props.put("metadata.broker.list", 
TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("metadata.broker.list", brokerList)
     props.put("request.required.acks", "1")
     props.put("serializer.class", classOf[StringEncoder].getName.toString)
     props.put("key.serializer.class", 
classOf[NullEncoder[Int]].getName.toString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala 
b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index d2f3851..a7ca142 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -17,7 +17,6 @@
 
 package kafka.producer
 
-import org.apache.kafka.common.config.ConfigException
 import org.scalatest.TestFailedException
 import org.scalatest.junit.JUnit3Suite
 import kafka.consumer.SimpleConsumer
@@ -40,8 +39,6 @@ import kafka.serializer.StringEncoder
 class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
   private val brokerId1 = 0
   private val brokerId2 = 1
-  private val ports = TestUtils.choosePorts(2)
-  private val (port1, port2) = (ports(0), ports(1))
   private var server1: KafkaServer = null
   private var server2: KafkaServer = null
   private var consumer1: SimpleConsumer = null
@@ -49,26 +46,36 @@ class ProducerTest extends JUnit3Suite with 
ZooKeeperTestHarness with Logging{
   private val requestHandlerLogger = 
Logger.getLogger(classOf[KafkaRequestHandler])
   private var servers = List.empty[KafkaServer]
 
-  private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
-  props1.put("num.partitions", "4")
-  private val config1 = KafkaConfig.fromProps(props1)
-  private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
-  props2.put("num.partitions", "4")
-  private val config2 = KafkaConfig.fromProps(props2)
+  // Creation of consumers is deferred until they are actually needed. This 
allows us to kill brokers that use random
+  // ports and then get a consumer instance that will be pointed at the 
correct port
+  def getConsumer1() = {
+    if (consumer1 == null)
+      consumer1 = new SimpleConsumer("localhost", server1.boundPort(), 
1000000, 64*1024, "")
+    consumer1
+  }
+
+  def getConsumer2() = {
+    if (consumer2 == null)
+      consumer2 = new SimpleConsumer("localhost", server2.boundPort(), 100, 
64*1024, "")
+    consumer2
+  }
 
   override def setUp() {
     super.setUp()
     // set up 2 brokers with 4 partitions each
+    val props1 = TestUtils.createBrokerConfig(brokerId1, zkConnect, false)
+    props1.put("num.partitions", "4")
+    val config1 = KafkaConfig.fromProps(props1)
+    val props2 = TestUtils.createBrokerConfig(brokerId2, zkConnect, false)
+    props2.put("num.partitions", "4")
+    val config2 = KafkaConfig.fromProps(props2)
     server1 = TestUtils.createServer(config1)
     server2 = TestUtils.createServer(config2)
     servers = List(server1,server2)
 
     val props = new Properties()
     props.put("host", "localhost")
-    props.put("port", port1.toString)
-
-    consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024, "")
-    consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024, "")
+    props.put("port", server1.boundPort().toString)
 
     // temporarily set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.FATAL)
@@ -115,7 +122,7 @@ class ProducerTest extends JUnit3Suite with 
ZooKeeperTestHarness with Logging{
     }
 
     val producer2 = TestUtils.createProducer[String, String](
-      brokerList = "localhost:80," + 
TestUtils.getBrokerListStrFromConfigs(Seq(config1)),
+      brokerList = "localhost:80," + 
TestUtils.getBrokerListStrFromServers(Seq(server1)),
       encoder = classOf[StringEncoder].getName,
       keyEncoder = classOf[StringEncoder].getName)
 
@@ -128,7 +135,7 @@ class ProducerTest extends JUnit3Suite with 
ZooKeeperTestHarness with Logging{
     }
 
     val producer3 =  TestUtils.createProducer[String, String](
-      brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, 
config2)),
+      brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, 
server2)),
       encoder = classOf[StringEncoder].getName,
       keyEncoder = classOf[StringEncoder].getName)
 
@@ -151,7 +158,7 @@ class ProducerTest extends JUnit3Suite with 
ZooKeeperTestHarness with Logging{
     TestUtils.createTopic(zkClient, topic, numPartitions = 1, 
replicationFactor = 2, servers = servers)
 
     val producer1 = TestUtils.createProducer[String, String](
-      brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, 
config2)),
+      brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, 
server2)),
       encoder = classOf[StringEncoder].getName,
       keyEncoder = classOf[StringEncoder].getName,
       partitioner = classOf[StaticPartitioner].getName,
@@ -166,10 +173,10 @@ class ProducerTest extends JUnit3Suite with 
ZooKeeperTestHarness with Logging{
     val leader = leaderOpt.get
 
     val messageSet = if(leader == server1.config.brokerId) {
-      val response1 = consumer1.fetch(new 
FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+      val response1 = getConsumer1().fetch(new 
FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
       response1.messageSet("new-topic", 0).iterator.toBuffer
     }else {
-      val response2 = consumer2.fetch(new 
FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+      val response2 = getConsumer2().fetch(new 
FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
       response2.messageSet("new-topic", 0).iterator.toBuffer
     }
     assertEquals("Should have fetched 2 messages", 2, messageSet.size)
@@ -184,7 +191,7 @@ class ProducerTest extends JUnit3Suite with 
ZooKeeperTestHarness with Logging{
 
     try {
       val producer2 = TestUtils.createProducer[String, String](
-        brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, 
config2)),
+        brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, 
server2)),
         encoder = classOf[StringEncoder].getName,
         keyEncoder = classOf[StringEncoder].getName,
         partitioner = classOf[StaticPartitioner].getName,
@@ -214,7 +221,7 @@ class ProducerTest extends JUnit3Suite with 
ZooKeeperTestHarness with Logging{
                           servers = servers)
 
     val producer = TestUtils.createProducer[String, String](
-      brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, 
config2)),
+      brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, 
server2)),
       encoder = classOf[StringEncoder].getName,
       keyEncoder = classOf[StringEncoder].getName,
       partitioner = classOf[StaticPartitioner].getName,
@@ -248,7 +255,7 @@ class ProducerTest extends JUnit3Suite with 
ZooKeeperTestHarness with Logging{
 
     try {
       // cross check if broker 1 got the messages
-      val response1 = consumer1.fetch(new 
FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+      val response1 = getConsumer1().fetch(new 
FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
       val messageSet1 = response1.messageSet(topic, 0).iterator
       assertTrue("Message set should have 1 message", messageSet1.hasNext)
       assertEquals(new Message(bytes = "test1".getBytes, key = 
"test".getBytes), messageSet1.next.message)
@@ -268,7 +275,7 @@ class ProducerTest extends JUnit3Suite with 
ZooKeeperTestHarness with Logging{
     props.put("message.send.max.retries", "0")
     
props.put("client.id","ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout")
     val producer = TestUtils.createProducer[String, String](
-      brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, 
config2)),
+      brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, 
server2)),
       encoder = classOf[StringEncoder].getName,
       keyEncoder = classOf[StringEncoder].getName,
       partitioner = classOf[StaticPartitioner].getName,
@@ -283,7 +290,7 @@ class ProducerTest extends JUnit3Suite with 
ZooKeeperTestHarness with Logging{
       // this message should be assigned to partition 0 whose leader is on 
broker 0
       producer.send(new KeyedMessage[String, String](topic, "test", "test"))
       // cross check if brokers got the messages
-      val response1 = consumer1.fetch(new 
FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+      val response1 = getConsumer1().fetch(new 
FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
       val messageSet1 = response1.messageSet("new-topic", 0).iterator
       assertTrue("Message set should have 1 message", messageSet1.hasNext)
       assertEquals(new Message("test".getBytes), messageSet1.next.message)
@@ -315,7 +322,7 @@ class ProducerTest extends JUnit3Suite with 
ZooKeeperTestHarness with Logging{
   @Test
   def testSendNullMessage() {
     val producer = TestUtils.createProducer[String, String](
-      brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, 
config2)),
+      brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, 
server2)),
       encoder = classOf[StringEncoder].getName,
       keyEncoder = classOf[StringEncoder].getName,
       partitioner = classOf[StaticPartitioner].getName)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 812df59..7d6f655 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -33,13 +33,12 @@ import kafka.common.{TopicAndPartition, ErrorMapping}
 class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   private val messageBytes =  new Array[Byte](2)
   // turning off controlled shutdown since testProducerCanTimeout() explicitly 
shuts down the request handler pool.
-  val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, 
false).head))
-  val zookeeperConnect = TestZKUtils.zookeeperConnect
+  def generateConfigs() = 
List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, 
false).head))
 
   @Test
   def testReachableServer() {
     val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val firstStart = SystemTime.milliseconds
@@ -74,7 +73,7 @@ class SyncProducerTest extends JUnit3Suite with 
KafkaServerTestHarness {
   @Test
   def testEmptyProduceRequest() {
     val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
 
     val correlationId = 0
     val clientId = SyncProducerConfig.DefaultClientId
@@ -91,7 +90,7 @@ class SyncProducerTest extends JUnit3Suite with 
KafkaServerTestHarness {
   @Test
   def testMessageSizeTooLarge() {
     val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     TestUtils.createTopic(zkClient, "test", numPartitions = 1, 
replicationFactor = 1, servers = servers)
@@ -119,7 +118,7 @@ class SyncProducerTest extends JUnit3Suite with 
KafkaServerTestHarness {
   def testMessageSizeTooLargeWithAckZero() {
     val server = servers.head
 
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
     props.put("request.required.acks", "0")
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
@@ -145,7 +144,7 @@ class SyncProducerTest extends JUnit3Suite with 
KafkaServerTestHarness {
   @Test
   def testProduceCorrectlyReceivesResponse() {
     val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val messages = new ByteBufferMessageSet(NoCompressionCodec, new 
Message(messageBytes))
@@ -191,7 +190,7 @@ class SyncProducerTest extends JUnit3Suite with 
KafkaServerTestHarness {
     val timeoutMs = 500
 
     val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
     val producer = new SyncProducer(new SyncProducerConfig(props))
 
     val messages = new ByteBufferMessageSet(NoCompressionCodec, new 
Message(messageBytes))
@@ -217,7 +216,7 @@ class SyncProducerTest extends JUnit3Suite with 
KafkaServerTestHarness {
   @Test
   def testProduceRequestWithNoResponse() {
     val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
     val correlationId = 0
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
@@ -233,7 +232,7 @@ class SyncProducerTest extends JUnit3Suite with 
KafkaServerTestHarness {
     val topicName = "minisrtest"
     val server = servers.head
 
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
     props.put("request.required.acks", "-1")
 
     val producer = new SyncProducer(new SyncProducerConfig(props))

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala 
b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
index 296e2b5..b011240 100644
--- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
@@ -30,7 +30,7 @@ class AdvertiseBrokerTest extends JUnit3Suite with 
ZooKeeperTestHarness {
 
   override def setUp() {
     super.setUp()
-    val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
+    val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
     props.put("advertised.host.name", advertisedHostName)
     props.put("advertised.port", advertisedPort.toString)
     

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 93182ae..7877f6c 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -26,8 +26,7 @@ import kafka.admin.{AdminOperationException, AdminUtils}
 import org.scalatest.junit.JUnit3Suite
 
 class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
-  
-  override val configs = 
List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, 
TestUtils.choosePort)))
+  def generateConfigs() = 
List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
 
   @Test
   def testConfigChange() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 0bdbc2f..142e28e 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 class HighwatermarkPersistenceTest extends JUnit3Suite {
 
-  val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps)
+  val configs = TestUtils.createBrokerConfigs(2, 
TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
   val topic = "foo"
   val logManagers = configs map { config =>
     TestUtils.createLogManager(

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index c1d168a..90529fa 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -39,7 +39,7 @@ class IsrExpirationTest extends JUnit3Suite {
   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, 
replicaLagTimeMaxMs.toString)
   overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, 
replicaFetchWaitMaxMs.toString)
-  val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps(_, 
overridingProps))
+  val configs = TestUtils.createBrokerConfigs(2, 
TestUtils.MockZkConnect).map(KafkaConfig.fromProps(_, overridingProps))
   val topic = "foo"
 
   val time = new MockTime

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 7f47e6f..852fa3b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -29,7 +29,7 @@ class KafkaConfigTest extends JUnit3Suite {
 
   @Test
   def testLogRetentionTimeHoursProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
     props.put("log.retention.hours", "1")
 
     val cfg = KafkaConfig.fromProps(props)
@@ -39,7 +39,7 @@ class KafkaConfigTest extends JUnit3Suite {
   
   @Test
   def testLogRetentionTimeMinutesProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
     props.put("log.retention.minutes", "30")
 
     val cfg = KafkaConfig.fromProps(props)
@@ -49,7 +49,7 @@ class KafkaConfigTest extends JUnit3Suite {
   
   @Test
   def testLogRetentionTimeMsProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
     props.put("log.retention.ms", "1800000")
 
     val cfg = KafkaConfig.fromProps(props)
@@ -59,7 +59,7 @@ class KafkaConfigTest extends JUnit3Suite {
   
   @Test
   def testLogRetentionTimeNoConfigProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
 
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRetentionTimeMillis)
@@ -68,7 +68,7 @@ class KafkaConfigTest extends JUnit3Suite {
   
   @Test
   def testLogRetentionTimeBothMinutesAndHoursProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
     props.put("log.retention.minutes", "30")
     props.put("log.retention.hours", "1")
 
@@ -79,7 +79,7 @@ class KafkaConfigTest extends JUnit3Suite {
   
   @Test
   def testLogRetentionTimeBothMinutesAndMsProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     props.put("log.retention.ms", "1800000")
     props.put("log.retention.minutes", "10")
 
@@ -93,7 +93,7 @@ class KafkaConfigTest extends JUnit3Suite {
     val port = 9999
     val hostName = "fake-host"
     
-    val props = TestUtils.createBrokerConfig(0, port)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = port)
     props.put("host.name", hostName)
     
     val serverConfig = KafkaConfig.fromProps(props)
@@ -108,7 +108,7 @@ class KafkaConfigTest extends JUnit3Suite {
     val advertisedHostName = "routable-host"
     val advertisedPort = 1234
     
-    val props = TestUtils.createBrokerConfig(0, port)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = port)
     props.put("advertised.host.name", advertisedHostName)
     props.put("advertised.port", advertisedPort.toString)
     
@@ -120,7 +120,7 @@ class KafkaConfigTest extends JUnit3Suite {
 
   @Test
   def testUncleanLeaderElectionDefault() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     val serverConfig = KafkaConfig.fromProps(props)
 
     assertEquals(serverConfig.uncleanLeaderElectionEnable, true)
@@ -128,7 +128,7 @@ class KafkaConfigTest extends JUnit3Suite {
 
   @Test
   def testUncleanElectionDisabled() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     props.put("unclean.leader.election.enable", String.valueOf(false))
     val serverConfig = KafkaConfig.fromProps(props)
 
@@ -137,7 +137,7 @@ class KafkaConfigTest extends JUnit3Suite {
 
   @Test
   def testUncleanElectionEnabled() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     props.put("unclean.leader.election.enable", String.valueOf(true))
     val serverConfig = KafkaConfig.fromProps(props)
 
@@ -146,7 +146,7 @@ class KafkaConfigTest extends JUnit3Suite {
 
   @Test
   def testUncleanElectionInvalid() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     props.put("unclean.leader.election.enable", "invalid")
 
     intercept[ConfigException] {
@@ -156,7 +156,7 @@ class KafkaConfigTest extends JUnit3Suite {
   
   @Test
   def testLogRollTimeMsProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     props.put("log.roll.ms", "1800000")
 
     val cfg = KafkaConfig.fromProps(props)
@@ -166,7 +166,7 @@ class KafkaConfigTest extends JUnit3Suite {
   
   @Test
   def testLogRollTimeBothMsAndHoursProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     props.put("log.roll.ms", "1800000")
     props.put("log.roll.hours", "1")
 
@@ -177,7 +177,7 @@ class KafkaConfigTest extends JUnit3Suite {
     
   @Test
   def testLogRollTimeNoConfigProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
 
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRollTimeMillis             
                                                                                
                                                                                
                        )
@@ -186,7 +186,7 @@ class KafkaConfigTest extends JUnit3Suite {
 
   @Test
   def testDefaultCompressionType() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     val serverConfig = KafkaConfig.fromProps(props)
 
     assertEquals(serverConfig.compressionType, "producer")
@@ -194,7 +194,7 @@ class KafkaConfigTest extends JUnit3Suite {
 
   @Test
   def testValidCompressionType() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     props.put("compression.type", "gzip")
     val serverConfig = KafkaConfig.fromProps(props)
 
@@ -203,7 +203,7 @@ class KafkaConfigTest extends JUnit3Suite {
 
   @Test
   def testInvalidCompressionType() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     props.put("compression.type", "abc")
     intercept[IllegalArgumentException] {
       KafkaConfig.fromProps(props)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala 
b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index f252805..3d4258f 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -31,17 +31,16 @@ class LeaderElectionTest extends JUnit3Suite with 
ZooKeeperTestHarness {
   val brokerId1 = 0
   val brokerId2 = 1
 
-  val port1 = TestUtils.choosePort()
-  val port2 = TestUtils.choosePort()
-
-  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
-  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
   var staleControllerEpochDetected = false
 
   override def setUp() {
     super.setUp()
+
+    val configProps1 = TestUtils.createBrokerConfig(brokerId1, zkConnect, enableControlledShutdown = false)
+    val configProps2 = TestUtils.createBrokerConfig(brokerId2, zkConnect, enableControlledShutdown = false)
+
     // start both servers
     val server1 = TestUtils.createServer(KafkaConfig.fromProps(configProps1))
     val server2 = TestUtils.createServer(KafkaConfig.fromProps(configProps2))
@@ -117,8 +116,8 @@ class LeaderElectionTest extends JUnit3Suite with 
ZooKeeperTestHarness {
 
     // start another controller
     val controllerId = 2
-    val controllerConfig = 
KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, 
TestUtils.choosePort()))
-    val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", 
s.config.port))
+    val controllerConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, zkConnect))
+    val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort()))
     val controllerContext = new ControllerContext(zkClient, 6000)
     controllerContext.liveBrokers = brokers.toSet
     val controllerChannelManager = new 
ControllerChannelManager(controllerContext, controllerConfig)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 
b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 8c9f9e7..496bf0d 100644
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -39,19 +39,18 @@ class LogOffsetTest extends JUnit3Suite with 
ZooKeeperTestHarness {
   var topicLogDir: File = null
   var server: KafkaServer = null
   var logSize: Int = 100
-  val brokerPort: Int = 9099
   var simpleConsumer: SimpleConsumer = null
   var time: Time = new MockTime()
 
   @Before
   override def setUp() {
     super.setUp()
-    val config: Properties = createBrokerConfig(1, brokerPort)
+    val config: Properties = createBrokerConfig(1)
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
     time = new MockTime()
     server = TestUtils.createServer(KafkaConfig.fromProps(config), time)
-    simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 
64*1024, "")
+    simpleConsumer = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64*1024, "")
   }
 
   @After
@@ -194,10 +193,10 @@ class LogOffsetTest extends JUnit3Suite with 
ZooKeeperTestHarness {
     assertEquals(Seq(0L), consumerOffsets)
   }
 
-  private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
+  private def createBrokerConfig(nodeId: Int): Properties = {
     val props = new Properties
     props.put("broker.id", nodeId.toString)
-    props.put("port", port.toString)
+    props.put("port", TestUtils.RandomPort.toString())
     props.put("log.dir", getLogDir.getAbsolutePath)
     props.put("log.flush.interval.messages", "1")
     props.put("enable.zookeeper", "false")

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index de255e3..92e49df 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -19,8 +19,7 @@ package kafka.server
 import java.util.Properties
 
 import kafka.utils.TestUtils._
-import kafka.utils.IntEncoder
-import kafka.utils.{Utils, TestUtils}
+import kafka.utils.{IntEncoder, Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.common._
 import kafka.producer.{KeyedMessage, Producer}
@@ -43,38 +42,48 @@ class LogRecoveryTest extends JUnit3Suite with 
ZooKeeperTestHarness {
   overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, 
replicaFetchWaitMaxMs.toString)
   overridingProps.put(KafkaConfig.ReplicaFetchMinBytesProp, 
replicaFetchMinBytes.toString)
 
-  val configs = TestUtils.createBrokerConfigs(2, 
false).map(KafkaConfig.fromProps(_, overridingProps))
+  var configs: Seq[KafkaConfig] = null
   val topic = "new-topic"
   val partitionId = 0
 
   var server1: KafkaServer = null
   var server2: KafkaServer = null
 
-  val configProps1 = configs.head
-  val configProps2 = configs.last
+  def configProps1 = configs.head
+  def configProps2 = configs.last
 
   val message = "hello"
 
   var producer: Producer[Int, String] = null
-  var hwFile1: OffsetCheckpoint = new OffsetCheckpoint(new 
File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename))
-  var hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new 
File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename))
+  def hwFile1: OffsetCheckpoint = new OffsetCheckpoint(new 
File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename))
+  def hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new 
File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename))
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
+  // Some tests restart the brokers then produce more data. But since test 
brokers use random ports, we need
+  // to use a new producer that knows the new ports
+  def updateProducer() = {
+    if (producer != null)
+      producer.close()
+    producer = TestUtils.createProducer[Int, 
String](TestUtils.getBrokerListStrFromServers(servers),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[IntEncoder].getName)
+  }
+
   override def setUp() {
     super.setUp()
 
+    configs = TestUtils.createBrokerConfigs(2, zkConnect, 
false).map(KafkaConfig.fromProps(_, overridingProps))
+
     // start both servers
     server1 = TestUtils.createServer(configProps1)
     server2 = TestUtils.createServer(configProps2)
-    servers ++= List(server1, server2)
+    servers = List(server1, server2)
 
     // create topic with 1 partition, 2 replicas, one on each broker
     createTopic(zkClient, topic, partitionReplicaAssignment = 
Map(0->Seq(0,1)), servers = servers)
 
     // create the producer
-    producer = TestUtils.createProducer[Int, 
String](TestUtils.getBrokerListStrFromConfigs(configs),
-      encoder = classOf[StringEncoder].getName,
-      keyEncoder = classOf[IntEncoder].getName)
+    updateProducer()
   }
 
   override def tearDown() {
@@ -121,6 +130,8 @@ class LogRecoveryTest extends JUnit3Suite with 
ZooKeeperTestHarness {
 
     // bring the preferred replica back
     server1.startup()
+    // Update producer with new server settings
+    updateProducer()
 
     leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
     assertTrue("Leader must remain on broker 1, in case of zookeeper session 
expiration it can move to broker 0",
@@ -132,6 +143,7 @@ class LogRecoveryTest extends JUnit3Suite with 
ZooKeeperTestHarness {
     assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
 
     server2.startup()
+    updateProducer()
     leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 
oldLeaderOpt = leader)
     assertTrue("Leader must remain on broker 0, in case of zookeeper session 
expiration it can move to broker 1",
       leader.isDefined && (leader.get == 0 || leader.get == 1))
@@ -181,6 +193,7 @@ class LogRecoveryTest extends JUnit3Suite with 
ZooKeeperTestHarness {
     assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
 
     server2.startup()
+    updateProducer()
     // check if leader moves to the other server
     leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 
oldLeaderOpt = leader)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
@@ -189,6 +202,7 @@ class LogRecoveryTest extends JUnit3Suite with 
ZooKeeperTestHarness {
 
     // bring the preferred replica back
     server1.startup()
+    updateProducer()
 
     assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
     assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 7654275..a6bb690 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -37,7 +37,6 @@ import junit.framework.Assert._
 
 class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
   val random: Random = new Random()
-  val brokerPort: Int = 9099
   val group = "test-group"
   val retentionCheckInterval: Long = 100L
   var logDir: File = null
@@ -50,14 +49,14 @@ class OffsetCommitTest extends JUnit3Suite with 
ZooKeeperTestHarness {
   @Before
   override def setUp() {
     super.setUp()
-    val config: Properties = createBrokerConfig(1, brokerPort)
+    val config: Properties = createBrokerConfig(1, zkConnect)
     config.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
     config.setProperty(KafkaConfig.OffsetsRetentionCheckIntervalMsProp, 
retentionCheckInterval.toString)
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
     time = new MockTime()
     server = TestUtils.createServer(KafkaConfig.fromProps(config), time)
-    simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 
64*1024, "test-client")
+    simpleConsumer = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64*1024, "test-client")
     val consumerMetadataRequest = ConsumerMetadataRequest(group)
     Stream.continually {
       val consumerMetadataResponse = 
simpleConsumer.send(consumerMetadataRequest)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index 1e64faf..a67cc37 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -22,20 +22,19 @@ import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
 import kafka.producer.KeyedMessage
 import kafka.serializer.StringEncoder
-import kafka.utils.TestUtils
-import junit.framework.Assert._
+import kafka.utils.{TestUtils}
 import kafka.common._
 
 class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
-  val props = createBrokerConfigs(2,false)
-  val configs = props.map(p => KafkaConfig.fromProps(p))
   var brokers: Seq[KafkaServer] = null
   val topic1 = "foo"
   val topic2 = "bar"
 
   override def setUp() {
     super.setUp()
-    brokers = configs.map(config => TestUtils.createServer(config))
+    brokers = createBrokerConfigs(2, zkConnect, false)
+      .map(KafkaConfig.fromProps)
+      .map(config => TestUtils.createServer(config))
   }
 
   override def tearDown() {
@@ -54,7 +53,7 @@ class ReplicaFetchTest extends JUnit3Suite with 
ZooKeeperTestHarness  {
     }
 
     // send test messages to leader
-    val producer = TestUtils.createProducer[String, 
String](TestUtils.getBrokerListStrFromConfigs(configs),
+    val producer = TestUtils.createProducer[String, 
String](TestUtils.getBrokerListStrFromServers(brokers),
                                                             encoder = 
classOf[StringEncoder].getName,
                                                             keyEncoder = 
classOf[StringEncoder].getName)
     val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) 
++ testMessageList2.map(m => new KeyedMessage(topic2, m, m))

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 2849a5e..00d5933 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -38,7 +38,7 @@ class ReplicaManagerTest extends JUnit3Suite {
 
   @Test
   def testHighWaterMarkDirectoryMapping() {
-    val props = TestUtils.createBrokerConfig(1)
+    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)).toArray)
@@ -54,7 +54,7 @@ class ReplicaManagerTest extends JUnit3Suite {
 
   @Test
   def testHighwaterMarkRelativeDirectoryMapping() {
-    val props = TestUtils.createBrokerConfig(1)
+    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
@@ -71,7 +71,7 @@ class ReplicaManagerTest extends JUnit3Suite {
 
   @Test
   def testIllegalRequiredAcks() {
-    val props = TestUtils.createBrokerConfig(1)
+    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)).toArray)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index 96a8a5a..2bfaeb3 100644
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -16,6 +16,8 @@
   */
 package kafka.server
 
+import java.util.Properties
+
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.{TestUtils, Utils}
 import org.junit.Test
@@ -24,12 +26,19 @@ import junit.framework.Assert._
 import java.io.File
 
 class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness 
{
-  var props1 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort)
-  var config1 = KafkaConfig.fromProps(props1)
-  var props2 = TestUtils.createBrokerConfig(0, TestUtils.choosePort)
-  var config2 = KafkaConfig.fromProps(props2)
+  var props1: Properties = null
+  var config1: KafkaConfig = null
+  var props2: Properties = null
+  var config2: KafkaConfig = null
   val brokerMetaPropsFile = "meta.properties"
 
+  override def setUp() {
+    super.setUp()
+    props1 = TestUtils.createBrokerConfig(-1, zkConnect)
+    config1 = KafkaConfig.fromProps(props1)
+    props2 = TestUtils.createBrokerConfig(0, zkConnect)
+    config2 = KafkaConfig.fromProps(props2)
+  }
 
   @Test
   def testAutoGenerateBrokerId() {
@@ -51,7 +60,7 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with 
ZooKeeperTestHarness {
     // start the server with broker.id as part of config
     val server1 = new KafkaServer(config1)
     val server2 = new KafkaServer(config2)
-    val props3 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort)
+    val props3 = TestUtils.createBrokerConfig(-1, zkConnect)
     val config3 = KafkaConfig.fromProps(props3)
     val server3 = new KafkaServer(config3)
     server1.startup()

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 71317eb..a20321f 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -32,20 +32,23 @@ import org.scalatest.junit.JUnit3Suite
 import junit.framework.Assert._
 
 class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
-  val port = TestUtils.choosePort
-  val props = TestUtils.createBrokerConfig(0, port)
-  val config = KafkaConfig.fromProps(props)
-
+  var config: KafkaConfig = null
   val host = "localhost"
   val topic = "test"
   val sent1 = List("hello", "there")
   val sent2 = List("more", "messages")
 
+  override def setUp(): Unit = {
+    super.setUp()
+    val props = TestUtils.createBrokerConfig(0, zkConnect)
+    config = KafkaConfig.fromProps(props)
+  }
+
   @Test
   def testCleanShutdown() {
     var server = new KafkaServer(config)
     server.startup()
-    var producer = TestUtils.createProducer[Int, 
String](TestUtils.getBrokerListStrFromConfigs(Seq(config)),
+    var producer = TestUtils.createProducer[Int, 
String](TestUtils.getBrokerListStrFromServers(Seq(server)),
       encoder = classOf[StringEncoder].getName,
       keyEncoder = classOf[IntEncoder].getName)
 
@@ -71,10 +74,10 @@ class ServerShutdownTest extends JUnit3Suite with 
ZooKeeperTestHarness {
     // wait for the broker to receive the update metadata request after startup
     TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0)
 
-    producer = TestUtils.createProducer[Int, 
String](TestUtils.getBrokerListStrFromConfigs(Seq(config)),
+    producer = TestUtils.createProducer[Int, 
String](TestUtils.getBrokerListStrFromServers(Seq(server)),
       encoder = classOf[StringEncoder].getName,
       keyEncoder = classOf[IntEncoder].getName)
-    val consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
+    val consumer = new SimpleConsumer(host, server.boundPort(), 1000000, 64*1024, "")
 
     var fetchedMessage: ByteBufferMessageSet = null
     while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
@@ -103,7 +106,7 @@ class ServerShutdownTest extends JUnit3Suite with 
ZooKeeperTestHarness {
 
   @Test
   def testCleanShutdownWithDeleteTopicEnabled() {
-    val newProps = TestUtils.createBrokerConfig(0, port)
+    val newProps = TestUtils.createBrokerConfig(0, zkConnect)
     newProps.setProperty("delete.topic.enable", "true")
     val newConfig = KafkaConfig.fromProps(newProps)
     val server = new KafkaServer(newConfig)
@@ -116,7 +119,7 @@ class ServerShutdownTest extends JUnit3Suite with 
ZooKeeperTestHarness {
 
   @Test
   def testCleanShutdownAfterFailedStartup() {
-    val newProps = TestUtils.createBrokerConfig(0, port)
+    val newProps = TestUtils.createBrokerConfig(0, zkConnect)
     newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535")
     val newConfig = KafkaConfig.fromProps(newProps)
     val server = new KafkaServer(newConfig)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 60021ef..661ddd5 100644
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -30,7 +30,7 @@ class ServerStartupTest extends JUnit3Suite with 
ZooKeeperTestHarness {
   def testBrokerCreatesZKChroot {
     val brokerId = 0
     val zookeeperChroot = "/kafka-chroot-for-unittest"
-    val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
+    val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
     val zooKeeperConnect = props.get("zookeeper.connect")
     props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
     val server = TestUtils.createServer(KafkaConfig.fromProps(props))
@@ -47,11 +47,11 @@ class ServerStartupTest extends JUnit3Suite with 
ZooKeeperTestHarness {
     // This shouldn't affect the existing broker registration.
 
     val brokerId = 0
-    val props1 = TestUtils.createBrokerConfig(brokerId)
+    val props1 = TestUtils.createBrokerConfig(brokerId, zkConnect)
     val server1 = TestUtils.createServer(KafkaConfig.fromProps(props1))
     val brokerRegistration = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath 
+ "/" + brokerId)._1
 
-    val props2 = TestUtils.createBrokerConfig(brokerId)
+    val props2 = TestUtils.createBrokerConfig(brokerId, zkConnect)
     try {
       TestUtils.createServer(KafkaConfig.fromProps(props2))
       fail("Registering a broker with a conflicting id should fail")

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 519888e..09a0961 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -43,7 +43,7 @@ class SimpleFetchTest extends JUnit3Suite {
   overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, 
replicaLagTimeMaxMs.toString)
   overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, 
replicaFetchWaitMaxMs.toString)
 
-  val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps(_, 
overridingProps))
+  val configs = TestUtils.createBrokerConfigs(2, 
TestUtils.MockZkConnect).map(KafkaConfig.fromProps(_, overridingProps))
 
   // set the replica manager with the partition
   val time = new MockTime

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala 
b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index 2edc814..c96c0ff 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -53,7 +53,7 @@ class ReplicationUtilsTest extends JUnit3Suite with 
ZooKeeperTestHarness {
 
   @Test
   def testUpdateLeaderAndIsr() {
-    val configs = TestUtils.createBrokerConfigs(1).map(KafkaConfig.fromProps)
+    val configs = TestUtils.createBrokerConfigs(1, 
zkConnect).map(KafkaConfig.fromProps)
     val log = EasyMock.createMock(classOf[kafka.log.Log])
     EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes()
     EasyMock.expect(log)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index bb4daad..f451825 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -65,23 +65,13 @@ object TestUtils extends Logging {
   val seededRandom = new Random(192348092834L)
   val random = new Random()
 
-  /**
-   * Choose a number of random available ports
-   */
-  def choosePorts(count: Int): List[Int] = {
-    val sockets =
-      for(i <- 0 until count)
-      yield new ServerSocket(0)
-    val socketList = sockets.toList
-    val ports = socketList.map(_.getLocalPort)
-    socketList.map(_.close)
-    ports
-  }
+  /* 0 gives a random port; you can then retrieve the assigned port from the 
Socket object. */
+  val RandomPort = 0
 
-  /**
-   * Choose an available port
-   */
-  def choosePort(): Int = choosePorts(1).head
+  /** Port to use for unit tests that mock/don't require a real ZK server. */
+  val MockZkPort = 1
+  /** Zookeeper connection string to use for unit tests that mock/don't 
require a real ZK server. */
+  val MockZkConnect = "127.0.0.1:" + MockZkPort
 
   /**
    * Create a temporary directory
@@ -141,28 +131,29 @@ object TestUtils extends Logging {
    * Create a test config for the given node id
    */
   def createBrokerConfigs(numConfigs: Int,
+    zkConnect: String,
     enableControlledShutdown: Boolean = true,
-    enableDeleteTopic: Boolean = false): List[Properties] = {
-    for((port, node) <- choosePorts(numConfigs).zipWithIndex)
-    yield createBrokerConfig(node, port, enableControlledShutdown, 
enableDeleteTopic)
+    enableDeleteTopic: Boolean = false): Seq[Properties] = {
+    (0 until numConfigs).map(node => createBrokerConfig(node, zkConnect, 
enableControlledShutdown, enableDeleteTopic))
   }
 
-  def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = {
-    configs.map(c => formatAddress(c.hostName, c.port)).mkString(",")
+  def getBrokerListStrFromServers(servers: Seq[KafkaServer]): String = {
+    servers.map(s => formatAddress(s.config.hostName, 
s.boundPort())).mkString(",")
   }
 
   /**
    * Create a test config for the given node id
    */
-  def createBrokerConfig(nodeId: Int, port: Int = choosePort(),
+  def createBrokerConfig(nodeId: Int, zkConnect: String,
     enableControlledShutdown: Boolean = true,
-    enableDeleteTopic: Boolean = false): Properties = {
+    enableDeleteTopic: Boolean = false,
+    port: Int = RandomPort): Properties = {
     val props = new Properties
     if (nodeId >= 0) props.put("broker.id", nodeId.toString)
     props.put("host.name", "localhost")
     props.put("port", port.toString)
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
-    props.put("zookeeper.connect", TestZKUtils.zookeeperConnect)
+    props.put("zookeeper.connect", zkConnect)
     props.put("replica.socket.timeout.ms", "1500")
     props.put("controller.socket.timeout.ms", "1500")
     props.put("controlled.shutdown.enable", enableControlledShutdown.toString)
@@ -756,7 +747,7 @@ object TestUtils extends Logging {
                    brokerState = new BrokerState())
   }
 
-  def sendMessagesToPartition(configs: Seq[KafkaConfig],
+  def sendMessagesToPartition(servers: Seq[KafkaServer],
                               topic: String,
                               partition: Int,
                               numMessages: Int,
@@ -765,7 +756,7 @@ object TestUtils extends Logging {
     val props = new Properties()
     props.put("compression.codec", compression.codec.toString)
     val producer: Producer[Int, String] =
-      createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
+      createProducer(TestUtils.getBrokerListStrFromServers(servers),
         encoder = classOf[StringEncoder].getName,
         keyEncoder = classOf[IntEncoder].getName,
         partitioner = classOf[FixedValuePartitioner].getName,
@@ -778,7 +769,7 @@ object TestUtils extends Logging {
     ms.toList
   }
 
-  def sendMessages(configs: Seq[KafkaConfig],
+  def sendMessages(servers: Seq[KafkaServer],
                    topic: String,
                    producerId: String,
                    messagesPerNode: Int,
@@ -790,7 +781,7 @@ object TestUtils extends Logging {
     props.put("compression.codec", compression.codec.toString)
     props.put("client.id", producerId)
     val   producer: Producer[Int, String] =
-      createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs),
+      createProducer(brokerList = TestUtils.getBrokerListStrFromServers(servers),
         encoder = classOf[StringEncoder].getName,
         keyEncoder = classOf[IntEncoder].getName,
         partitioner = classOf[FixedValuePartitioner].getName,
@@ -848,10 +839,6 @@ object TestUtils extends Logging {
 
 }
 
-object TestZKUtils {
-  val zookeeperConnect = "127.0.0.1:" + TestUtils.choosePort()
-}
-
 class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
   override def toBytes(n: Int) = n.toString.getBytes
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index 3151561..1d87506 100644
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -24,14 +24,16 @@ import java.net.InetSocketAddress
 import kafka.utils.Utils
 import org.apache.kafka.common.utils.Utils.getPort
 
-class EmbeddedZookeeper(val connectString: String) {
+class EmbeddedZookeeper() {
   val snapshotDir = TestUtils.tempDir()
   val logDir = TestUtils.tempDir()
   val tickTime = 500
   val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
   val factory = new NIOServerCnxnFactory()
-  factory.configure(new InetSocketAddress("127.0.0.1", getPort(connectString)), 0)
+  private val addr = new InetSocketAddress("127.0.0.1", TestUtils.RandomPort)
+  factory.configure(addr, 0)
   factory.startup(zookeeper)
+  val port = zookeeper.getClientPort()
 
   def shutdown() {
     Utils.swallow(zookeeper.shutdown())

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
index 9897b2f..1bc45b1 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
@@ -29,7 +29,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   val path: String = "/some_dir"
   val zkSessionTimeoutMs = 1000
-  val zkConnectWithInvalidRoot: String = zkConnect + "/ghost"
+  def zkConnectWithInvalidRoot: String = zkConnect + "/ghost"
 
   def testCreatePersistentPathThrowsException {
    val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 67d9c4b..fedefb5 100644
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,19 +19,22 @@ package kafka.zk
 
 import org.scalatest.junit.JUnit3Suite
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZKStringSerializer, TestZKUtils, Utils}
+import kafka.utils.{ZKStringSerializer, Utils}
 
 trait ZooKeeperTestHarness extends JUnit3Suite {
-  val zkConnect: String = TestZKUtils.zookeeperConnect
+  var zkPort: Int = -1
   var zookeeper: EmbeddedZookeeper = null
   var zkClient: ZkClient = null
   val zkConnectionTimeout = 6000
   val zkSessionTimeout = 6000
 
+  def zkConnect: String = "127.0.0.1:" + zkPort
+
   override def setUp() {
     super.setUp
-    zookeeper = new EmbeddedZookeeper(zkConnect)
-  zkClient = new ZkClient(zookeeper.connectString, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
+    zookeeper = new EmbeddedZookeeper()
+    zkPort = zookeeper.port
+  zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
   }
 
   override def tearDown() {

Reply via email to