Repository: spark
Updated Branches:
  refs/heads/master 13c17cbb0 -> 219a74a7c


[STREAMING][TEST] Fix flaky streaming.FailureSuite

Under some corner cases, the test suite failed to shut down the SparkContext, causing cascading failures. This fix does two things (sketched below):
- Makes sure no SparkContext is active after every test
- Makes sure the StreamingContext is always shut down (this also prevents 
leaking StreamingContexts, just in case)
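
A minimal sketch of both cleanup patterns, assuming a plain ScalaTest FunSuite
(the suite name, app name, and runAndAlwaysStop helper are illustrative; the
actual changes are in the diff below):

import org.scalatest.{BeforeAndAfter, FunSuite}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.StreamingContext

// Illustrative suite; the real changes live in streaming.FailureSuite and
// streaming.MasterFailureTest (see the diff below).
class ContextCleanupSketch extends FunSuite with BeforeAndAfter {

  after {
    // Stop any StreamingContext a failed test may have left active
    // (this also avoids leaking StreamingContexts between tests).
    StreamingContext.getActive().foreach { _.stop() }

    // getOrCreate returns the currently active SparkContext if there is one
    // (creating a throwaway local one otherwise); stopping it guarantees no
    // SparkContext survives into the next test.
    SparkContext.getOrCreate(
      new SparkConf().setMaster("local").setAppName("cleanup")).stop()
  }

  // Always-stop pattern used in MasterFailureTest: stop the StreamingContext
  // in a finally block so it is shut down even when the test body throws.
  private def runAndAlwaysStop(ssc: StreamingContext)(body: => Unit): Unit = {
    try {
      body
    } finally {
      ssc.stop()
    }
  }
}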

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #11166 from tdas/fix-failuresuite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/219a74a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/219a74a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/219a74a7

Branch: refs/heads/master
Commit: 219a74a7c2d3b858224c4738190ccc92d7cbf06d
Parents: 13c17cb
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Thu Feb 11 10:10:36 2016 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Feb 11 10:10:36 2016 -0800

----------------------------------------------------------------------
 .../test/scala/org/apache/spark/streaming/FailureSuite.scala    | 5 ++++-
 .../scala/org/apache/spark/streaming/MasterFailureTest.scala    | 3 ++-
 2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/219a74a7/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
index 6a0b0a1..31e159e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
@@ -21,7 +21,7 @@ import java.io.File
 
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{Logging, SparkFunSuite}
+import org.apache.spark._
 import org.apache.spark.util.Utils
 
 /**
@@ -43,6 +43,9 @@ class FailureSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       Utils.deleteRecursively(directory)
     }
     StreamingContext.getActive().foreach { _.stop() }
+
+    // Stop SparkContext if active
+    SparkContext.getOrCreate(new 
SparkConf().setMaster("local").setAppName("bla")).stop()
   }
 
   test("multiple failures with map") {

http://git-wip-us.apache.org/repos/asf/spark/blob/219a74a7/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index a02d49e..faa9c4f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -242,6 +242,8 @@ object MasterFailureTest extends Logging {
         }
       } catch {
         case e: Exception => logError("Error running streaming context", e)
+      } finally {
+        ssc.stop()
       }
       if (killingThread.isAlive) {
         killingThread.interrupt()
@@ -250,7 +252,6 @@ object MasterFailureTest extends Logging {
         // to null after the next test creates the new SparkContext and fail the test.
         killingThread.join()
       }
-      ssc.stop()
 
       logInfo("Has been killed = " + killed)
       logInfo("Is last output generated = " + isLastOutputGenerated)

