Repository: spark
Updated Branches:
  refs/heads/master e72c16e30 -> 3134c3fe4


[SPARK-6953] [PySpark] speed up python tests

This PR tries to speed up some of the Python tests:

```
tests.py                       144s -> 103s      -41s
mllib/classification.py         24s -> 17s        -7s
mllib/regression.py             27s -> 15s       -12s
mllib/tree.py                   27s -> 13s       -14s
mllib/tests.py                  64s -> 31s       -33s
streaming/tests.py             185s -> 84s      -101s
```
With Python 3 included, the total saving is about 558s (almost 10 minutes): core and streaming run three times each, saving (41 + 101) × 3 = 426s, and mllib runs twice, saving (7 + 12 + 14 + 33) × 2 = 132s.

During testing, the runner now shows the time used by each test file:
```
Run core tests ...
Running test: pyspark/rdd.py ... ok (22s)
Running test: pyspark/context.py ... ok (16s)
Running test: pyspark/conf.py ... ok (4s)
Running test: pyspark/broadcast.py ... ok (4s)
Running test: pyspark/accumulators.py ... ok (4s)
Running test: pyspark/serializers.py ... ok (6s)
Running test: pyspark/profiler.py ... ok (5s)
Running test: pyspark/shuffle.py ... ok (1s)
Running test: pyspark/tests.py ... ok (103s)   144s
```
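
The per-file timing is implemented in bash in `python/run-tests` (see the last hunk of this diff). As a rough Python rendering of the same idea — the file list and the `bin/pyspark` invocation below are illustrative assumptions, not the actual runner:

```
import subprocess
import time

def run_test(test_file):
    # Time one test file the way run-tests does: wall-clock seconds,
    # with an ok/failed marker printed next to the file name.
    start = time.time()
    returncode = subprocess.call(["bin/pyspark", test_file])
    status = "ok" if returncode == 0 else "FAILED"
    print("Running test: %s ... %s (%ds)"
          % (test_file, status, int(time.time() - start)))
    return returncode

for f in ["pyspark/rdd.py", "pyspark/context.py"]:
    run_test(f)
```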

Author: Reynold Xin <r...@databricks.com>
Author: Xiangrui Meng <m...@databricks.com>

Closes #5605 from rxin/python-tests-speed and squashes the following commits:

d08542d [Reynold Xin] Merge pull request #14 from mengxr/SPARK-6953
89321ee [Xiangrui Meng] fix seed in tests
3ad2387 [Reynold Xin] Merge pull request #5427 from davies/python_tests


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3134c3fe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3134c3fe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3134c3fe

Branch: refs/heads/master
Commit: 3134c3fe495862b7687b5aa00d3344d09cd5e08e
Parents: e72c16e
Author: Reynold Xin <r...@databricks.com>
Authored: Tue Apr 21 17:49:55 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Apr 21 17:49:55 2015 -0700

----------------------------------------------------------------------
 python/pyspark/mllib/classification.py | 17 ++---
 python/pyspark/mllib/regression.py     | 25 +++++---
 python/pyspark/mllib/tests.py          | 69 +++++++++++----------
 python/pyspark/mllib/tree.py           | 15 ++---
 python/pyspark/shuffle.py              |  7 ++-
 python/pyspark/sql/tests.py            |  4 +-
 python/pyspark/streaming/tests.py      | 63 +++++++++++--------
 python/pyspark/tests.py                | 96 ++++++++++++++++++-----------
 python/run-tests                       | 13 ++--
 9 files changed, 182 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3134c3fe/python/pyspark/mllib/classification.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index eda0b60..a70c664 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -86,7 +86,7 @@ class LogisticRegressionModel(LinearClassificationModel):
     ...     LabeledPoint(0.0, [0.0, 1.0]),
     ...     LabeledPoint(1.0, [1.0, 0.0]),
     ... ]
-    >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
+    >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data), iterations=10)
     >>> lrm.predict([1.0, 0.0])
     1
     >>> lrm.predict([0.0, 1.0])
@@ -95,7 +95,7 @@ class LogisticRegressionModel(LinearClassificationModel):
     [1, 0]
     >>> lrm.clearThreshold()
     >>> lrm.predict([0.0, 1.0])
-    0.123...
+    0.279...
 
     >>> sparse_data = [
     ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
@@ -103,7 +103,7 @@ class LogisticRegressionModel(LinearClassificationModel):
     ...     LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
     ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
     ... ]
-    >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data))
+    >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data), iterations=10)
     >>> lrm.predict(array([0.0, 1.0]))
     1
     >>> lrm.predict(array([1.0, 0.0]))
@@ -129,7 +129,8 @@ class LogisticRegressionModel(LinearClassificationModel):
     ...     LabeledPoint(1.0, [1.0, 0.0, 0.0]),
     ...     LabeledPoint(2.0, [0.0, 0.0, 1.0])
     ... ]
-    >>> mcm = LogisticRegressionWithLBFGS.train(data=sc.parallelize(multi_class_data), numClasses=3)
+    >>> data = sc.parallelize(multi_class_data)
+    >>> mcm = LogisticRegressionWithLBFGS.train(data, iterations=10, numClasses=3)
     >>> mcm.predict([0.0, 0.5, 0.0])
     0
     >>> mcm.predict([0.8, 0.0, 0.0])
@@ -298,7 +299,7 @@ class LogisticRegressionWithLBFGS(object):
         ...     LabeledPoint(0.0, [0.0, 1.0]),
         ...     LabeledPoint(1.0, [1.0, 0.0]),
         ... ]
-        >>> lrm = LogisticRegressionWithLBFGS.train(sc.parallelize(data))
+        >>> lrm = LogisticRegressionWithLBFGS.train(sc.parallelize(data), iterations=10)
         >>> lrm.predict([1.0, 0.0])
         1
         >>> lrm.predict([0.0, 1.0])
@@ -330,14 +331,14 @@ class SVMModel(LinearClassificationModel):
     ...     LabeledPoint(1.0, [2.0]),
     ...     LabeledPoint(1.0, [3.0])
     ... ]
-    >>> svm = SVMWithSGD.train(sc.parallelize(data))
+    >>> svm = SVMWithSGD.train(sc.parallelize(data), iterations=10)
     >>> svm.predict([1.0])
     1
     >>> svm.predict(sc.parallelize([[1.0]])).collect()
     [1]
     >>> svm.clearThreshold()
     >>> svm.predict(array([1.0]))
-    1.25...
+    1.44...
 
     >>> sparse_data = [
     ...     LabeledPoint(0.0, SparseVector(2, {0: -1.0})),
@@ -345,7 +346,7 @@ class SVMModel(LinearClassificationModel):
     ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
     ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
     ... ]
-    >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data))
+    >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data), iterations=10)
     >>> svm.predict(SparseVector(2, {1: 1.0}))
     1
     >>> svm.predict(SparseVector(2, {0: -1.0}))

http://git-wip-us.apache.org/repos/asf/spark/blob/3134c3fe/python/pyspark/mllib/regression.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index a0117c5..4bc6351 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -108,7 +108,8 @@ class LinearRegressionModel(LinearRegressionModelBase):
     ...     LabeledPoint(3.0, [2.0]),
     ...     LabeledPoint(2.0, [3.0])
     ... ]
-    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=np.array([1.0]))
+    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
+    ...     initialWeights=np.array([1.0]))
     >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
     True
     >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
@@ -135,12 +136,13 @@ class LinearRegressionModel(LinearRegressionModelBase):
     ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
     ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
     ... ]
-    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
+    ...     initialWeights=array([1.0]))
     >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
     True
     >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
     True
-    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=100, step=1.0,
+    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
     ...    miniBatchFraction=1.0, initialWeights=array([1.0]), regParam=0.1, regType="l2",
     ...    intercept=True, validateData=True)
     >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
@@ -238,7 +240,7 @@ class LassoModel(LinearRegressionModelBase):
     ...     LabeledPoint(3.0, [2.0]),
     ...     LabeledPoint(2.0, [3.0])
     ... ]
-    >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+    >>> lrm = LassoWithSGD.train(sc.parallelize(data), iterations=10, initialWeights=array([1.0]))
     >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
     True
     >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
@@ -265,12 +267,13 @@ class LassoModel(LinearRegressionModelBase):
     ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
     ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
     ... ]
-    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
+    ...     initialWeights=array([1.0]))
     >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
     True
     >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
     True
-    >>> lrm = LassoWithSGD.train(sc.parallelize(data), iterations=100, step=1.0,
+    >>> lrm = LassoWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
     ...     regParam=0.01, miniBatchFraction=1.0, initialWeights=array([1.0]), intercept=True,
     ...     validateData=True)
     >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
@@ -321,7 +324,8 @@ class RidgeRegressionModel(LinearRegressionModelBase):
     ...     LabeledPoint(3.0, [2.0]),
     ...     LabeledPoint(2.0, [3.0])
     ... ]
-    >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+    >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=10,
+    ...     initialWeights=array([1.0]))
     >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
     True
     >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
@@ -348,12 +352,13 @@ class RidgeRegressionModel(LinearRegressionModelBase):
     ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
     ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
     ... ]
-    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
+    ...     initialWeights=array([1.0]))
     >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
     True
     >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
     True
-    >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=100, step=1.0,
+    >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
     ...     regParam=0.01, miniBatchFraction=1.0, initialWeights=array([1.0]), intercept=True,
     ...     validateData=True)
     >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
@@ -396,7 +401,7 @@ def _test():
     from pyspark import SparkContext
     import pyspark.mllib.regression
     globs = pyspark.mllib.regression.__dict__.copy()
-    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+    globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
     (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:

http://git-wip-us.apache.org/repos/asf/spark/blob/3134c3fe/python/pyspark/mllib/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 8f89e2c..1b008b9 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -36,6 +36,7 @@ if sys.version_info[:2] <= (2, 6):
 else:
     import unittest
 
+from pyspark import SparkContext
 from pyspark.mllib.common import _to_java_object_rdd
 from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
     DenseMatrix, SparseMatrix, Vectors, Matrices
@@ -47,7 +48,6 @@ from pyspark.mllib.feature import IDF
 from pyspark.mllib.feature import StandardScaler
 from pyspark.serializers import PickleSerializer
 from pyspark.sql import SQLContext
-from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase
 
 _have_scipy = False
 try:
@@ -58,6 +58,12 @@ except:
     pass
 
 ser = PickleSerializer()
+sc = SparkContext('local[4]', "MLlib tests")
+
+
+class MLlibTestCase(unittest.TestCase):
+    def setUp(self):
+        self.sc = sc
 
 
 def _squared_distance(a, b):
@@ -67,7 +73,7 @@ def _squared_distance(a, b):
         return b.squared_distance(a)
 
 
-class VectorTests(PySparkTestCase):
+class VectorTests(MLlibTestCase):
 
     def _test_serialize(self, v):
         self.assertEqual(v, ser.loads(ser.dumps(v)))
@@ -212,7 +218,7 @@ class VectorTests(PySparkTestCase):
         self.assertTrue(array_equal(sm.values, [1, 3, 4, 6, 9]))
 
 
-class ListTests(PySparkTestCase):
+class ListTests(MLlibTestCase):
 
     """
     Test MLlib algorithms on plain lists, to make sure they're passed through
@@ -255,7 +261,7 @@ class ListTests(PySparkTestCase):
             [-6, -7],
         ])
         clusters = GaussianMixture.train(data, 2, convergenceTol=0.001,
-                                         maxIterations=100, seed=56)
+                                         maxIterations=10, seed=56)
         labels = clusters.predict(data).collect()
         self.assertEquals(labels[0], labels[1])
         self.assertEquals(labels[2], labels[3])
@@ -266,9 +272,9 @@ class ListTests(PySparkTestCase):
         y = range(0, 100, 10)
         data = self.sc.parallelize([[a, b] for a, b in zip(x, y)])
         clusters1 = GaussianMixture.train(data, 5, convergenceTol=0.001,
-                                          maxIterations=100, seed=63)
+                                          maxIterations=10, seed=63)
         clusters2 = GaussianMixture.train(data, 5, convergenceTol=0.001,
-                                          maxIterations=100, seed=63)
+                                          maxIterations=10, seed=63)
         for c1, c2 in zip(clusters1.weights, clusters2.weights):
             self.assertEquals(round(c1, 7), round(c2, 7))
 
@@ -287,13 +293,13 @@ class ListTests(PySparkTestCase):
 
         temp_dir = tempfile.mkdtemp()
 
-        lr_model = LogisticRegressionWithSGD.train(rdd)
+        lr_model = LogisticRegressionWithSGD.train(rdd, iterations=10)
         self.assertTrue(lr_model.predict(features[0]) <= 0)
         self.assertTrue(lr_model.predict(features[1]) > 0)
         self.assertTrue(lr_model.predict(features[2]) <= 0)
         self.assertTrue(lr_model.predict(features[3]) > 0)
 
-        svm_model = SVMWithSGD.train(rdd)
+        svm_model = SVMWithSGD.train(rdd, iterations=10)
         self.assertTrue(svm_model.predict(features[0]) <= 0)
         self.assertTrue(svm_model.predict(features[1]) > 0)
         self.assertTrue(svm_model.predict(features[2]) <= 0)
@@ -307,7 +313,7 @@ class ListTests(PySparkTestCase):
 
         categoricalFeaturesInfo = {0: 3}  # feature 0 has 3 categories
         dt_model = DecisionTree.trainClassifier(
-            rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo)
+            rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, maxBins=4)
         self.assertTrue(dt_model.predict(features[0]) <= 0)
         self.assertTrue(dt_model.predict(features[1]) > 0)
         self.assertTrue(dt_model.predict(features[2]) <= 0)
@@ -319,7 +325,8 @@ class ListTests(PySparkTestCase):
         self.assertEqual(same_dt_model.toDebugString(), dt_model.toDebugString())
 
         rf_model = RandomForest.trainClassifier(
-            rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=100)
+            rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=10,
+            maxBins=4, seed=1)
         self.assertTrue(rf_model.predict(features[0]) <= 0)
         self.assertTrue(rf_model.predict(features[1]) > 0)
         self.assertTrue(rf_model.predict(features[2]) <= 0)
@@ -331,7 +338,7 @@ class ListTests(PySparkTestCase):
         self.assertEqual(same_rf_model.toDebugString(), rf_model.toDebugString())
 
         gbt_model = GradientBoostedTrees.trainClassifier(
-            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
+            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4)
         self.assertTrue(gbt_model.predict(features[0]) <= 0)
         self.assertTrue(gbt_model.predict(features[1]) > 0)
         self.assertTrue(gbt_model.predict(features[2]) <= 0)
@@ -360,19 +367,19 @@ class ListTests(PySparkTestCase):
         rdd = self.sc.parallelize(data)
         features = [p.features.tolist() for p in data]
 
-        lr_model = LinearRegressionWithSGD.train(rdd)
+        lr_model = LinearRegressionWithSGD.train(rdd, iterations=10)
         self.assertTrue(lr_model.predict(features[0]) <= 0)
         self.assertTrue(lr_model.predict(features[1]) > 0)
         self.assertTrue(lr_model.predict(features[2]) <= 0)
         self.assertTrue(lr_model.predict(features[3]) > 0)
 
-        lasso_model = LassoWithSGD.train(rdd)
+        lasso_model = LassoWithSGD.train(rdd, iterations=10)
         self.assertTrue(lasso_model.predict(features[0]) <= 0)
         self.assertTrue(lasso_model.predict(features[1]) > 0)
         self.assertTrue(lasso_model.predict(features[2]) <= 0)
         self.assertTrue(lasso_model.predict(features[3]) > 0)
 
-        rr_model = RidgeRegressionWithSGD.train(rdd)
+        rr_model = RidgeRegressionWithSGD.train(rdd, iterations=10)
         self.assertTrue(rr_model.predict(features[0]) <= 0)
         self.assertTrue(rr_model.predict(features[1]) > 0)
         self.assertTrue(rr_model.predict(features[2]) <= 0)
@@ -380,35 +387,35 @@ class ListTests(PySparkTestCase):
 
         categoricalFeaturesInfo = {0: 2}  # feature 0 has 2 categories
         dt_model = DecisionTree.trainRegressor(
-            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
+            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, maxBins=4)
         self.assertTrue(dt_model.predict(features[0]) <= 0)
         self.assertTrue(dt_model.predict(features[1]) > 0)
         self.assertTrue(dt_model.predict(features[2]) <= 0)
         self.assertTrue(dt_model.predict(features[3]) > 0)
 
         rf_model = RandomForest.trainRegressor(
-            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=100, seed=1)
+            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=10, maxBins=4, seed=1)
         self.assertTrue(rf_model.predict(features[0]) <= 0)
         self.assertTrue(rf_model.predict(features[1]) > 0)
         self.assertTrue(rf_model.predict(features[2]) <= 0)
         self.assertTrue(rf_model.predict(features[3]) > 0)
 
         gbt_model = GradientBoostedTrees.trainRegressor(
-            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
+            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4)
         self.assertTrue(gbt_model.predict(features[0]) <= 0)
         self.assertTrue(gbt_model.predict(features[1]) > 0)
         self.assertTrue(gbt_model.predict(features[2]) <= 0)
         self.assertTrue(gbt_model.predict(features[3]) > 0)
 
         try:
-            LinearRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]))
-            LassoWithSGD.train(rdd, initialWeights=array([1.0, 1.0]))
-            RidgeRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]))
+            LinearRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10)
+            LassoWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10)
+            RidgeRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10)
         except ValueError:
             self.fail()
 
 
-class StatTests(PySparkTestCase):
+class StatTests(MLlibTestCase):
     # SPARK-4023
     def test_col_with_different_rdds(self):
         # numpy
@@ -438,7 +445,7 @@ class StatTests(PySparkTestCase):
         self.assertTrue(math.fabs(summary2.normL2()[0] - expectedNormL2) < 1e-14)
 
 
-class VectorUDTTests(PySparkTestCase):
+class VectorUDTTests(MLlibTestCase):
 
     dv0 = DenseVector([])
     dv1 = DenseVector([1.0, 2.0])
@@ -472,7 +479,7 @@ class VectorUDTTests(PySparkTestCase):
 
 
 @unittest.skipIf(not _have_scipy, "SciPy not installed")
-class SciPyTests(PySparkTestCase):
+class SciPyTests(MLlibTestCase):
 
     """
     Test both vector operations and MLlib algorithms with SciPy sparse matrices,
@@ -613,7 +620,7 @@ class SciPyTests(PySparkTestCase):
         self.assertTrue(dt_model.predict(features[3]) > 0)
 
 
-class ChiSqTestTests(PySparkTestCase):
+class ChiSqTestTests(MLlibTestCase):
     def test_goodness_of_fit(self):
         from numpy import inf
 
@@ -711,13 +718,13 @@ class ChiSqTestTests(PySparkTestCase):
         self.assertIsNotNone(chi[1000])
 
 
-class SerDeTest(PySparkTestCase):
+class SerDeTest(MLlibTestCase):
     def test_to_java_object_rdd(self):  # SPARK-6660
         data = RandomRDDs.uniformRDD(self.sc, 10, 5, seed=0)
         self.assertEqual(_to_java_object_rdd(data).count(), 10)
 
 
-class FeatureTest(PySparkTestCase):
+class FeatureTest(MLlibTestCase):
     def test_idf_model(self):
         data = [
             Vectors.dense([1, 2, 6, 0, 2, 3, 1, 1, 0, 0, 3]),
@@ -730,13 +737,8 @@ class FeatureTest(PySparkTestCase):
         self.assertEqual(len(idf), 11)
 
 
-class Word2VecTests(PySparkTestCase):
+class Word2VecTests(MLlibTestCase):
     def test_word2vec_setters(self):
-        data = [
-            ["I", "have", "a", "pen"],
-            ["I", "like", "soccer", "very", "much"],
-            ["I", "live", "in", "Tokyo"]
-        ]
         model = Word2Vec() \
             .setVectorSize(2) \
             .setLearningRate(0.01) \
@@ -765,7 +767,7 @@ class Word2VecTests(PySparkTestCase):
         self.assertEquals(len(model.getVectors()), 3)
 
 
-class StandardScalerTests(PySparkTestCase):
+class StandardScalerTests(MLlibTestCase):
     def test_model_setters(self):
         data = [
             [1.0, 2.0, 3.0],
@@ -793,3 +795,4 @@ if __name__ == "__main__":
     unittest.main()
     if not _have_scipy:
         print("NOTE: SciPy tests were skipped as it does not seem to be 
installed")
+    sc.stop()
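
Most of the mllib/tests.py saving comes from the pattern just above: instead of inheriting ReusedPySparkTestCase from pyspark.tests, a single module-level SparkContext is created once and shared by every test class through MLlibTestCase. A minimal, self-contained sketch of the same pattern, with illustrative names:

```
import unittest
from pyspark import SparkContext

# Pay JVM and context startup once for the whole module.
sc = SparkContext('local[4]', "example tests")

class SharedContextTestCase(unittest.TestCase):
    def setUp(self):
        self.sc = sc  # every test reuses the module-level context

class CountTest(SharedContextTestCase):
    def test_count(self):
        self.assertEqual(self.sc.parallelize(range(10)).count(), 10)

if __name__ == "__main__":
    try:
        unittest.main()
    finally:
        sc.stop()
```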

http://git-wip-us.apache.org/repos/asf/spark/blob/3134c3fe/python/pyspark/mllib/tree.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py
index 0fe6e4f..cfcbea5 100644
--- a/python/pyspark/mllib/tree.py
+++ b/python/pyspark/mllib/tree.py
@@ -482,13 +482,13 @@ class GradientBoostedTrees(object):
         ...     LabeledPoint(1.0, [3.0])
         ... ]
         >>>
-        >>> model = GradientBoostedTrees.trainClassifier(sc.parallelize(data), {})
+        >>> model = GradientBoostedTrees.trainClassifier(sc.parallelize(data), {}, numIterations=10)
         >>> model.numTrees()
-        100
+        10
         >>> model.totalNumNodes()
-        300
+        30
         >>> print(model)  # it already has newline
-        TreeEnsembleModel classifier with 100 trees
+        TreeEnsembleModel classifier with 10 trees
         <BLANKLINE>
         >>> model.predict([2.0])
         1.0
@@ -541,11 +541,12 @@ class GradientBoostedTrees(object):
         ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
         ... ]
         >>>
-        >>> model = GradientBoostedTrees.trainRegressor(sc.parallelize(sparse_data), {})
+        >>> data = sc.parallelize(sparse_data)
+        >>> model = GradientBoostedTrees.trainRegressor(data, {}, numIterations=10)
         >>> model.numTrees()
-        100
+        10
         >>> model.totalNumNodes()
-        102
+        12
         >>> model.predict(SparseVector(2, {1: 1.0}))
         1.0
         >>> model.predict(SparseVector(2, {0: 1.0}))

http://git-wip-us.apache.org/repos/asf/spark/blob/3134c3fe/python/pyspark/shuffle.py
----------------------------------------------------------------------
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index b54baa5..1d0b16c 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -486,7 +486,7 @@ class ExternalSorter(object):
         goes above the limit.
         """
         global MemoryBytesSpilled, DiskBytesSpilled
-        batch, limit = 100, self._next_limit()
+        batch, limit = 100, self.memory_limit
         chunks, current_chunk = [], []
         iterator = iter(iterator)
         while True:
@@ -497,7 +497,7 @@ class ExternalSorter(object):
                 break
 
             used_memory = get_used_memory()
-            if used_memory > self.memory_limit:
+            if used_memory > limit:
                 # sort them inplace will save memory
                 current_chunk.sort(key=key, reverse=reverse)
                 path = self._get_path(len(chunks))
@@ -513,13 +513,14 @@ class ExternalSorter(object):
                 chunks.append(load(open(path, 'rb')))
                 current_chunk = []
                 gc.collect()
+                batch //= 2
                 limit = self._next_limit()
                 MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
                 DiskBytesSpilled += os.path.getsize(path)
                 os.unlink(path)  # data will be deleted after close
 
             elif not chunks:
-                batch = min(batch * 2, 10000)
+                batch = min(int(batch * 1.5), 10000)
 
         current_chunk.sort(key=key, reverse=reverse)
         if not chunks:
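
The shuffle.py hunks above are a behavior fix that rode along with this PR: the spill check now starts at `memory_limit` and is only raised via `_next_limit()` after a spill, the probe batch grows more gently (×1.5 instead of ×2, capped at 10000) and only before the first spill, and it is halved after every spill so memory is checked more often once pressure appears. A condensed, illustrative sketch of the resulting control flow — `used_memory`, `next_limit`, and `spill` here stand in for the real ExternalSorter internals:

```
from itertools import islice

def consume_with_spills(iterator, used_memory, memory_limit, next_limit, spill):
    batch = 100                  # elements pulled between memory checks
    chunks, current = [], []
    it = iter(iterator)
    while True:
        chunk = list(islice(it, batch))
        current.extend(chunk)
        if len(chunk) < batch:
            break                # input exhausted
        if used_memory() > memory_limit:
            spill(current)       # stand-in for sort + dump chunk to disk
            chunks.append(True)
            current = []
            batch //= 2          # probe memory more often after a spill
            memory_limit = next_limit()
        elif not chunks:
            # before the first spill, grow the batch slowly: x1.5, capped
            batch = min(int(batch * 1.5), 10000)
    return current
```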

http://git-wip-us.apache.org/repos/asf/spark/blob/3134c3fe/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 23e8428..fe43c37 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -109,7 +109,7 @@ class SQLTests(ReusedPySparkTestCase):
         os.unlink(cls.tempdir.name)
         cls.sqlCtx = SQLContext(cls.sc)
         cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
-        rdd = cls.sc.parallelize(cls.testData)
+        rdd = cls.sc.parallelize(cls.testData, 2)
         cls.df = rdd.toDF()
 
     @classmethod
@@ -303,7 +303,7 @@ class SQLTests(ReusedPySparkTestCase):
         abstract = "byte1 short1 float1 time1 map1{} struct1(b) list1[]"
         schema = _parse_schema_abstract(abstract)
         typedSchema = _infer_schema_type(rdd.first(), schema)
-        df = self.sqlCtx.applySchema(rdd, typedSchema)
+        df = self.sqlCtx.createDataFrame(rdd, typedSchema)
         r = (127, -32768, 1.0, datetime(2010, 1, 1, 1, 1, 1), {"a": 1}, Row(b=2), [1, 2, 3])
         self.assertEqual(r, tuple(df.first()))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3134c3fe/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 33f958a..5fa1e5e 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -16,14 +16,23 @@
 #
 
 import os
+import sys
 from itertools import chain
 import time
 import operator
-import unittest
 import tempfile
 import struct
 from functools import reduce
 
+if sys.version_info[:2] <= (2, 6):
+    try:
+        import unittest2 as unittest
+    except ImportError:
+        sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier')
+        sys.exit(1)
+else:
+    import unittest
+
 from pyspark.context import SparkConf, SparkContext, RDD
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.kafka import KafkaUtils
@@ -31,19 +40,25 @@ from pyspark.streaming.kafka import KafkaUtils
 
 class PySparkStreamingTestCase(unittest.TestCase):
 
-    timeout = 20  # seconds
-    duration = 1
+    timeout = 4  # seconds
+    duration = .2
 
-    def setUp(self):
-        class_name = self.__class__.__name__
+    @classmethod
+    def setUpClass(cls):
+        class_name = cls.__name__
         conf = SparkConf().set("spark.default.parallelism", 1)
-        self.sc = SparkContext(appName=class_name, conf=conf)
-        self.sc.setCheckpointDir("/tmp")
-        # TODO: decrease duration to speed up tests
+        cls.sc = SparkContext(appName=class_name, conf=conf)
+        cls.sc.setCheckpointDir("/tmp")
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.sc.stop()
+
+    def setUp(self):
         self.ssc = StreamingContext(self.sc, self.duration)
 
     def tearDown(self):
-        self.ssc.stop()
+        self.ssc.stop(False)
 
     def wait_for(self, result, n):
         start_time = time.time()
@@ -363,13 +378,13 @@ class BasicOperationTests(PySparkStreamingTestCase):
 
 class WindowFunctionTests(PySparkStreamingTestCase):
 
-    timeout = 20
+    timeout = 5
 
     def test_window(self):
         input = [range(1), range(2), range(3), range(4), range(5)]
 
         def func(dstream):
-            return dstream.window(3, 1).count()
+            return dstream.window(.6, .2).count()
 
         expected = [[1], [3], [6], [9], [12], [9], [5]]
         self._test_func(input, func, expected)
@@ -378,7 +393,7 @@ class WindowFunctionTests(PySparkStreamingTestCase):
         input = [range(1), range(2), range(3), range(4), range(5)]
 
         def func(dstream):
-            return dstream.countByWindow(3, 1)
+            return dstream.countByWindow(.6, .2)
 
         expected = [[1], [3], [6], [9], [12], [9], [5]]
         self._test_func(input, func, expected)
@@ -387,7 +402,7 @@ class WindowFunctionTests(PySparkStreamingTestCase):
         input = [range(1), range(2), range(3), range(4), range(5), range(6)]
 
         def func(dstream):
-            return dstream.countByWindow(5, 1)
+            return dstream.countByWindow(1, .2)
 
         expected = [[1], [3], [6], [10], [15], [20], [18], [15], [11], [6]]
         self._test_func(input, func, expected)
@@ -396,7 +411,7 @@ class WindowFunctionTests(PySparkStreamingTestCase):
         input = [range(1), range(2), range(3), range(4), range(5), range(6)]
 
         def func(dstream):
-            return dstream.countByValueAndWindow(5, 1)
+            return dstream.countByValueAndWindow(1, .2)
 
         expected = [[1], [2], [3], [4], [5], [6], [6], [6], [6], [6]]
         self._test_func(input, func, expected)
@@ -405,7 +420,7 @@ class WindowFunctionTests(PySparkStreamingTestCase):
         input = [[('a', i)] for i in range(5)]
 
         def func(dstream):
-            return dstream.groupByKeyAndWindow(3, 1).mapValues(list)
+            return dstream.groupByKeyAndWindow(.6, .2).mapValues(list)
 
         expected = [[('a', [0])], [('a', [0, 1])], [('a', [0, 1, 2])], [('a', [1, 2, 3])],
                     [('a', [2, 3, 4])], [('a', [3, 4])], [('a', [4])]]
@@ -436,8 +451,8 @@ class StreamingContextTests(PySparkStreamingTestCase):
     def test_stop_multiple_times(self):
         self._add_input_stream()
         self.ssc.start()
-        self.ssc.stop()
-        self.ssc.stop()
+        self.ssc.stop(False)
+        self.ssc.stop(False)
 
     def test_queue_stream(self):
         input = [list(range(i + 1)) for i in range(3)]
@@ -495,10 +510,7 @@ class StreamingContextTests(PySparkStreamingTestCase):
         self.assertEqual([2, 3, 1], self._take(dstream, 3))
 
 
-class CheckpointTests(PySparkStreamingTestCase):
-
-    def setUp(self):
-        pass
+class CheckpointTests(unittest.TestCase):
 
     def test_get_or_create(self):
         inputd = tempfile.mkdtemp()
@@ -518,12 +530,12 @@ class CheckpointTests(PySparkStreamingTestCase):
             return ssc
 
         cpd = tempfile.mkdtemp("test_streaming_cps")
-        self.ssc = ssc = StreamingContext.getOrCreate(cpd, setup)
+        ssc = StreamingContext.getOrCreate(cpd, setup)
         ssc.start()
 
         def check_output(n):
             while not os.listdir(outputd):
-                time.sleep(0.1)
+                time.sleep(0.01)
             time.sleep(1)  # make sure mtime is larger than the previous one
             with open(os.path.join(inputd, str(n)), 'w') as f:
                 f.writelines(["%d\n" % i for i in range(10)])
@@ -553,12 +565,15 @@ class CheckpointTests(PySparkStreamingTestCase):
         ssc.stop(True, True)
 
         time.sleep(1)
-        self.ssc = ssc = StreamingContext.getOrCreate(cpd, setup)
+        ssc = StreamingContext.getOrCreate(cpd, setup)
         ssc.start()
         check_output(3)
+        ssc.stop(True, True)
 
 
 class KafkaStreamTests(PySparkStreamingTestCase):
+    timeout = 20  # seconds
+    duration = 1
 
     def setUp(self):
         super(KafkaStreamTests, self).setUp()
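
The streaming suite's 101s saving comes mostly from the changes above: the JVM-backed SparkContext moves from `setUp` to `setUpClass`, so it is created once per test class instead of once per test; batch and window durations drop from whole seconds to fractions of a second; and only the Kafka tests keep the old 20s timeout and 1s batch duration. A minimal sketch of the class-level pattern on its own (class name illustrative):

```
import unittest
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

class StreamingTestBase(unittest.TestCase):
    duration = .2  # batch interval in seconds; was 1 before this change

    @classmethod
    def setUpClass(cls):
        conf = SparkConf().set("spark.default.parallelism", 1)
        cls.sc = SparkContext(appName=cls.__name__, conf=conf)

    @classmethod
    def tearDownClass(cls):
        cls.sc.stop()

    def setUp(self):
        # A fresh StreamingContext per test is cheap; only the
        # SparkContext is expensive enough to be worth sharing.
        self.ssc = StreamingContext(self.sc, self.duration)

    def tearDown(self):
        self.ssc.stop(False)  # keep the shared SparkContext alive
```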

http://git-wip-us.apache.org/repos/asf/spark/blob/3134c3fe/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 75f39d9..ea63a39 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -31,7 +31,6 @@ import tempfile
 import time
 import zipfile
 import random
-import itertools
 import threading
 import hashlib
 
@@ -49,6 +48,11 @@ else:
         xrange = range
         basestring = str
 
+if sys.version >= "3":
+    from io import StringIO
+else:
+    from StringIO import StringIO
+
 
 from pyspark.conf import SparkConf
 from pyspark.context import SparkContext
@@ -196,7 +200,7 @@ class SorterTests(unittest.TestCase):
         sc = SparkContext(conf=conf)
         l = list(range(10240))
         random.shuffle(l)
-        rdd = sc.parallelize(l, 2)
+        rdd = sc.parallelize(l, 4)
         self.assertEqual(sorted(l), rdd.sortBy(lambda x: x).collect())
         sc.stop()
 
@@ -300,6 +304,18 @@ class SerializationTestCase(unittest.TestCase):
         hash(FlattenedValuesSerializer(PickleSerializer()))
 
 
+class QuietTest(object):
+    def __init__(self, sc):
+        self.log4j = sc._jvm.org.apache.log4j
+
+    def __enter__(self):
+        self.old_level = self.log4j.LogManager.getRootLogger().getLevel()
+        self.log4j.LogManager.getRootLogger().setLevel(self.log4j.Level.FATAL)
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.log4j.LogManager.getRootLogger().setLevel(self.old_level)
+
+
 class PySparkTestCase(unittest.TestCase):
 
     def setUp(self):
@@ -371,15 +387,11 @@ class AddFileTests(PySparkTestCase):
         # To ensure that we're actually testing addPyFile's effects, check that
         # this job fails due to `userlibrary` not being on the Python path:
         # disable logging in log4j temporarily
-        log4j = self.sc._jvm.org.apache.log4j
-        old_level = log4j.LogManager.getRootLogger().getLevel()
-        log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL)
-
         def func(x):
             from userlibrary import UserClass
             return UserClass().hello()
-        self.assertRaises(Exception, self.sc.parallelize(range(2)).map(func).first)
-        log4j.LogManager.getRootLogger().setLevel(old_level)
+        with QuietTest(self.sc):
+            self.assertRaises(Exception, self.sc.parallelize(range(2)).map(func).first)
 
         # Add the file, so the job should now succeed:
         path = os.path.join(SPARK_HOME, "python/test_support/userlibrary.py")
@@ -496,7 +508,8 @@ class RDDTests(ReusedPySparkTestCase):
         filtered_data = data.filter(lambda x: True)
         self.assertEqual(1, filtered_data.count())
         os.unlink(tempFile.name)
-        self.assertRaises(Exception, lambda: filtered_data.count())
+        with QuietTest(self.sc):
+            self.assertRaises(Exception, lambda: filtered_data.count())
 
     def test_sampling_default_seed(self):
         # Test for SPARK-3995 (default seed setting)
@@ -536,9 +549,9 @@ class RDDTests(ReusedPySparkTestCase):
         self.assertEqual([jon, jane], theDoes.collect())
 
     def test_large_broadcast(self):
-        N = 100000
+        N = 10000
         data = [[float(i) for i in range(300)] for i in range(N)]
-        bdata = self.sc.broadcast(data)  # 270MB
+        bdata = self.sc.broadcast(data)  # 27MB
         m = self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum()
         self.assertEqual(N, m)
 
@@ -569,7 +582,7 @@ class RDDTests(ReusedPySparkTestCase):
         self.assertEqual(checksum, csum)
 
     def test_large_closure(self):
-        N = 1000000
+        N = 200000
         data = [float(i) for i in xrange(N)]
         rdd = self.sc.parallelize(range(1), 1).map(lambda x: len(data))
         self.assertEqual(N, rdd.first())
@@ -604,17 +617,18 @@ class RDDTests(ReusedPySparkTestCase):
         # different number of partitions
         b = self.sc.parallelize(range(100, 106), 3)
         self.assertRaises(ValueError, lambda: a.zip(b))
-        # different number of batched items in JVM
-        b = self.sc.parallelize(range(100, 104), 2)
-        self.assertRaises(Exception, lambda: a.zip(b).count())
-        # different number of items in one pair
-        b = self.sc.parallelize(range(100, 106), 2)
-        self.assertRaises(Exception, lambda: a.zip(b).count())
-        # same total number of items, but different distributions
-        a = self.sc.parallelize([2, 3], 2).flatMap(range)
-        b = self.sc.parallelize([3, 2], 2).flatMap(range)
-        self.assertEqual(a.count(), b.count())
-        self.assertRaises(Exception, lambda: a.zip(b).count())
+        with QuietTest(self.sc):
+            # different number of batched items in JVM
+            b = self.sc.parallelize(range(100, 104), 2)
+            self.assertRaises(Exception, lambda: a.zip(b).count())
+            # different number of items in one pair
+            b = self.sc.parallelize(range(100, 106), 2)
+            self.assertRaises(Exception, lambda: a.zip(b).count())
+            # same total number of items, but different distributions
+            a = self.sc.parallelize([2, 3], 2).flatMap(range)
+            b = self.sc.parallelize([3, 2], 2).flatMap(range)
+            self.assertEqual(a.count(), b.count())
+            self.assertRaises(Exception, lambda: a.zip(b).count())
 
     def test_count_approx_distinct(self):
         rdd = self.sc.parallelize(range(1000))
@@ -877,7 +891,12 @@ class ProfilerTests(PySparkTestCase):
         func_names = [func_name for fname, n, func_name in stat_list]
         self.assertTrue("heavy_foo" in func_names)
 
+        old_stdout = sys.stdout
+        sys.stdout = io = StringIO()
         self.sc.show_profiles()
+        self.assertTrue("heavy_foo" in io.getvalue())
+        sys.stdout = old_stdout
+
         d = tempfile.gettempdir()
         self.sc.dump_profiles(d)
         self.assertTrue("rdd_%d.pstats" % id in os.listdir(d))
@@ -901,7 +920,7 @@ class ProfilerTests(PySparkTestCase):
 
     def do_computation(self):
         def heavy_foo(x):
-            for i in range(1 << 20):
+            for i in range(1 << 18):
                 x = 1
 
         rdd = self.sc.parallelize(range(100))
@@ -1417,7 +1436,7 @@ class DaemonTests(unittest.TestCase):
         self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))
 
 
-class WorkerTests(PySparkTestCase):
+class WorkerTests(ReusedPySparkTestCase):
     def test_cancel_task(self):
         temp = tempfile.NamedTemporaryFile(delete=True)
         temp.close()
@@ -1432,7 +1451,10 @@ class WorkerTests(PySparkTestCase):
 
         # start job in background thread
         def run():
-            self.sc.parallelize(range(1), 1).foreach(sleep)
+            try:
+                self.sc.parallelize(range(1), 1).foreach(sleep)
+            except Exception:
+                pass
         import threading
         t = threading.Thread(target=run)
         t.daemon = True
@@ -1473,7 +1495,8 @@ class WorkerTests(PySparkTestCase):
         def raise_exception(_):
             raise Exception()
         rdd = self.sc.parallelize(range(100), 1)
-        self.assertRaises(Exception, lambda: rdd.foreach(raise_exception))
+        with QuietTest(self.sc):
+            self.assertRaises(Exception, lambda: rdd.foreach(raise_exception))
         self.assertEqual(100, rdd.map(str).count())
 
     def test_after_jvm_exception(self):
@@ -1484,7 +1507,8 @@ class WorkerTests(PySparkTestCase):
         filtered_data = data.filter(lambda x: True)
         self.assertEqual(1, filtered_data.count())
         os.unlink(tempFile.name)
-        self.assertRaises(Exception, lambda: filtered_data.count())
+        with QuietTest(self.sc):
+            self.assertRaises(Exception, lambda: filtered_data.count())
 
         rdd = self.sc.parallelize(range(100), 1)
         self.assertEqual(100, rdd.map(str).count())
@@ -1522,14 +1546,11 @@ class WorkerTests(PySparkTestCase):
         rdd.count()
         version = sys.version_info
         sys.version_info = (2, 0, 0)
-        log4j = self.sc._jvm.org.apache.log4j
-        old_level = log4j.LogManager.getRootLogger().getLevel()
-        log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL)
         try:
-            self.assertRaises(Py4JJavaError, lambda: rdd.count())
+            with QuietTest(self.sc):
+                self.assertRaises(Py4JJavaError, lambda: rdd.count())
         finally:
             sys.version_info = version
-            log4j.LogManager.getRootLogger().setLevel(old_level)
 
 
 class SparkSubmitTests(unittest.TestCase):
@@ -1751,9 +1772,14 @@ class ContextTests(unittest.TestCase):
     def test_progress_api(self):
         with SparkContext() as sc:
             sc.setJobGroup('test_progress_api', '', True)
-
             rdd = sc.parallelize(range(10)).map(lambda x: time.sleep(100))
-            t = threading.Thread(target=rdd.collect)
+
+            def run():
+                try:
+                    rdd.count()
+                except Exception:
+                    pass
+            t = threading.Thread(target=run)
             t.daemon = True
             t.start()
             # wait for scheduler to start
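
The new QuietTest context manager above centralizes the log4j silencing that several tests previously did by hand: it raises the root logger to FATAL on entry and restores the old level on exit, so jobs that are expected to fail do not flood the console with Java stack traces. A small illustrative use outside a test class, assuming QuietTest is importable from pyspark.tests after this change:

```
from pyspark import SparkContext
from pyspark.tests import QuietTest

sc = SparkContext('local[2]', "quiet demo")

def boom(_):
    raise ValueError("expected failure")

# Inside the block log4j reports only FATAL, so the deliberate task
# failure below produces no stack-trace noise on the console.
with QuietTest(sc):
    try:
        sc.parallelize(range(4)).foreach(boom)
    except Exception:
        pass

sc.stop()
```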

http://git-wip-us.apache.org/repos/asf/spark/blob/3134c3fe/python/run-tests
----------------------------------------------------------------------
diff --git a/python/run-tests b/python/run-tests
index ed3e819..88b63b8 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -28,6 +28,7 @@ cd "$FWDIR/python"
 
 FAILED=0
 LOG_FILE=unit-tests.log
+START=$(date +"%s")
 
 rm -f $LOG_FILE
 
@@ -35,8 +36,8 @@ rm -f $LOG_FILE
 rm -rf metastore warehouse
 
 function run_test() {
-    echo "Running test: $1" | tee -a $LOG_FILE
-
+    echo -en "Running test: $1 ... " | tee -a $LOG_FILE
+    start=$(date +"%s")
     SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 > $LOG_FILE 2>&1
 
     FAILED=$((PIPESTATUS[0]||$FAILED))
@@ -48,6 +49,9 @@ function run_test() {
         echo "Had test failures; see logs."
         echo -en "\033[0m"  # No color
         exit -1
+    else
+        now=$(date +"%s")
+        echo "ok ($(($now - $start))s)"
     fi
 }
 
@@ -161,9 +165,8 @@ if [ $(which pypy) ]; then
 fi
 
 if [[ $FAILED == 0 ]]; then
-    echo -en "\033[32m"  # Green
-    echo "Tests passed."
-    echo -en "\033[0m"  # No color
+    now=$(date +"%s")
+    echo -e "\033[32mTests passed \033[0min $(($now - $START)) seconds"
 fi
 
 # TODO: in the long-run, it would be nice to use a test runner like `nose`.

