Fix test failures caused by setting / clearing the clock type (spark.streaming.clock) in the Streaming test suites
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/578bd1fc Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/578bd1fc Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/578bd1fc Branch: refs/heads/master Commit: 578bd1fc28513eb84002c604000250f5cff9b815 Parents: 5bbe738 Author: Matei Zaharia <ma...@databricks.com> Authored: Sat Dec 28 21:21:06 2013 -0500 Committer: Matei Zaharia <ma...@databricks.com> Committed: Sat Dec 28 21:21:06 2013 -0500 ---------------------------------------------------------------------- .../java/org/apache/spark/streaming/JavaAPISuite.java | 7 ++++--- .../apache/spark/streaming/BasicOperationsSuite.scala | 13 ++++++++----- .../org/apache/spark/streaming/TestSuiteBase.scala | 1 + .../apache/spark/streaming/WindowOperationsSuite.scala | 3 +-- 4 files changed, 14 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/578bd1fc/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java ---------------------------------------------------------------------- diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index daeb99f..a1db099 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -62,13 +62,14 @@ public class JavaAPISuite implements Serializable { @Before public void setUp() { - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); - ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); 
ssc.checkpoint("checkpoint"); } @After public void tearDown() { + System.clearProperty("spark.streaming.clock"); ssc.stop(); ssc = null; @@ -101,7 +102,7 @@ public class JavaAPISuite implements Serializable { Arrays.asList("hello", "world"), Arrays.asList("goodnight", "moon")); - List<List<Integer>> expected = Arrays.asList( + List<List<Integer>> expected = Arrays.asList( Arrays.asList(5,5), Arrays.asList(9,4)); http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/578bd1fc/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 259ef16..60e986c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -23,14 +23,13 @@ import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import util.ManualClock +import org.apache.spark.{SparkContext, SparkConf} class BasicOperationsSuite extends TestSuiteBase { - override def framework() = "BasicOperationsSuite" + override def framework = "BasicOperationsSuite" - before { - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") - } + conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown @@ -387,7 +386,11 @@ class BasicOperationsSuite extends TestSuiteBase { } test("slice") { - val ssc = new StreamingContext("local[2]", "BasicOperationSuite", Seconds(1)) + val conf2 = new SparkConf() + .setMaster("local[2]") + .setAppName("BasicOperationsSuite") + .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + val ssc = new StreamingContext(new 
SparkContext(conf2), Seconds(1)) val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) val stream = new TestInputStream[Int](ssc, input, 2) ssc.registerInputStream(stream) http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/578bd1fc/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index a265284..3dd6718 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -130,6 +130,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { // Whether to actually wait in real time before changing manual clock def actuallyWait = false + // A SparkConf to use in tests. Can be modified before calling setupStreams to configure things. 
val conf = new SparkConf() .setMaster(master) .setAppName(framework) http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/578bd1fc/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala index f50e05c..3242c4c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala @@ -18,11 +18,10 @@ package org.apache.spark.streaming import org.apache.spark.streaming.StreamingContext._ -import collection.mutable.ArrayBuffer class WindowOperationsSuite extends TestSuiteBase { - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") override def framework = "WindowOperationsSuite"