Repository: spark Updated Branches: refs/heads/master 862283e9c -> 7782a304a
[SPARK-1942] Stop clearing spark.driver.port in unit tests stop resetting spark.driver.port in unit tests (scala, java and python). Author: Syed Hashmi <shas...@cloudera.com> Author: CodingCat <zhunans...@gmail.com> Closes #943 from syedhashmi/master and squashes the following commits: 885f210 [Syed Hashmi] Removing unnecessary file (created by mergetool) b8bd4b5 [Syed Hashmi] Merge remote-tracking branch 'upstream/master' b895e59 [Syed Hashmi] Revert "[SPARK-1784] Add a new partitioner" 57b6587 [Syed Hashmi] Revert "[SPARK-1784] Add a balanced partitioner" 1574769 [Syed Hashmi] [SPARK-1942] Stop clearing spark.driver.port in unit tests 4354836 [Syed Hashmi] Revert "SPARK-1686: keep schedule() calling in the main thread" fd36542 [Syed Hashmi] [SPARK-1784] Add a balanced partitioner 6668015 [CodingCat] SPARK-1686: keep schedule() calling in the main thread 4ca94cc [Syed Hashmi] [SPARK-1784] Add a new partitioner Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7782a304 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7782a304 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7782a304 Branch: refs/heads/master Commit: 7782a304ad105ec95cf62cb799e365e5fb385a69 Parents: 862283e Author: Syed Hashmi <shas...@cloudera.com> Authored: Tue Jun 3 12:04:47 2014 -0700 Committer: Matei Zaharia <ma...@databricks.com> Committed: Tue Jun 3 12:04:47 2014 -0700 ---------------------------------------------------------------------- bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala | 2 -- core/src/test/java/org/apache/spark/JavaAPISuite.java | 3 --- core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala | 4 ---- core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 3 --- .../test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 2 -- .../src/test/java/org/apache/spark/Java8APISuite.java | 2 -- .../test/scala/org/apache/spark/graphx/LocalSparkContext.scala | 2 -- .../spark/mllib/classification/JavaLogisticRegressionSuite.java | 1 - .../apache/spark/mllib/classification/JavaNaiveBayesSuite.java | 1 - .../java/org/apache/spark/mllib/classification/JavaSVMSuite.java | 1 - .../java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java | 1 - .../java/org/apache/spark/mllib/recommendation/JavaALSSuite.java | 1 - .../java/org/apache/spark/mllib/regression/JavaLassoSuite.java | 1 - .../apache/spark/mllib/regression/JavaLinearRegressionSuite.java | 1 - .../apache/spark/mllib/regression/JavaRidgeRegressionSuite.java | 1 - .../scala/org/apache/spark/mllib/util/LocalSparkContext.scala | 1 - python/pyspark/tests.py | 4 ---- repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala | 2 -- sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala | 1 - .../org/apache/spark/streaming/util/MasterFailureTest.scala | 1 - .../org/apache/spark/streaming/LocalJavaStreamingContext.java | 4 ---- .../test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 1 - .../test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 2 -- 23 files changed, 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala ---------------------------------------------------------------------- diff --git a/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala index 8e0f82d..110bd0a 100644 --- a/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala @@ -38,8 +38,6 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo sc.stop() sc = null } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") } test("halting by voting") { http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/core/src/test/java/org/apache/spark/JavaAPISuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 3dd7924..7193223 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -68,9 +68,6 @@ public class JavaAPISuite implements Serializable { public void tearDown() { sc.stop(); sc = null; - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port"); - Utils.deleteRecursively(tempDir); } static class ReverseIntComparator implements Comparator<Integer>, Serializable { http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala index c645e4c..4ab870e 100644 --- a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala @@ -39,7 +39,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val hostname = "localhost" val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) - System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) assert(securityManager.isAuthenticationEnabled() === true) @@ -77,7 +76,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val hostname = "localhost" val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) - System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) assert(securityManager.isAuthenticationEnabled() === false) @@ -129,7 +127,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val hostname = "localhost" val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) - System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) assert(securityManager.isAuthenticationEnabled() === true) @@ -182,7 +179,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val hostname = "localhost" val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) - System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) assert(securityManager.isAuthenticationEnabled() === true) http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 6b2571c..95ba273 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -124,9 +124,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = new SecurityManager(conf)) - // Will be cleared by LocalSparkContext - System.setProperty("spark.driver.port", boundPort.toString) - val masterTracker = new MapOutputTrackerMaster(conf) masterTracker.trackerActor = actorSystem.actorOf( Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 00deecc..81bd825 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -78,8 +78,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } after { - System.clearProperty("spark.driver.port") - if (store != null) { store.stop() store = null http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java ---------------------------------------------------------------------- diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java index 84d3b6f..c366c10 100644 --- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java +++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java @@ -58,8 +58,6 @@ public class Java8APISuite implements Serializable { public void tearDown() { sc.stop(); sc = null; - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port"); } @Test http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala ---------------------------------------------------------------------- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala index 51f02f9..47594a8 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala @@ -38,8 +38,6 @@ trait LocalSparkContext { f(sc) } finally { sc.stop() - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") } } } http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java index d75d3a6..faa675b 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java @@ -42,7 +42,6 @@ public class JavaLogisticRegressionSuite implements Serializable { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } int validatePrediction(List<LabeledPoint> validationData, LogisticRegressionModel model) { http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java index 743a43a..1c90522 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java @@ -44,7 +44,6 @@ public class JavaNaiveBayesSuite implements Serializable { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } private static final List<LabeledPoint> POINTS = Arrays.asList( http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java index 667f76a..31b9f3e 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java @@ -41,7 +41,6 @@ public class JavaSVMSuite implements Serializable { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } int validatePrediction(List<LabeledPoint> validationData, SVMModel model) { http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java index 0c916ca..31676e6 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java @@ -44,7 +44,6 @@ public class JavaKMeansSuite implements Serializable { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } @Test http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java index b150334..bf2365f 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java @@ -42,7 +42,6 @@ public class JavaALSSuite implements Serializable { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } static void validatePrediction(MatrixFactorizationModel model, int users, int products, int features, http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java index f725924..8950b48 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java @@ -41,7 +41,6 @@ public class JavaLassoSuite implements Serializable { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } int validatePrediction(List<LabeledPoint> validationData, LassoModel model) { http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java index 6dc6877..24c4c20 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java @@ -43,7 +43,6 @@ public class JavaLinearRegressionSuite implements Serializable { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } int validatePrediction(List<LabeledPoint> validationData, LinearRegressionModel model) { http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java index 03714ae..7266eec 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java @@ -43,7 +43,6 @@ public class JavaRidgeRegressionSuite implements Serializable { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } double predictionError(List<LabeledPoint> validationData, RidgeRegressionModel model) { http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala index 212fbe9..0d4868f 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala @@ -34,7 +34,6 @@ trait LocalSparkContext extends BeforeAndAfterAll { self: Suite => if (sc != null) { sc.stop() } - System.clearProperty("spark.driver.port") super.afterAll() } } http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/python/pyspark/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index ed90915..1f2a6ea 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -57,10 +57,6 @@ class PySparkTestCase(unittest.TestCase): def tearDown(self): self.sc.stop() sys.path = self._old_sys_path - # To avoid Akka rebinding to the same port, since it doesn't unbind - # immediately on shutdown - self.sc._jvm.System.clearProperty("spark.driver.port") - class TestCheckpoint(PySparkTestCase): http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala ---------------------------------------------------------------------- diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 95460aa..98cdfd0 100644 --- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -51,8 +51,6 @@ class ReplSuite extends FunSuite { if (interp.sparkContext != null) { interp.sparkContext.stop() } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") return out.toString } http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index fa7d010..041e813 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -58,7 +58,6 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) { // By clearing the port we force Spark to pick a new one. This allows us to rerun tests // without restarting the JVM. - System.clearProperty("spark.driver.port") System.clearProperty("spark.hostPort") override lazy val warehousePath = getTempFilePath("sparkHiveWarehouse").getCanonicalPath http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index b3ed302..98e17ff 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -187,7 +187,6 @@ object MasterFailureTest extends Logging { setupCalled = true // Setup the streaming computation with the given operation - System.clearProperty("spark.driver.port") val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map()) ssc.checkpoint(checkpointDir.toString) http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java ---------------------------------------------------------------------- diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java index 849bbf1..6e1f019 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -27,7 +27,6 @@ public abstract class LocalJavaStreamingContext { @Before public void setUp() { - System.clearProperty("spark.driver.port"); System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); ssc.checkpoint("checkpoint"); @@ -37,8 +36,5 @@ public abstract class LocalJavaStreamingContext { public void tearDown() { ssc.stop(); ssc = null; - - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port"); } } http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index d20a7b7..10ad3c9 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -370,7 +370,6 @@ class CheckpointSuite extends TestSuiteBase { "\n-------------------------------------------\n" ) ssc = new StreamingContext(checkpointDir) - System.clearProperty("spark.driver.port") ssc.start() val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches) // the first element will be re-processed data of the last batch before restart http://git-wip-us.apache.org/repos/asf/spark/blob/7782a304/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 8036f77..cc178fb 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -153,8 +153,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { // Default after function for any streaming test suite. Override this // if you want to add your stuff to "after" (i.e., don't call after { } ) def afterFunction() { - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") System.clearProperty("spark.streaming.clock") }