Repository: spark Updated Branches: refs/heads/master 25cad6adf -> 44460ba59
HOTFIX: Fix concurrency issue in FlumePollingStreamSuite. This has been failing on master. One possible cause is that the port gets contended if multiple test runs happen concurrently and they hit this test at the same time. Since this test takes a long time (60 seconds) that's very plausible. This patch randomizes the port used in this test to avoid contention. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44460ba5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44460ba5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44460ba5 Branch: refs/heads/master Commit: 44460ba594fbfe5a6ee66e5121ead914bf16f9f6 Parents: 25cad6a Author: Patrick Wendell <pwend...@gmail.com> Authored: Sat Aug 2 01:11:03 2014 -0700 Committer: Patrick Wendell <pwend...@gmail.com> Committed: Sat Aug 2 01:16:13 2014 -0700 ---------------------------------------------------------------------- .../spark/streaming/flume/FlumePollingStreamSuite.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/44460ba5/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index 47071d0..27bf2ac 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume import java.net.InetSocketAddress import java.util.concurrent.{Callable, ExecutorCompletionService, Executors} +import java.util.Random import scala.collection.JavaConversions._ import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} @@ -37,13 +38,16 @@ import org.apache.spark.streaming.flume.sink._ class FlumePollingStreamSuite extends TestSuiteBase { - val testPort = 9999 + val random = new Random() + /** Return a port in the ephemeral range. */ + def getTestPort = random.nextInt(16382) + 49152 val batchCount = 5 val eventsPerBatch = 100 val totalEventsPerChannel = batchCount * eventsPerBatch val channelCapacity = 5000 test("flume polling test") { + val testPort = getTestPort // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = @@ -77,6 +81,7 @@ class FlumePollingStreamSuite extends TestSuiteBase { } test("flume polling test multiple hosts") { + val testPort = getTestPort // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org