Fix test failures due to setting / clearing clock type in Streaming

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/578bd1fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/578bd1fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/578bd1fc

Branch: refs/heads/master
Commit: 578bd1fc28513eb84002c604000250f5cff9b815
Parents: 5bbe738
Author: Matei Zaharia <ma...@databricks.com>
Authored: Sat Dec 28 21:21:06 2013 -0500
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Sat Dec 28 21:21:06 2013 -0500

----------------------------------------------------------------------
 .../java/org/apache/spark/streaming/JavaAPISuite.java  |  7 ++++---
 .../apache/spark/streaming/BasicOperationsSuite.scala  | 13 ++++++++-----
 .../org/apache/spark/streaming/TestSuiteBase.scala     |  1 +
 .../apache/spark/streaming/WindowOperationsSuite.scala |  3 +--
 4 files changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/578bd1fc/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index daeb99f..a1db099 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -62,13 +62,14 @@ public class JavaAPISuite implements Serializable {
 
   @Before
   public void setUp() {
-      System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
-      ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+    System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+    ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
     ssc.checkpoint("checkpoint");
   }
 
   @After
   public void tearDown() {
+    System.clearProperty("spark.streaming.clock");
     ssc.stop();
     ssc = null;
 
@@ -101,7 +102,7 @@ public class JavaAPISuite implements Serializable {
         Arrays.asList("hello", "world"),
         Arrays.asList("goodnight", "moon"));
 
-   List<List<Integer>> expected = Arrays.asList(
+    List<List<Integer>> expected = Arrays.asList(
         Arrays.asList(5,5),
         Arrays.asList(9,4));
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/578bd1fc/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 259ef16..60e986c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -23,14 +23,13 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
 
 import util.ManualClock
+import org.apache.spark.{SparkContext, SparkConf}
 
 class BasicOperationsSuite extends TestSuiteBase {
 
-  override def framework() = "BasicOperationsSuite"
+  override def framework = "BasicOperationsSuite"
 
-  before {
-    System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
-  }
+  conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
 
   after {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
@@ -387,7 +386,11 @@ class BasicOperationsSuite extends TestSuiteBase {
   }
 
   test("slice") {
-    val ssc = new StreamingContext("local[2]", "BasicOperationSuite", Seconds(1))
+    val conf2 = new SparkConf()
+      .setMaster("local[2]")
+      .setAppName("BasicOperationsSuite")
+      .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+    val ssc = new StreamingContext(new SparkContext(conf2), Seconds(1))
     val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
     val stream = new TestInputStream[Int](ssc, input, 2)
     ssc.registerInputStream(stream)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/578bd1fc/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index a265284..3dd6718 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -130,6 +130,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
   // Whether to actually wait in real time before changing manual clock
   def actuallyWait = false
 
+  // A SparkConf to use in tests. Can be modified before calling setupStreams to configure things.
   val conf = new SparkConf()
     .setMaster(master)
     .setAppName(framework)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/578bd1fc/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index f50e05c..3242c4c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -18,11 +18,10 @@
 package org.apache.spark.streaming
 
 import org.apache.spark.streaming.StreamingContext._
-import collection.mutable.ArrayBuffer
 
 class WindowOperationsSuite extends TestSuiteBase {
 
-  System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+  conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
 
   override def framework = "WindowOperationsSuite"
 

Reply via email to