Repository: spark Updated Branches: refs/heads/branch-2.0 0754ccb2b -> cabee2324
[SPARK-16212][STREAMING][KAFKA] use random port for embedded kafka ## What changes were proposed in this pull request? Testing for 0.10 uncovered an issue with a fixed port number being used in KafkaTestUtils. This patch makes a roughly equivalent fix for the 0.8 connector. ## How was this patch tested? Unit tests and manual tests. Author: cody koeninger <c...@koeninger.org> Closes #14018 from koeninger/kafka-0-8-test-port. (cherry picked from commit 1fca9da95dc9b9aaf9ae75fd7456378861d8b409) Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cabee232 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cabee232 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cabee232 Branch: refs/heads/branch-2.0 Commit: cabee23241922d55179c3e725f24397eccc75471 Parents: 0754ccb Author: cody koeninger <c...@koeninger.org> Authored: Tue Jul 5 11:45:54 2016 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Tue Jul 5 11:46:06 2016 -0700 ---------------------------------------------------------------------- .../org/apache/spark/streaming/kafka/KafkaTestUtils.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/cabee232/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index d9d4240..abfd7aa 100644 --- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -35,6 +35,7 @@ import 
kafka.serializer.StringEncoder import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.{ZKStringSerializer, ZkUtils} import org.I0Itec.zkclient.ZkClient +import org.apache.commons.lang3.RandomUtils import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.apache.spark.SparkConf @@ -62,7 +63,8 @@ private[kafka] class KafkaTestUtils extends Logging { // Kafka broker related configurations private val brokerHost = "localhost" - private var brokerPort = 9092 + // 0.8.2 server doesn't have a boundPort method, so can't use 0 for a random port + private var brokerPort = RandomUtils.nextInt(1024, 65536) private var brokerConf: KafkaConfig = _ // Kafka broker server @@ -112,7 +114,7 @@ private[kafka] class KafkaTestUtils extends Logging { brokerConf = new KafkaConfig(brokerConfiguration) server = new KafkaServer(brokerConf) server.startup() - (server, port) + (server, brokerPort) }, new SparkConf(), "KafkaBroker") brokerReady = true --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org