Repository: spark
Updated Branches:
  refs/heads/master 99243288b -> 95470a03a


[HOTFIX][STREAMING] Allow the JVM/Netty to decide which port to bind to in Flume Polling Tests.

Author: Hari Shreedharan <harishreedha...@gmail.com>

Closes #1820 from harishreedharan/use-free-ports and squashes the following commits:

b939067 [Hari Shreedharan] Remove unused import.
67856a8 [Hari Shreedharan] Remove findFreePort.
0ea51d1 [Hari Shreedharan] Make some changes to getPort to use map on the serverOpt.
1fb0283 [Hari Shreedharan] Merge branch 'master' of https://github.com/apache/spark into use-free-ports
b351651 [Hari Shreedharan] Allow Netty to choose the port, and query it to decide the port to bind to. Leaving findFreePort as-is, in case other tests want to use it at some point.
e6c9620 [Hari Shreedharan] Making sure the second sink uses the correct port.
11c340d [Hari Shreedharan] Add info about race condition to scaladoc.
e89d135 [Hari Shreedharan] Adding Scaladoc.
6013bb0 [Hari Shreedharan] [STREAMING] Find free ports to use before attempting to create Flume Sink in Flume Polling Suite
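
The underlying technique: rather than probing for a free port up front (which races with other processes between the probe and the bind), the server is bound to port 0 so the OS assigns a free ephemeral port, and the bound socket is then queried for the port it actually received. Below is a minimal sketch of the pattern using a plain java.net.ServerSocket, for illustration only; the sink does the same thing through its Netty-backed Avro server:

    import java.net.{InetSocketAddress, ServerSocket}

    val server = new ServerSocket()
    // Port 0 asks the OS to pick any free ephemeral port at bind time,
    // closing the find-then-bind race window.
    server.bind(new InetSocketAddress("localhost", 0))
    // Query the port that was actually assigned and hand it to clients.
    val boundPort = server.getLocalPort
    println(s"Server bound to port $boundPort")
    server.close()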


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95470a03
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95470a03
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95470a03

Branch: refs/heads/master
Commit: 95470a03ae85d7d37d75f73435425a0e22918bc9
Parents: 9924328
Author: Hari Shreedharan <harishreedha...@gmail.com>
Authored: Sun Aug 17 19:50:31 2014 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Sun Aug 17 19:50:31 2014 -0700

----------------------------------------------------------------------
 .../spark/streaming/flume/sink/SparkSink.scala  |  8 +++
 .../flume/FlumePollingStreamSuite.scala         | 55 +++++++++-----------
 2 files changed, 34 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/95470a03/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
index 7b73513..948af59 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
@@ -131,6 +131,14 @@ class SparkSink extends AbstractSink with Logging with Configurable {
     blockingLatch.await()
     Status.BACKOFF
   }
+
+  private[flume] def getPort(): Int = {
+    serverOpt
+      .map(_.getPort)
+      .getOrElse(
+        throw new RuntimeException("Server was not started!")
+      )
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/95470a03/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index a69baa1..8a85b0f 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -22,6 +22,8 @@ import java.net.InetSocketAddress
 import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
 import java.util.Random
 
+import org.apache.spark.TestUtils
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
 
@@ -39,9 +41,6 @@ import org.apache.spark.util.Utils
 
 class FlumePollingStreamSuite extends TestSuiteBase {
 
-  val random = new Random()
-  /** Return a port in the ephemeral range. */
-  def getTestPort = random.nextInt(16382) + 49152
   val batchCount = 5
   val eventsPerBatch = 100
   val totalEventsPerChannel = batchCount * eventsPerBatch
@@ -77,17 +76,6 @@ class FlumePollingStreamSuite extends TestSuiteBase {
   }
 
   private def testFlumePolling(): Unit = {
-    val testPort = getTestPort
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)),
-        StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
-    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
-      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
-    outputStream.register()
-
     // Start the channel and sink.
     val context = new Context()
     context.put("capacity", channelCapacity.toString)
@@ -98,10 +86,19 @@ class FlumePollingStreamSuite extends TestSuiteBase {
 
     val sink = new SparkSink()
     context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
     Configurables.configure(sink, context)
     sink.setChannel(channel)
     sink.start()
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", sink.getPort())),
+        StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    outputStream.register()
     ssc.start()
 
     writeAndVerify(Seq(channel), ssc, outputBuffer)
@@ -111,18 +108,6 @@ class FlumePollingStreamSuite extends TestSuiteBase {
   }
 
   private def testFlumePollingMultipleHost(): Unit = {
-    val testPort = getTestPort
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))
-    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createPollingStream(ssc, addresses, 
StorageLevel.MEMORY_AND_DISK,
-        eventsPerBatch, 5)
-    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
-      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
-    outputStream.register()
-
     // Start the channel and sink.
     val context = new Context()
     context.put("capacity", channelCapacity.toString)
@@ -136,17 +121,29 @@ class FlumePollingStreamSuite extends TestSuiteBase {
 
     val sink = new SparkSink()
     context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
     Configurables.configure(sink, context)
     sink.setChannel(channel)
     sink.start()
 
     val sink2 = new SparkSink()
     context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort + 1))
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
     Configurables.configure(sink2, context)
     sink2.setChannel(channel2)
     sink2.start()
+
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val addresses = Seq(sink.getPort(), sink2.getPort()).map(new InetSocketAddress("localhost", _))
+    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
+        eventsPerBatch, 5)
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    outputStream.register()
+
     ssc.start()
     writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
     assertChannelIsEmpty(channel)
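
With this change, the tests no longer guess a port: each sink is configured with port 0, started, and then asked which port it actually bound. A minimal sketch of the resulting wiring, assuming a context and channel configured as in the suite above:

    val sink = new SparkSink()
    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
    // Port 0 lets Netty pick a free port when the sink starts.
    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
    Configurables.configure(sink, context)
    sink.setChannel(channel)
    sink.start()
    // The real port is only known after start() has bound the server.
    val address = new InetSocketAddress("localhost", sink.getPort())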

