Repository: spark
Updated Branches:
  refs/heads/master 25cad6adf -> 44460ba59


HOTFIX: Fix concurrency issue in FlumePollingStreamSuite.

This test has been failing on master. One possible cause is that the port
gets contended if multiple test runs happen concurrently and they
hit this test at the same time. Since this test takes a long time
(60 seconds), that's very plausible. This patch randomizes the port
used in this test to avoid contention.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44460ba5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44460ba5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44460ba5

Branch: refs/heads/master
Commit: 44460ba594fbfe5a6ee66e5121ead914bf16f9f6
Parents: 25cad6a
Author: Patrick Wendell <pwend...@gmail.com>
Authored: Sat Aug 2 01:11:03 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Sat Aug 2 01:16:13 2014 -0700

----------------------------------------------------------------------
 .../spark/streaming/flume/FlumePollingStreamSuite.scala       | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/44460ba5/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 47071d0..27bf2ac 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume
 
 import java.net.InetSocketAddress
 import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
+import java.util.Random
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
@@ -37,13 +38,16 @@ import org.apache.spark.streaming.flume.sink._
 
 class FlumePollingStreamSuite extends TestSuiteBase {
 
-  val testPort = 9999
+  val random = new Random()
+  /** Return a port in the ephemeral range. */
+  def getTestPort = random.nextInt(16382) + 49152
   val batchCount = 5
   val eventsPerBatch = 100
   val totalEventsPerChannel = batchCount * eventsPerBatch
   val channelCapacity = 5000
 
   test("flume polling test") {
+    val testPort = getTestPort
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
@@ -77,6 +81,7 @@ class FlumePollingStreamSuite extends TestSuiteBase {
   }
 
   test("flume polling test multiple hosts") {
+    val testPort = getTestPort
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to