Repository: spark
Updated Branches:
  refs/heads/branch-2.0 0754ccb2b -> cabee2324


[SPARK-16212][STREAMING][KAFKA] use random port for embedded kafka

## What changes were proposed in this pull request?

Testing for 0.10 uncovered an issue with a fixed port number being used in
KafkaTestUtils. This change makes a roughly equivalent fix for the 0.8 connector.

## How was this patch tested?

Unit tests, manual tests

Author: cody koeninger <c...@koeninger.org>

Closes #14018 from koeninger/kafka-0-8-test-port.

(cherry picked from commit 1fca9da95dc9b9aaf9ae75fd7456378861d8b409)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cabee232
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cabee232
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cabee232

Branch: refs/heads/branch-2.0
Commit: cabee23241922d55179c3e725f24397eccc75471
Parents: 0754ccb
Author: cody koeninger <c...@koeninger.org>
Authored: Tue Jul 5 11:45:54 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Jul 5 11:46:06 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/streaming/kafka/KafkaTestUtils.scala      | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cabee232/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index d9d4240..abfd7aa 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -35,6 +35,7 @@ import kafka.serializer.StringEncoder
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{ZKStringSerializer, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
+import org.apache.commons.lang3.RandomUtils
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
 
 import org.apache.spark.SparkConf
@@ -62,7 +63,8 @@ private[kafka] class KafkaTestUtils extends Logging {
 
   // Kafka broker related configurations
   private val brokerHost = "localhost"
-  private var brokerPort = 9092
+  // 0.8.2 server doesn't have a boundPort method, so can't use 0 for a random port
+  private var brokerPort = RandomUtils.nextInt(1024, 65536)
   private var brokerConf: KafkaConfig = _
 
   // Kafka broker server
@@ -112,7 +114,7 @@ private[kafka] class KafkaTestUtils extends Logging {
       brokerConf = new KafkaConfig(brokerConfiguration)
       server = new KafkaServer(brokerConf)
       server.startup()
-      (server, port)
+      (server, brokerPort)
     }, new SparkConf(), "KafkaBroker")
 
     brokerReady = true


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to