Repository: spark Updated Branches: refs/heads/branch-2.0 1db027d11 -> f021f3460
[SPARK-14936][BUILD][TESTS] FlumePollingStreamSuite is slow https://issues.apache.org/jira/browse/SPARK-14936 ## What changes were proposed in this pull request? FlumePollingStreamSuite contains two tests which run for a minute each. This seems excessively slow and we should speed it up if possible. In this PR, instead of creating each `StreamingContext` directly from `conf`, an underlying `SparkContext` is created once before all tests and is used to create each `StreamingContext`. Running time is reduced by avoiding repeated creation and teardown of the `SparkContext`. ## How was this patch tested? Tested on my local machine running `testOnly *.FlumePollingStreamSuite` Author: Xin Ren <iamsh...@126.com> Closes #12845 from keypointt/SPARK-14936. (cherry picked from commit 86475520f88f90c9d3b71516f65ccc0e9d244863) Signed-off-by: Shixiong Zhu <shixi...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f021f346 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f021f346 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f021f346 Branch: refs/heads/branch-2.0 Commit: f021f346015a9c9fd045fcbea51748061273accd Parents: 1db027d Author: Xin Ren <iamsh...@126.com> Authored: Tue May 10 15:12:47 2016 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Tue May 10 15:12:53 2016 -0700 ---------------------------------------------------------------------- .../streaming/flume/PollingFlumeTestUtils.scala | 4 +-- .../flume/FlumePollingStreamSuite.scala | 26 +++++++++++++++----- python/pyspark/streaming/tests.py | 2 +- 3 files changed, 23 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f021f346/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala ---------------------------------------------------------------------- diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala index 6a4dafb..15ff4f6 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala @@ -116,7 +116,7 @@ private[flume] class PollingFlumeTestUtils { /** * Send data and wait until all data has been received */ - def sendDatAndEnsureAllDataHasBeenReceived(): Unit = { + def sendDataAndEnsureAllDataHasBeenReceived(): Unit = { val executor = Executors.newCachedThreadPool() val executorCompletion = new ExecutorCompletionService[Void](executor) @@ -174,7 +174,7 @@ private[flume] class PollingFlumeTestUtils { val queueRemaining = channel.getClass.getDeclaredField("queueRemaining") queueRemaining.setAccessible(true) val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits") - if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != 5000) { + if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != channelCapacity) { throw new AssertionError(s"Channel ${channel.getName} is not empty") } } http://git-wip-us.apache.org/repos/asf/spark/blob/f021f346/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index 1567124..1c93079 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -24,10 +24,10 @@ import scala.collection.JavaConverters._ import 
scala.concurrent.duration._ import scala.language.postfixOps -import org.scalatest.BeforeAndAfter +import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.Eventually._ -import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.network.util.JavaUtils import org.apache.spark.storage.StorageLevel @@ -35,11 +35,13 @@ import org.apache.spark.streaming.{Seconds, StreamingContext, TestOutputStream} import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.util.{ManualClock, Utils} -class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging { +class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { val maxAttempts = 5 val batchDuration = Seconds(1) + @transient private var _sc: SparkContext = _ + val conf = new SparkConf() .setMaster("local[2]") .setAppName(this.getClass.getSimpleName) @@ -47,6 +49,17 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log val utils = new PollingFlumeTestUtils + override def beforeAll(): Unit = { + _sc = new SparkContext(conf) + } + + override def afterAll(): Unit = { + if (_sc != null) { + _sc.stop() + _sc = null + } + } + test("flume polling test") { testMultipleTimes(testFlumePolling) } @@ -98,7 +111,7 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log def writeAndVerify(sinkPorts: Seq[Int]): Unit = { // Set up the streaming context and input streams - val ssc = new StreamingContext(conf, batchDuration) + val ssc = new StreamingContext(_sc, batchDuration) val addresses = sinkPorts.map(port => new InetSocketAddress("localhost", port)) val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK, @@ -109,7 +122,7 @@ class FlumePollingStreamSuite extends SparkFunSuite with 
BeforeAndAfter with Log ssc.start() try { - utils.sendDatAndEnsureAllDataHasBeenReceived() + utils.sendDataAndEnsureAllDataHasBeenReceived() val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] clock.advance(batchDuration.milliseconds) @@ -123,7 +136,8 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log utils.assertOutput(headers.asJava, bodies.asJava) } } finally { - ssc.stop() + // here stop ssc only, but not underlying sparkcontext + ssc.stop(false) } } http://git-wip-us.apache.org/repos/asf/spark/blob/f021f346/python/pyspark/streaming/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 148bf7e..f27628c 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -1357,7 +1357,7 @@ class FlumePollingStreamTests(PySparkStreamingTestCase): dstream.foreachRDD(get_output) ssc.start() - self._utils.sendDatAndEnsureAllDataHasBeenReceived() + self._utils.sendDataAndEnsureAllDataHasBeenReceived() self.wait_for(outputBuffer, self._utils.getTotalEvents()) outputHeaders = [event[0] for event in outputBuffer] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org