Repository: spark
Updated Branches:
  refs/heads/branch-2.0 1db027d11 -> f021f3460


[SPARK-14936][BUILD][TESTS] FlumePollingStreamSuite is slow

https://issues.apache.org/jira/browse/SPARK-14936

## What changes were proposed in this pull request?

FlumePollingStreamSuite contains two tests which run for a minute each. This 
seems excessively slow and we should speed it up if possible.

In this PR, instead of creating `StreamingContext` directly from `conf`, here 
an underlying `SparkContext` is created before all and it is used to create  
each`StreamingContext`.

Running time is reduced by avoiding multiple `SparkContext` creations and 
destroys.

## How was this patch tested?

Tested on my local machine running `testOnly *.FlumePollingStreamSuite`

Author: Xin Ren <iamsh...@126.com>

Closes #12845 from keypointt/SPARK-14936.

(cherry picked from commit 86475520f88f90c9d3b71516f65ccc0e9d244863)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f021f346
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f021f346
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f021f346

Branch: refs/heads/branch-2.0
Commit: f021f346015a9c9fd045fcbea51748061273accd
Parents: 1db027d
Author: Xin Ren <iamsh...@126.com>
Authored: Tue May 10 15:12:47 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue May 10 15:12:53 2016 -0700

----------------------------------------------------------------------
 .../streaming/flume/PollingFlumeTestUtils.scala |  4 +--
 .../flume/FlumePollingStreamSuite.scala         | 26 +++++++++++++++-----
 python/pyspark/streaming/tests.py               |  2 +-
 3 files changed, 23 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f021f346/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
index 6a4dafb..15ff4f6 100644
--- 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
+++ 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
@@ -116,7 +116,7 @@ private[flume] class PollingFlumeTestUtils {
   /**
    * Send data and wait until all data has been received
    */
-  def sendDatAndEnsureAllDataHasBeenReceived(): Unit = {
+  def sendDataAndEnsureAllDataHasBeenReceived(): Unit = {
     val executor = Executors.newCachedThreadPool()
     val executorCompletion = new ExecutorCompletionService[Void](executor)
 
@@ -174,7 +174,7 @@ private[flume] class PollingFlumeTestUtils {
     val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
     queueRemaining.setAccessible(true)
     val m = 
queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
-    if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != 5000) {
+    if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != 
channelCapacity) {
       throw new AssertionError(s"Channel ${channel.getName} is not empty")
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f021f346/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
 
b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 1567124..1c93079 100644
--- 
a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ 
b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -24,10 +24,10 @@ import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
-import org.scalatest.BeforeAndAfter
+import org.scalatest.BeforeAndAfterAll
 import org.scalatest.concurrent.Eventually._
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.storage.StorageLevel
@@ -35,11 +35,13 @@ import org.apache.spark.streaming.{Seconds, 
StreamingContext, TestOutputStream}
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 import org.apache.spark.util.{ManualClock, Utils}
 
-class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with 
Logging {
+class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfterAll 
with Logging {
 
   val maxAttempts = 5
   val batchDuration = Seconds(1)
 
+  @transient private var _sc: SparkContext = _
+
   val conf = new SparkConf()
     .setMaster("local[2]")
     .setAppName(this.getClass.getSimpleName)
@@ -47,6 +49,17 @@ class FlumePollingStreamSuite extends SparkFunSuite with 
BeforeAndAfter with Log
 
   val utils = new PollingFlumeTestUtils
 
+  override def beforeAll(): Unit = {
+    _sc = new SparkContext(conf)
+  }
+
+  override def afterAll(): Unit = {
+    if (_sc != null) {
+      _sc.stop()
+      _sc = null
+    }
+  }
+
   test("flume polling test") {
     testMultipleTimes(testFlumePolling)
   }
@@ -98,7 +111,7 @@ class FlumePollingStreamSuite extends SparkFunSuite with 
BeforeAndAfter with Log
 
   def writeAndVerify(sinkPorts: Seq[Int]): Unit = {
     // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
+    val ssc = new StreamingContext(_sc, batchDuration)
     val addresses = sinkPorts.map(port => new InetSocketAddress("localhost", 
port))
     val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
       FlumeUtils.createPollingStream(ssc, addresses, 
StorageLevel.MEMORY_AND_DISK,
@@ -109,7 +122,7 @@ class FlumePollingStreamSuite extends SparkFunSuite with 
BeforeAndAfter with Log
 
     ssc.start()
     try {
-      utils.sendDatAndEnsureAllDataHasBeenReceived()
+      utils.sendDataAndEnsureAllDataHasBeenReceived()
       val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
       clock.advance(batchDuration.milliseconds)
 
@@ -123,7 +136,8 @@ class FlumePollingStreamSuite extends SparkFunSuite with 
BeforeAndAfter with Log
         utils.assertOutput(headers.asJava, bodies.asJava)
       }
     } finally {
-      ssc.stop()
+      // here stop ssc only, but not underlying sparkcontext
+      ssc.stop(false)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f021f346/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py 
b/python/pyspark/streaming/tests.py
index 148bf7e..f27628c 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1357,7 +1357,7 @@ class FlumePollingStreamTests(PySparkStreamingTestCase):
 
             dstream.foreachRDD(get_output)
             ssc.start()
-            self._utils.sendDatAndEnsureAllDataHasBeenReceived()
+            self._utils.sendDataAndEnsureAllDataHasBeenReceived()
 
             self.wait_for(outputBuffer, self._utils.getTotalEvents())
             outputHeaders = [event[0] for event in outputBuffer]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to