Repository: spark
Updated Branches:
  refs/heads/master a03e5b81e -> fce5e251d


http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/python/pyspark/mllib/random.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index d53c95f..a787e4d 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -19,15 +19,32 @@
 Python package for random data generation.
 """
 
+from functools import wraps
 
 from pyspark.rdd import RDD
-from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector
-from pyspark.serializers import NoOpSerializer
+from pyspark.serializers import BatchedSerializer, PickleSerializer
 
 
 __all__ = ['RandomRDDs', ]
 
 
+def serialize(f):
+    @wraps(f)
+    def func(sc, *a, **kw):
+        jrdd = f(sc, *a, **kw)
+        return RDD(sc._jvm.PythonRDD.javaToPython(jrdd), sc,
+                   BatchedSerializer(PickleSerializer(), 1024))
+    return func
+
+
+def toArray(f):
+    @wraps(f)
+    def func(sc, *a, **kw):
+        rdd = f(sc, *a, **kw)
+        return rdd.map(lambda vec: vec.toArray())
+    return func
+
+
 class RandomRDDs(object):
     """
     Generator methods for creating RDDs comprised of i.i.d samples from
@@ -35,6 +52,7 @@ class RandomRDDs(object):
     """
 
     @staticmethod
+    @serialize
     def uniformRDD(sc, size, numPartitions=None, seed=None):
         """
         Generates an RDD comprised of i.i.d. samples from the
@@ -56,11 +74,10 @@ class RandomRDDs(object):
         >>> parts == sc.defaultParallelism
         True
         """
-        jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed)
-        uniform = RDD(jrdd, sc, NoOpSerializer())
-        return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+        return sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed)
 
     @staticmethod
+    @serialize
     def normalRDD(sc, size, numPartitions=None, seed=None):
         """
         Generates an RDD comprised of i.i.d. samples from the standard normal
@@ -80,11 +97,10 @@ class RandomRDDs(object):
         >>> abs(stats.stdev() - 1.0) < 0.1
         True
         """
-        jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed)
-        normal = RDD(jrdd, sc, NoOpSerializer())
-        return normal.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+        return sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed)
 
     @staticmethod
+    @serialize
     def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
         """
         Generates an RDD comprised of i.i.d. samples from the Poisson
@@ -101,11 +117,11 @@ class RandomRDDs(object):
         >>> abs(stats.stdev() - sqrt(mean)) < 0.5
         True
         """
-        jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed)
-        poisson = RDD(jrdd, sc, NoOpSerializer())
-        return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+        return sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed)
 
     @staticmethod
+    @toArray
+    @serialize
     def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
         """
         Generates an RDD comprised of vectors containing i.i.d. samples drawn
@@ -120,12 +136,12 @@ class RandomRDDs(object):
         >>> RandomRDDs.uniformVectorRDD(sc, 10, 10, 4).getNumPartitions()
         4
         """
-        jrdd = sc._jvm.PythonMLLibAPI() \
+        return sc._jvm.PythonMLLibAPI() \
             .uniformVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
-        uniform = RDD(jrdd, sc, NoOpSerializer())
-        return uniform.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
 
     @staticmethod
+    @toArray
+    @serialize
     def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
         """
         Generates an RDD comprised of vectors containing i.i.d. samples drawn
@@ -140,12 +156,12 @@ class RandomRDDs(object):
         >>> abs(mat.std() - 1.0) < 0.1
         True
         """
-        jrdd = sc._jvm.PythonMLLibAPI() \
+        return sc._jvm.PythonMLLibAPI() \
             .normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
-        normal = RDD(jrdd, sc, NoOpSerializer())
-        return normal.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
 
     @staticmethod
+    @toArray
+    @serialize
     def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
         """
         Generates an RDD comprised of vectors containing i.i.d. samples drawn
@@ -163,10 +179,8 @@ class RandomRDDs(object):
         >>> abs(mat.std() - sqrt(mean)) < 0.5
         True
         """
-        jrdd = sc._jvm.PythonMLLibAPI() \
+        return sc._jvm.PythonMLLibAPI() \
             .poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed)
-        poisson = RDD(jrdd, sc, NoOpSerializer())
-        return poisson.map(lambda bytes: 
_deserialize_double_vector(bytearray(bytes)))
 
 
 def _test():

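A note on the decorator stack introduced above: serialize wraps the JavaRDD
returned by the Py4J call in a Python RDD deserialized with a batched
PickleSerializer, and toArray then maps each MLlib Vector to a NumPy array, so
the *VectorRDD generators hand back RDDs of ndarrays. A minimal usage sketch,
assuming only a live SparkContext named sc:

    from pyspark.mllib.random import RandomRDDs

    doubles = RandomRDDs.uniformRDD(sc, 100, seed=1)    # RDD of Python floats
    print doubles.stats()                               # count/mean/stdev via StatCounter
    vectors = RandomRDDs.uniformVectorRDD(sc, 10, 4)    # RDD of numpy.ndarray rows
    print vectors.first().shape                         # expected: (4,)
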
http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/python/pyspark/mllib/recommendation.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index 2df2339..59c1c5f 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -16,17 +16,25 @@
 #
 
 from pyspark import SparkContext
-from pyspark.mllib._common import \
-    _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
-    _serialize_double_matrix, _deserialize_double_matrix, \
-    _serialize_double_vector, _deserialize_double_vector, \
-    _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
-    _serialize_tuple, RatingDeserializer
+from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
 from pyspark.rdd import RDD
 
 __all__ = ['MatrixFactorizationModel', 'ALS']
 
 
+class Rating(object):
+    def __init__(self, user, product, rating):
+        self.user = int(user)
+        self.product = int(product)
+        self.rating = float(rating)
+
+    def __reduce__(self):
+        return Rating, (self.user, self.product, self.rating)
+
+    def __repr__(self):
+        return "Rating(%d, %d, %d)" % (self.user, self.product, self.rating)
+
+
 class MatrixFactorizationModel(object):
 
     """A matrix factorisation model trained by regularized alternating
@@ -39,7 +47,9 @@ class MatrixFactorizationModel(object):
     >>> model = ALS.trainImplicit(ratings, 1)
     >>> model.predict(2,2) is not None
     True
+
     >>> testset = sc.parallelize([(1, 2), (1, 1)])
+    >>> model = ALS.train(ratings, 1)
     >>> model.predictAll(testset).count() == 2
     True
     """
@@ -54,34 +64,61 @@ class MatrixFactorizationModel(object):
     def predict(self, user, product):
         return self._java_model.predict(user, product)
 
-    def predictAll(self, usersProducts):
-        usersProductsJRDD = _get_unmangled_rdd(usersProducts, _serialize_tuple)
-        return RDD(self._java_model.predict(usersProductsJRDD._jrdd),
-                   self._context, RatingDeserializer())
+    def predictAll(self, user_product):
+        assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"
+        first = user_product.first()
+        if isinstance(first, list):
+            user_product = user_product.map(tuple)
+            first = tuple(first)
+        assert type(first) is tuple and len(first) == 2, \
+            "user_product should be RDD of (user, product)"
+        if any(isinstance(x, str) for x in first):
+            user_product = user_product.map(lambda (u, p): (int(u), int(p)))
+            first = tuple(map(int, first))
+        assert all(type(x) is int for x in first), "user and product in user_product should be int"
+        sc = self._context
+        tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd())
+        jresult = self._java_model.predict(tuplerdd).toJavaRDD()
+        return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc,
+                   AutoBatchedSerializer(PickleSerializer()))
 
 
 class ALS(object):
 
     @classmethod
+    def _prepare(cls, ratings):
+        assert isinstance(ratings, RDD), "ratings should be RDD"
+        first = ratings.first()
+        if not isinstance(first, Rating):
+            if isinstance(first, (tuple, list)):
+                ratings = ratings.map(lambda x: Rating(*x))
+            else:
+                raise ValueError("rating should be RDD of Rating or 
tuple/list")
+        # serialize them by AutoBatchedSerializer before cache to reduce the
+        # objects overhead in JVM
+        cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache()
+        return cached._to_java_object_rdd()
+
+    @classmethod
     def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
         sc = ratings.context
-        ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
-        mod = sc._jvm.PythonMLLibAPI().trainALSModel(
-            ratingBytes._jrdd, rank, iterations, lambda_, blocks)
+        jrating = cls._prepare(ratings)
+        mod = sc._jvm.PythonMLLibAPI().trainALSModel(jrating, rank, iterations, lambda_, blocks)
         return MatrixFactorizationModel(sc, mod)
 
     @classmethod
     def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01):
         sc = ratings.context
-        ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
+        jrating = cls._prepare(ratings)
         mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(
-            ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha)
+            jrating, rank, iterations, lambda_, blocks, alpha)
         return MatrixFactorizationModel(sc, mod)
 
 
 def _test():
     import doctest
-    globs = globals().copy()
+    import pyspark.mllib.recommendation
+    globs = pyspark.mllib.recommendation.__dict__.copy()
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
     (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()

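For context on the new Rating class and ALS._prepare: plain tuples or lists are
promoted to Rating objects before being pickled to the JVM, and predictAll now
ships (user, product) tuples through SerDe instead of a custom codec. A hedged
usage sketch, assuming a live SparkContext named sc:

    from pyspark.mllib.recommendation import ALS, Rating

    ratings = sc.parallelize([Rating(1, 1, 5.0), Rating(1, 2, 1.0),
                              Rating(2, 1, 1.0), Rating(2, 2, 5.0)])
    model = ALS.train(ratings, rank=2, iterations=10)
    print model.predict(2, 2)                 # single (user, product) score
    pairs = sc.parallelize([(1, 1), (2, 2)])
    print model.predictAll(pairs).count()     # expected: 2
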
http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/python/pyspark/mllib/regression.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index f572dcf..cbdbc09 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -15,12 +15,12 @@
 # limitations under the License.
 #
 
-from numpy import array, ndarray
-from pyspark import SparkContext
-from pyspark.mllib._common import _dot, _regression_train_wrapper, \
-    _linear_predictor_typecheck, _have_scipy, _scipy_issparse
-from pyspark.mllib.linalg import SparseVector, Vectors
+import numpy as np
+from numpy import array
 
+from pyspark import SparkContext
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
 
 __all__ = ['LabeledPoint', 'LinearModel', 'LinearRegressionModel', 'RidgeRegressionModel',
            'LinearRegressionWithSGD', 'LassoWithSGD', 'RidgeRegressionWithSGD']
@@ -38,16 +38,16 @@ class LabeledPoint(object):
 
     def __init__(self, label, features):
         self.label = label
-        if (type(features) == ndarray or type(features) == SparseVector
-                or (_have_scipy and _scipy_issparse(features))):
-            self.features = features
-        elif type(features) == list:
-            self.features = array(features)
-        else:
-            raise TypeError("Expected NumPy array, list, SparseVector, or 
scipy.sparse matrix")
+        self.features = _convert_to_vector(features)
+
+    def __reduce__(self):
+        return (LabeledPoint, (self.label, self.features))
 
     def __str__(self):
-        return "(" + ",".join((str(self.label), 
Vectors.stringify(self.features))) + ")"
+        return "(" + ",".join((str(self.label), str(self.features))) + ")"
+
+    def __repr__(self):
+        return "LabeledPoint(" + ",".join((repr(self.label), 
repr(self.features))) + ")"
 
 
 class LinearModel(object):
@@ -55,7 +55,7 @@ class LinearModel(object):
     """A linear model that has a vector of coefficients and an intercept."""
 
     def __init__(self, weights, intercept):
-        self._coeff = weights
+        self._coeff = _convert_to_vector(weights)
         self._intercept = intercept
 
     @property
@@ -71,18 +71,19 @@ class LinearRegressionModelBase(LinearModel):
 
     """A linear regression model.
 
-    >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
-    >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
+    >>> lrmb = LinearRegressionModelBase(np.array([1.0, 2.0]), 0.1)
+    >>> abs(lrmb.predict(np.array([-1.03, 7.777])) - 14.624) < 1e-6
     True
     >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
     True
     """
 
     def predict(self, x):
-        """Predict the value of the dependent variable given a vector x"""
-        """containing values for the independent variables."""
-        _linear_predictor_typecheck(x, self._coeff)
-        return _dot(x, self._coeff) + self._intercept
+        """
+        Predict the value of the dependent variable given a vector x
+        containing values for the independent variables.
+        """
+        return self.weights.dot(x) + self.intercept
 
 
 class LinearRegressionModel(LinearRegressionModelBase):
@@ -96,10 +97,10 @@ class LinearRegressionModel(LinearRegressionModelBase):
     ...     LabeledPoint(3.0, [2.0]),
     ...     LabeledPoint(2.0, [3.0])
     ... ]
-    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
-    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=np.array([1.0]))
+    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
     True
-    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+    >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
     True
     >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
     True
@@ -117,11 +118,27 @@ class LinearRegressionModel(LinearRegressionModelBase):
     """
 
 
+# train_func should take two parameters, namely data and initial_weights, and
+# return the result of a call to the appropriate JVM stub.
+# _regression_train_wrapper is responsible for setup and error checking.
+def _regression_train_wrapper(sc, train_func, modelClass, data, initial_weights):
+    initial_weights = initial_weights or [0.0] * len(data.first().features)
+    ser = PickleSerializer()
+    initial_bytes = bytearray(ser.dumps(_convert_to_vector(initial_weights)))
+    # use AutoBatchedSerializer before cache to reduce the memory
+    # overhead in JVM
+    cached = data._reserialize(AutoBatchedSerializer(ser)).cache()
+    ans = train_func(cached._to_java_object_rdd(), initial_bytes)
+    assert len(ans) == 2, "JVM call result had unexpected length"
+    weights = ser.loads(str(ans[0]))
+    return modelClass(weights, ans[1])
+
+
 class LinearRegressionWithSGD(object):
 
     @classmethod
     def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
-              initialWeights=None, regParam=1.0, regType=None, intercept=False):
+              initialWeights=None, regParam=1.0, regType="none", intercept=False):
         """
         Train a linear regression model on the given data.
 
@@ -146,11 +163,12 @@ class LinearRegressionWithSGD(object):
                                   are activated or not).
         """
         sc = data.context
-        if regType is None:
-            regType = "none"
-        train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD(
-            d._jrdd, iterations, step, miniBatchFraction, i, regParam, regType, intercept)
-        return _regression_train_wrapper(sc, train_f, LinearRegressionModel, data, initialWeights)
+
+        def train(jrdd, i):
+            return sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD(
+                jrdd, iterations, step, miniBatchFraction, i, regParam, regType, intercept)
+
+        return _regression_train_wrapper(sc, train, LinearRegressionModel, data, initialWeights)
 
 
 class LassoModel(LinearRegressionModelBase):
@@ -166,9 +184,9 @@ class LassoModel(LinearRegressionModelBase):
     ...     LabeledPoint(2.0, [3.0])
     ... ]
     >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
-    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
     True
-    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+    >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
     True
     >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
     True
@@ -179,7 +197,7 @@ class LassoModel(LinearRegressionModelBase):
     ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
     ... ]
     >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
-    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
     True
     >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
     True
@@ -193,9 +211,11 @@ class LassoWithSGD(object):
               miniBatchFraction=1.0, initialWeights=None):
         """Train a Lasso regression model on the given data."""
         sc = data.context
-        train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(
-            d._jrdd, iterations, step, regParam, miniBatchFraction, i)
-        return _regression_train_wrapper(sc, train_f, LassoModel, data, initialWeights)
+
+        def train(jrdd, i):
+            return sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(
+                jrdd, iterations, step, regParam, miniBatchFraction, i)
+        return _regression_train_wrapper(sc, train, LassoModel, data, initialWeights)
 
 
 class RidgeRegressionModel(LinearRegressionModelBase):
@@ -211,9 +231,9 @@ class RidgeRegressionModel(LinearRegressionModelBase):
     ...     LabeledPoint(2.0, [3.0])
     ... ]
     >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
-    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
     True
-    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+    >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
     True
     >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
     True
@@ -224,7 +244,7 @@ class RidgeRegressionModel(LinearRegressionModelBase):
     ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
     ... ]
     >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
-    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
     True
     >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
     True
@@ -238,9 +258,12 @@ class RidgeRegressionWithSGD(object):
               miniBatchFraction=1.0, initialWeights=None):
         """Train a ridge regression model on the given data."""
         sc = data.context
-        train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(
-            d._jrdd, iterations, step, regParam, miniBatchFraction, i)
-        return _regression_train_wrapper(sc, train_func, RidgeRegressionModel, data, initialWeights)
+
+        def train(jrdd, i):
+            return sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(
+                jrdd, iterations, step, regParam, miniBatchFraction, i)
+
+        return _regression_train_wrapper(sc, train, RidgeRegressionModel, data, initialWeights)
 
 
 def _test():

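The __reduce__ added to LabeledPoint is what lets the new
_regression_train_wrapper ship training data through the pickle-based SerDe
without a hand-written codec. A small round-trip sketch (no SparkContext
needed; the field checks are only illustrative):

    import cPickle
    from pyspark.mllib.regression import LabeledPoint

    lp = LabeledPoint(1.0, [0.5, -1.2])
    lp2 = cPickle.loads(cPickle.dumps(lp))    # goes through LabeledPoint.__reduce__
    assert lp2.label == lp.label
    assert lp2.features.toArray().tolist() == lp.features.toArray().tolist()
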
http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/python/pyspark/mllib/stat.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py
index 8c726f1..b9de090 100644
--- a/python/pyspark/mllib/stat.py
+++ b/python/pyspark/mllib/stat.py
@@ -19,14 +19,26 @@
 Python package for statistical functions in MLlib.
 """
 
-from pyspark.mllib._common import \
-    _get_unmangled_double_vector_rdd, _get_unmangled_rdd, \
-    _serialize_double, _deserialize_double_matrix, _deserialize_double_vector
+from functools import wraps
+
+from pyspark import PickleSerializer
 
 
 __all__ = ['MultivariateStatisticalSummary', 'Statistics']
 
 
+def serialize(f):
+    ser = PickleSerializer()
+
+    @wraps(f)
+    def func(self):
+        jvec = f(self)
+        bytes = self._sc._jvm.SerDe.dumps(jvec)
+        return ser.loads(str(bytes)).toArray()
+
+    return func
+
+
 class MultivariateStatisticalSummary(object):
 
     """
@@ -44,33 +56,38 @@ class MultivariateStatisticalSummary(object):
     def __del__(self):
         self._sc._gateway.detach(self._java_summary)
 
+    @serialize
     def mean(self):
-        return _deserialize_double_vector(self._java_summary.mean())
+        return self._java_summary.mean()
 
+    @serialize
     def variance(self):
-        return _deserialize_double_vector(self._java_summary.variance())
+        return self._java_summary.variance()
 
     def count(self):
         return self._java_summary.count()
 
+    @serialize
     def numNonzeros(self):
-        return _deserialize_double_vector(self._java_summary.numNonzeros())
+        return self._java_summary.numNonzeros()
 
+    @serialize
     def max(self):
-        return _deserialize_double_vector(self._java_summary.max())
+        return self._java_summary.max()
 
+    @serialize
     def min(self):
-        return _deserialize_double_vector(self._java_summary.min())
+        return self._java_summary.min()
 
 
 class Statistics(object):
 
     @staticmethod
-    def colStats(X):
+    def colStats(rdd):
         """
         Computes column-wise summary statistics for the input RDD[Vector].
 
-        >>> from linalg import Vectors
+        >>> from pyspark.mllib.linalg import Vectors
         >>> rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
         ...                       Vectors.dense([4, 5, 0,  3]),
         ...                       Vectors.dense([6, 7, 0,  8])])
@@ -88,9 +105,9 @@ class Statistics(object):
         >>> cStats.min()
         array([ 2.,  0.,  0., -2.])
         """
-        sc = X.ctx
-        Xser = _get_unmangled_double_vector_rdd(X)
-        cStats = sc._jvm.PythonMLLibAPI().colStats(Xser._jrdd)
+        sc = rdd.ctx
+        jrdd = rdd._to_java_object_rdd()
+        cStats = sc._jvm.PythonMLLibAPI().colStats(jrdd)
         return MultivariateStatisticalSummary(sc, cStats)
 
     @staticmethod
@@ -117,7 +134,7 @@ class Statistics(object):
         >>> from math import isnan
         >>> isnan(Statistics.corr(x, zeros))
         True
-        >>> from linalg import Vectors
+        >>> from pyspark.mllib.linalg import Vectors
         >>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),
         ...                       Vectors.dense([6, 7, 0,  8]), Vectors.dense([9, 0, 0, 1])])
         >>> pearsonCorr = Statistics.corr(rdd)
@@ -144,18 +161,16 @@ class Statistics(object):
         # check if y is used to specify the method name instead.
         if type(y) == str:
             raise TypeError("Use 'method=' to specify method name.")
+
+        jx = x._to_java_object_rdd()
         if not y:
-            try:
-                Xser = _get_unmangled_double_vector_rdd(x)
-            except TypeError:
-                raise TypeError("corr called on a single RDD not consisted of 
Vectors.")
-            resultMat = sc._jvm.PythonMLLibAPI().corr(Xser._jrdd, method)
-            return _deserialize_double_matrix(resultMat)
+            resultMat = sc._jvm.PythonMLLibAPI().corr(jx, method)
+            bytes = sc._jvm.SerDe.dumps(resultMat)
+            ser = PickleSerializer()
+            return ser.loads(str(bytes)).toArray()
         else:
-            xSer = _get_unmangled_rdd(x, _serialize_double)
-            ySer = _get_unmangled_rdd(y, _serialize_double)
-            result = sc._jvm.PythonMLLibAPI().corr(xSer._jrdd, ySer._jrdd, method)
-            return result
+            jy = y._to_java_object_rdd()
+            return sc._jvm.PythonMLLibAPI().corr(jx, jy, method)
 
 
 def _test():

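The serialize decorator in stat.py pickles the JVM-side Vector with SerDe.dumps
and unpickles it locally, so every summary method now returns a NumPy array;
corr follows the same path. A hedged sketch of the two-series form of corr,
assuming a live SparkContext named sc:

    from pyspark.mllib.stat import Statistics

    x = sc.parallelize([1.0, 2.0, 3.0, 4.0])
    y = sc.parallelize([2.0, 4.0, 6.0, 8.0])
    print Statistics.corr(x, y)                       # Pearson by default, expected: 1.0
    print Statistics.corr(x, y, method="spearman")    # expected: 1.0
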
http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/python/pyspark/mllib/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 8a851bd..f72e88b 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -20,6 +20,8 @@ Fuller unit tests for Python MLlib.
 """
 
 import sys
+import array as pyarray
+
 from numpy import array, array_equal
 
 if sys.version_info[:2] <= (2, 6):
@@ -27,9 +29,8 @@ if sys.version_info[:2] <= (2, 6):
 else:
     import unittest
 
-from pyspark.mllib._common import _convert_vector, _serialize_double_vector, \
-    _deserialize_double_vector, _dot, _squared_distance
-from pyspark.mllib.linalg import SparseVector
+from pyspark.serializers import PickleSerializer
+from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, _convert_to_vector
 from pyspark.mllib.regression import LabeledPoint
 from pyspark.tests import PySparkTestCase
 
@@ -42,39 +43,52 @@ except:
     # No SciPy, but that's okay, we'll skip those tests
     pass
 
+ser = PickleSerializer()
+
+
+def _squared_distance(a, b):
+    if isinstance(a, Vector):
+        return a.squared_distance(b)
+    else:
+        return b.squared_distance(a)
 
-class VectorTests(unittest.TestCase):
+
+class VectorTests(PySparkTestCase):
+
+    def _test_serialize(self, v):
+        jvec = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(v)))
+        nv = ser.loads(str(self.sc._jvm.SerDe.dumps(jvec)))
+        self.assertEqual(v, nv)
+        vs = [v] * 100
+        jvecs = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(vs)))
+        nvs = ser.loads(str(self.sc._jvm.SerDe.dumps(jvecs)))
+        self.assertEqual(vs, nvs)
 
     def test_serialize(self):
-        sv = SparseVector(4, {1: 1, 3: 2})
-        dv = array([1., 2., 3., 4.])
-        lst = [1, 2, 3, 4]
-        self.assertTrue(sv is _convert_vector(sv))
-        self.assertTrue(dv is _convert_vector(dv))
-        self.assertTrue(array_equal(dv, _convert_vector(lst)))
-        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(sv)))
-        self.assertTrue(array_equal(dv, _deserialize_double_vector(_serialize_double_vector(dv))))
-        self.assertTrue(array_equal(dv, _deserialize_double_vector(_serialize_double_vector(lst))))
+        self._test_serialize(DenseVector(range(10)))
+        self._test_serialize(DenseVector(array([1., 2., 3., 4.])))
+        self._test_serialize(DenseVector(pyarray.array('d', range(10))))
+        self._test_serialize(SparseVector(4, {1: 1, 3: 2}))
 
     def test_dot(self):
         sv = SparseVector(4, {1: 1, 3: 2})
-        dv = array([1., 2., 3., 4.])
-        lst = [1, 2, 3, 4]
+        dv = DenseVector(array([1., 2., 3., 4.]))
+        lst = DenseVector([1, 2, 3, 4])
         mat = array([[1., 2., 3., 4.],
                      [1., 2., 3., 4.],
                      [1., 2., 3., 4.],
                      [1., 2., 3., 4.]])
-        self.assertEquals(10.0, _dot(sv, dv))
-        self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(sv, mat)))
-        self.assertEquals(30.0, _dot(dv, dv))
-        self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(dv, mat)))
-        self.assertEquals(30.0, _dot(lst, dv))
-        self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(lst, mat)))
+        self.assertEquals(10.0, sv.dot(dv))
+        self.assertTrue(array_equal(array([3., 6., 9., 12.]), sv.dot(mat)))
+        self.assertEquals(30.0, dv.dot(dv))
+        self.assertTrue(array_equal(array([10., 20., 30., 40.]), dv.dot(mat)))
+        self.assertEquals(30.0, lst.dot(dv))
+        self.assertTrue(array_equal(array([10., 20., 30., 40.]), lst.dot(mat)))
 
     def test_squared_distance(self):
         sv = SparseVector(4, {1: 1, 3: 2})
-        dv = array([1., 2., 3., 4.])
-        lst = [4, 3, 2, 1]
+        dv = DenseVector(array([1., 2., 3., 4.]))
+        lst = DenseVector([4, 3, 2, 1])
         self.assertEquals(15.0, _squared_distance(sv, dv))
         self.assertEquals(25.0, _squared_distance(sv, lst))
         self.assertEquals(20.0, _squared_distance(dv, lst))
@@ -198,41 +212,36 @@ class SciPyTests(PySparkTestCase):
         lil[1, 0] = 1
         lil[3, 0] = 2
         sv = SparseVector(4, {1: 1, 3: 2})
-        self.assertEquals(sv, _convert_vector(lil))
-        self.assertEquals(sv, _convert_vector(lil.tocsc()))
-        self.assertEquals(sv, _convert_vector(lil.tocoo()))
-        self.assertEquals(sv, _convert_vector(lil.tocsr()))
-        self.assertEquals(sv, _convert_vector(lil.todok()))
-        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil)))
-        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsc())))
-        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsr())))
-        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.todok())))
+        self.assertEquals(sv, _convert_to_vector(lil))
+        self.assertEquals(sv, _convert_to_vector(lil.tocsc()))
+        self.assertEquals(sv, _convert_to_vector(lil.tocoo()))
+        self.assertEquals(sv, _convert_to_vector(lil.tocsr()))
+        self.assertEquals(sv, _convert_to_vector(lil.todok()))
+
+        def serialize(l):
+            return ser.loads(ser.dumps(_convert_to_vector(l)))
+        self.assertEquals(sv, serialize(lil))
+        self.assertEquals(sv, serialize(lil.tocsc()))
+        self.assertEquals(sv, serialize(lil.tocsr()))
+        self.assertEquals(sv, serialize(lil.todok()))
 
     def test_dot(self):
         from scipy.sparse import lil_matrix
         lil = lil_matrix((4, 1))
         lil[1, 0] = 1
         lil[3, 0] = 2
-        dv = array([1., 2., 3., 4.])
-        sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
-        mat = array([[1., 2., 3., 4.],
-                     [1., 2., 3., 4.],
-                     [1., 2., 3., 4.],
-                     [1., 2., 3., 4.]])
-        self.assertEquals(10.0, _dot(lil, dv))
-        self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(lil, mat)))
+        dv = DenseVector(array([1., 2., 3., 4.]))
+        self.assertEquals(10.0, dv.dot(lil))
 
     def test_squared_distance(self):
         from scipy.sparse import lil_matrix
         lil = lil_matrix((4, 1))
         lil[1, 0] = 3
         lil[3, 0] = 2
-        dv = array([1., 2., 3., 4.])
+        dv = DenseVector(array([1., 2., 3., 4.]))
         sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
-        self.assertEquals(15.0, _squared_distance(lil, dv))
-        self.assertEquals(15.0, _squared_distance(lil, sv))
-        self.assertEquals(15.0, _squared_distance(dv, lil))
-        self.assertEquals(15.0, _squared_distance(sv, lil))
+        self.assertEquals(15.0, dv.squared_distance(lil))
+        self.assertEquals(15.0, sv.squared_distance(lil))
 
     def scipy_matrix(self, size, values):
         """Create a column SciPy matrix from a dictionary of values"""

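The _test_serialize helper above is the commit in miniature: a Python vector is
pickled, loaded on the JVM through SerDe, dumped back, and unpickled unchanged.
The same round trip as a standalone sketch, assuming a live SparkContext sc:

    from pyspark.serializers import PickleSerializer
    from pyspark.mllib.linalg import SparseVector

    ser = PickleSerializer()
    v = SparseVector(4, {1: 1.0, 3: 2.0})
    jvec = sc._jvm.SerDe.loads(bytearray(ser.dumps(v)))    # Python -> JVM object
    back = ser.loads(str(sc._jvm.SerDe.dumps(jvec)))       # JVM object -> Python
    assert back == v
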
http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/python/pyspark/mllib/tree.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py
index 5b13ab6..f59a818 100644
--- a/python/pyspark/mllib/tree.py
+++ b/python/pyspark/mllib/tree.py
@@ -18,13 +18,9 @@
 from py4j.java_collections import MapConverter
 
 from pyspark import SparkContext, RDD
-from pyspark.mllib._common import \
-    _get_unmangled_rdd, _get_unmangled_double_vector_rdd, _serialize_double_vector, \
-    _deserialize_labeled_point, _get_unmangled_labeled_point_rdd, \
-    _deserialize_double
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+from pyspark.mllib.linalg import Vector, _convert_to_vector
 from pyspark.mllib.regression import LabeledPoint
-from pyspark.serializers import NoOpSerializer
-
 
 __all__ = ['DecisionTreeModel', 'DecisionTree']
 
@@ -55,21 +51,24 @@ class DecisionTreeModel(object):
         :param x:  Data point (feature vector),
                    or an RDD of data points (feature vectors).
         """
-        pythonAPI = self._sc._jvm.PythonMLLibAPI()
+        SerDe = self._sc._jvm.SerDe
+        ser = PickleSerializer()
         if isinstance(x, RDD):
             # Bulk prediction
-            if x.count() == 0:
+            first = x.take(1)
+            if not first:
                 return self._sc.parallelize([])
-            dataBytes = _get_unmangled_double_vector_rdd(x, cache=False)
-            jSerializedPreds = \
-                pythonAPI.predictDecisionTreeModel(self._java_model,
-                                                   dataBytes._jrdd)
-            serializedPreds = RDD(jSerializedPreds, self._sc, NoOpSerializer())
-            return serializedPreds.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+            if not isinstance(first[0], Vector):
+                x = x.map(_convert_to_vector)
+            jPred = self._java_model.predict(x._to_java_object_rdd()).toJavaRDD()
+            jpyrdd = self._sc._jvm.PythonRDD.javaToPython(jPred)
+            return RDD(jpyrdd, self._sc, BatchedSerializer(ser, 1024))
+
         else:
             # Assume x is a single data point.
-            x_ = _serialize_double_vector(x)
-            return pythonAPI.predictDecisionTreeModel(self._java_model, x_)
+            bytes = bytearray(ser.dumps(_convert_to_vector(x)))
+            vec = self._sc._jvm.SerDe.loads(bytes)
+            return self._java_model.predict(vec)
 
     def numNodes(self):
         return self._java_model.numNodes()
@@ -77,7 +76,7 @@ class DecisionTreeModel(object):
     def depth(self):
         return self._java_model.depth()
 
-    def __str__(self):
+    def __repr__(self):
         return self._java_model.toString()
 
 
@@ -90,53 +89,24 @@ class DecisionTree(object):
     EXPERIMENTAL: This is an experimental API.
                   It will probably be modified for Spark v1.2.
 
-    Example usage:
-
-    >>> from numpy import array
-    >>> import sys
-    >>> from pyspark.mllib.regression import LabeledPoint
-    >>> from pyspark.mllib.tree import DecisionTree
-    >>> from pyspark.mllib.linalg import SparseVector
-    >>>
-    >>> data = [
-    ...     LabeledPoint(0.0, [0.0]),
-    ...     LabeledPoint(1.0, [1.0]),
-    ...     LabeledPoint(1.0, [2.0]),
-    ...     LabeledPoint(1.0, [3.0])
-    ... ]
-    >>> categoricalFeaturesInfo = {} # no categorical features
-    >>> model = DecisionTree.trainClassifier(sc.parallelize(data), numClasses=2,
-    ...                                      categoricalFeaturesInfo=categoricalFeaturesInfo)
-    >>> sys.stdout.write(model)
-    DecisionTreeModel classifier
-      If (feature 0 <= 0.5)
-       Predict: 0.0
-      Else (feature 0 > 0.5)
-       Predict: 1.0
-    >>> model.predict(array([1.0])) > 0
-    True
-    >>> model.predict(array([0.0])) == 0
-    True
-    >>> sparse_data = [
-    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
-    ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
-    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
-    ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
-    ... ]
-    >>>
-    >>> model = DecisionTree.trainRegressor(sc.parallelize(sparse_data),
-    ...                                     categoricalFeaturesInfo=categoricalFeaturesInfo)
-    >>> model.predict(array([0.0, 1.0])) == 1
-    True
-    >>> model.predict(array([0.0, 0.0])) == 0
-    True
-    >>> model.predict(SparseVector(2, {1: 1.0})) == 1
-    True
-    >>> model.predict(SparseVector(2, {1: 0.0})) == 0
-    True
     """
 
     @staticmethod
+    def _train(data, type, numClasses, categoricalFeaturesInfo,
+               impurity="gini", maxDepth=5, maxBins=32, minInstancesPerNode=1,
+               minInfoGain=0.0):
+        first = data.first()
+        assert isinstance(first, LabeledPoint), "the data should be RDD of 
LabeledPoint"
+        sc = data.context
+        jrdd = data._to_java_object_rdd()
+        cfiMap = MapConverter().convert(categoricalFeaturesInfo,
+                                        sc._gateway._gateway_client)
+        model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel(
+            jrdd, type, numClasses, cfiMap,
+            impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
+        return DecisionTreeModel(sc, model)
+
+    @staticmethod
     def trainClassifier(data, numClasses, categoricalFeaturesInfo,
                         impurity="gini", maxDepth=5, maxBins=32, 
minInstancesPerNode=1,
                         minInfoGain=0.0):
@@ -159,18 +129,34 @@ class DecisionTree(object):
                                     the parent split
         :param minInfoGain: Min info gain required to create a split
         :return: DecisionTreeModel
+
+        Example usage:
+
+        >>> from numpy import array
+        >>> from pyspark.mllib.regression import LabeledPoint
+        >>> from pyspark.mllib.tree import DecisionTree
+        >>> from pyspark.mllib.linalg import SparseVector
+        >>>
+        >>> data = [
+        ...     LabeledPoint(0.0, [0.0]),
+        ...     LabeledPoint(1.0, [1.0]),
+        ...     LabeledPoint(1.0, [2.0]),
+        ...     LabeledPoint(1.0, [3.0])
+        ... ]
+        >>> model = DecisionTree.trainClassifier(sc.parallelize(data), 2, {})
+        >>> print model,  # it already has newline
+        DecisionTreeModel classifier
+          If (feature 0 <= 0.5)
+           Predict: 0.0
+          Else (feature 0 > 0.5)
+           Predict: 1.0
+        >>> model.predict(array([1.0])) > 0
+        True
+        >>> model.predict(array([0.0])) == 0
+        True
         """
-        sc = data.context
-        dataBytes = _get_unmangled_labeled_point_rdd(data)
-        categoricalFeaturesInfoJMap = \
-            MapConverter().convert(categoricalFeaturesInfo,
-                                   sc._gateway._gateway_client)
-        model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel(
-            dataBytes._jrdd, "classification",
-            numClasses, categoricalFeaturesInfoJMap,
-            impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
-        dataBytes.unpersist()
-        return DecisionTreeModel(sc, model)
+        return DecisionTree._train(data, "classification", numClasses, 
categoricalFeaturesInfo,
+                                   impurity, maxDepth, maxBins, 
minInstancesPerNode, minInfoGain)
 
     @staticmethod
     def trainRegressor(data, categoricalFeaturesInfo,
@@ -194,18 +180,33 @@ class DecisionTree(object):
                                     the parent split
         :param minInfoGain: Min info gain required to create a split
         :return: DecisionTreeModel
+
+        Example usage:
+
+        >>> from numpy import array
+        >>> from pyspark.mllib.regression import LabeledPoint
+        >>> from pyspark.mllib.tree import DecisionTree
+        >>> from pyspark.mllib.linalg import SparseVector
+        >>>
+        >>> sparse_data = [
+        ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+        ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
+        ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+        ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
+        ... ]
+        >>>
+        >>> model = DecisionTree.trainRegressor(sc.parallelize(sparse_data), {})
+        >>> model.predict(array([0.0, 1.0])) == 1
+        True
+        >>> model.predict(array([0.0, 0.0])) == 0
+        True
+        >>> model.predict(SparseVector(2, {1: 1.0})) == 1
+        True
+        >>> model.predict(SparseVector(2, {1: 0.0})) == 0
+        True
         """
-        sc = data.context
-        dataBytes = _get_unmangled_labeled_point_rdd(data)
-        categoricalFeaturesInfoJMap = \
-            MapConverter().convert(categoricalFeaturesInfo,
-                                   sc._gateway._gateway_client)
-        model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel(
-            dataBytes._jrdd, "regression",
-            0, categoricalFeaturesInfoJMap,
-            impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
-        dataBytes.unpersist()
-        return DecisionTreeModel(sc, model)
+        return DecisionTree._train(data, "regression", 0, 
categoricalFeaturesInfo,
+                                   impurity, maxDepth, maxBins, 
minInstancesPerNode, minInfoGain)
 
 
 def _test():

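The reworked DecisionTreeModel.predict accepts either one feature vector or an
RDD of them; the bulk path converts non-Vector elements with _convert_to_vector,
hands the RDD to the JVM model, and reads the predicted doubles back through
the batched pickler. A hedged sketch, assuming a live SparkContext named sc:

    from pyspark.mllib.tree import DecisionTree
    from pyspark.mllib.regression import LabeledPoint

    data = [LabeledPoint(0.0, [0.0]), LabeledPoint(1.0, [1.0]),
            LabeledPoint(1.0, [2.0]), LabeledPoint(1.0, [3.0])]
    model = DecisionTree.trainClassifier(sc.parallelize(data), 2, {})
    print model.predict([1.5])                              # single point, a float
    preds = model.predict(sc.parallelize([[0.0], [2.0]]))   # bulk path
    print preds.collect()                                   # expected: [0.0, 1.0]
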
http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/python/pyspark/mllib/util.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 1c7b8c8..8233d4e 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -18,11 +18,10 @@
 import numpy as np
 import warnings
 
-from pyspark.mllib.linalg import Vectors, SparseVector
-from pyspark.mllib.regression import LabeledPoint
-from pyspark.mllib._common import _convert_vector, _deserialize_labeled_point
 from pyspark.rdd import RDD
-from pyspark.serializers import NoOpSerializer
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+from pyspark.mllib.linalg import Vectors, SparseVector, _convert_to_vector
+from pyspark.mllib.regression import LabeledPoint
 
 
 class MLUtils(object):
@@ -32,15 +31,12 @@ class MLUtils(object):
     """
 
     @staticmethod
-    def _parse_libsvm_line(line, multiclass):
-        warnings.warn("deprecated", DeprecationWarning)
-        return _parse_libsvm_line(line)
-
-    @staticmethod
-    def _parse_libsvm_line(line):
+    def _parse_libsvm_line(line, multiclass=None):
         """
         Parses a line in LIBSVM format into (label, indices, values).
         """
+        if multiclass is not None:
+            warnings.warn("deprecated", DeprecationWarning)
         items = line.split(None)
         label = float(items[0])
         nnz = len(items) - 1
@@ -55,27 +51,20 @@ class MLUtils(object):
     @staticmethod
     def _convert_labeled_point_to_libsvm(p):
         """Converts a LabeledPoint to a string in LIBSVM format."""
+        assert isinstance(p, LabeledPoint)
         items = [str(p.label)]
-        v = _convert_vector(p.features)
-        if type(v) == np.ndarray:
-            for i in xrange(len(v)):
-                items.append(str(i + 1) + ":" + str(v[i]))
-        elif type(v) == SparseVector:
+        v = _convert_to_vector(p.features)
+        if isinstance(v, SparseVector):
             nnz = len(v.indices)
             for i in xrange(nnz):
                 items.append(str(v.indices[i] + 1) + ":" + str(v.values[i]))
         else:
-            raise TypeError("_convert_labeled_point_to_libsvm needs either 
ndarray or SparseVector"
-                            " but got " % type(v))
+            for i in xrange(len(v)):
+                items.append(str(i + 1) + ":" + str(v[i]))
         return " ".join(items)
 
     @staticmethod
-    def loadLibSVMFile(sc, path, multiclass=False, numFeatures=-1, minPartitions=None):
-        warnings.warn("deprecated", DeprecationWarning)
-        return loadLibSVMFile(sc, path, numFeatures, minPartitions)
-
-    @staticmethod
-    def loadLibSVMFile(sc, path, numFeatures=-1, minPartitions=None):
+    def loadLibSVMFile(sc, path, numFeatures=-1, minPartitions=None, multiclass=None):
         """
         Loads labeled data in the LIBSVM format into an RDD of
         LabeledPoint. The LIBSVM format is a text-based format used by
@@ -122,6 +111,8 @@ class MLUtils(object):
         >>> print examples[2]
         (-1.0,(6,[1,3,5],[4.0,5.0,6.0]))
         """
+        if multiclass is not None:
+            warnings.warn("deprecated", DeprecationWarning)
 
         lines = sc.textFile(path, minPartitions)
         parsed = lines.map(lambda l: MLUtils._parse_libsvm_line(l))
@@ -182,9 +173,9 @@ class MLUtils(object):
         (0.0,[1.01,2.02,3.03])
         """
         minPartitions = minPartitions or min(sc.defaultParallelism, 2)
-        jSerialized = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions)
-        serialized = RDD(jSerialized, sc, NoOpSerializer())
-        return serialized.map(lambda bytes: _deserialize_labeled_point(bytearray(bytes)))
+        jrdd = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions)
+        jpyrdd = sc._jvm.PythonRDD.javaToPython(jrdd)
+        return RDD(jpyrdd, sc, BatchedSerializer(PickleSerializer()))
 
 
 def _test():

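With _convert_to_vector in place, the LIBSVM writer treats anything that is not
a SparseVector as dense and emits 1-based index:value pairs. A quick check that
needs no SparkContext; the expected strings are illustrative:

    from pyspark.mllib.util import MLUtils
    from pyspark.mllib.regression import LabeledPoint
    from pyspark.mllib.linalg import SparseVector

    print MLUtils._convert_labeled_point_to_libsvm(LabeledPoint(1.0, [0.0, 2.5]))
    # expected: 1.0 1:0.0 2:2.5
    print MLUtils._convert_labeled_point_to_libsvm(
        LabeledPoint(0.0, SparseVector(3, {0: 1.0, 2: 3.0})))
    # expected: 0.0 1:1.0 3:3.0
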
http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index b43606b..8ef233b 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -34,7 +34,7 @@ from math import sqrt, log, isinf, isnan
 
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
-    PickleSerializer, pack_long, CompressedSerializer
+    PickleSerializer, pack_long, AutoBatchedSerializer
 from pyspark.join import python_join, python_left_outer_join, \
     python_right_outer_join, python_cogroup
 from pyspark.statcounter import StatCounter
@@ -1927,10 +1927,10 @@ class RDD(object):
         It will convert each Python object into a Java object via Pyrolite,
         whether the RDD is serialized in batch or not.
         """
-        if not self._is_pickled():
-            self = self._reserialize(BatchedSerializer(PickleSerializer(), 1024))
-        batched = isinstance(self._jrdd_deserializer, BatchedSerializer)
-        return self.ctx._jvm.PythonRDD.pythonToJava(self._jrdd, batched)
+        rdd = self._reserialize(AutoBatchedSerializer(PickleSerializer())) \
+            if not self._is_pickled() else self
+        is_batch = isinstance(rdd._jrdd_deserializer, BatchedSerializer)
+        return self.ctx._jvm.PythonRDD.pythonToJava(rdd._jrdd, is_batch)
 
     def countApprox(self, timeout, confidence=0.95):
         """

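_to_java_object_rdd is the single hook the MLlib changes lean on: it
re-serializes the RDD with an auto-batched pickler when necessary and lets the
JVM unpickle the objects via Pyrolite. A minimal sketch, assuming a live
SparkContext named sc:

    rdd = sc.parallelize(range(10))
    jrdd = rdd._to_java_object_rdd()    # a JavaRDD of unpickled Java objects
    print jrdd.count()                  # expected: 10
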
http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 44ac564..2672da3 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -68,6 +68,7 @@ import sys
 import types
 import collections
 import zlib
+import itertools
 
 from pyspark import cloudpickle
 
@@ -214,6 +215,41 @@ class BatchedSerializer(Serializer):
         return "BatchedSerializer<%s>" % str(self.serializer)
 
 
+class AutoBatchedSerializer(BatchedSerializer):
+    """
+    Choose the size of batch automatically based on the size of object
+    """
+
+    def __init__(self, serializer, bestSize=1 << 20):
+        BatchedSerializer.__init__(self, serializer, -1)
+        self.bestSize = bestSize
+
+    def dump_stream(self, iterator, stream):
+        batch, best = 1, self.bestSize
+        iterator = iter(iterator)
+        while True:
+            vs = list(itertools.islice(iterator, batch))
+            if not vs:
+                break
+
+            bytes = self.serializer.dumps(vs)
+            write_int(len(bytes), stream)
+            stream.write(bytes)
+
+            size = len(bytes)
+            if size < best:
+                batch *= 2
+            elif size > best * 10 and batch > 1:
+                batch /= 2
+
+    def __eq__(self, other):
+        return (isinstance(other, AutoBatchedSerializer) and
+                other.serializer == self.serializer)
+
+    def __str__(self):
+        return "BatchedSerializer<%s>" % str(self.serializer)
+
+
 class CartesianDeserializer(FramedSerializer):
 
     """

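AutoBatchedSerializer starts at one object per batch and doubles the batch until
a serialized frame approaches bestSize, halving it again when a frame overshoots
by more than 10x, so callers never have to pick a batch size. A self-contained
round-trip sketch; the payload and bestSize here are arbitrary:

    from StringIO import StringIO
    from pyspark.serializers import AutoBatchedSerializer, PickleSerializer

    ser = AutoBatchedSerializer(PickleSerializer(), bestSize=1 << 16)
    buf = StringIO()
    ser.dump_stream(({"k": i, "pad": "x" * 100} for i in xrange(10000)), buf)
    buf.seek(0)
    print sum(1 for _ in ser.load_stream(buf))    # expected: 10000
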
http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/python/run-tests
----------------------------------------------------------------------
diff --git a/python/run-tests b/python/run-tests
index a67e5a9..a7ec270 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -73,7 +73,6 @@ run_test "pyspark/serializers.py"
 unset PYSPARK_DOC_TEST
 run_test "pyspark/shuffle.py"
 run_test "pyspark/tests.py"
-run_test "pyspark/mllib/_common.py"
 run_test "pyspark/mllib/classification.py"
 run_test "pyspark/mllib/clustering.py"
 run_test "pyspark/mllib/linalg.py"

