[SPARK-3491] [MLlib] [PySpark] use pickle to serialize data in MLlib

Currently, the data passed between the JVM and Python is serialized case by case,
by hand; this approach cannot scale to support the many APIs in MLlib.

This patch addresses the problem by serializing the data with the pickle
protocol, using the Pyrolite library to serialize/deserialize on the JVM side.
The pickle protocol can easily be extended to support custom classes.

All the modules are refactored to use this protocol.

Known issues: there is some performance regression (both CPU and memory),
because the serialized data is larger than before.
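
For illustration only (not part of this patch), a Pyrolite round trip on the
JVM side looks roughly like this:

    import net.razorvine.pickle.{Pickler, Unpickler}

    // Pickle a JVM object into Python pickle bytes, then load it back.
    val pickler = new Pickler()
    val bytes: Array[Byte] = pickler.dumps(Array(1.0, 2.0, 3.0))
    val back: AnyRef = new Unpickler().loads(bytes)

Custom classes such as Vector, LabeledPoint and Rating are supported by
registering picklers and constructors with Pyrolite, as done in the SerDe
object in PythonMLLibAPI.scala below.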

Author: Davies Liu <davies....@gmail.com>

Closes #2378 from davies/pickle_mllib and squashes the following commits:

dffbba2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
pickle_mllib
810f97f [Davies Liu] fix equal of matrix
032cd62 [Davies Liu] add more type check and conversion for user_product
bd738ab [Davies Liu] address comments
e431377 [Davies Liu] fix cache of rdd, refactor
19d0967 [Davies Liu] refactor Picklers
2511e76 [Davies Liu] cleanup
1fccf1a [Davies Liu] address comments
a2cc855 [Davies Liu] fix tests
9ceff73 [Davies Liu] test size of serialized Rating
44e0551 [Davies Liu] fix cache
a379a81 [Davies Liu] fix pickle array in python2.7
df625c7 [Davies Liu] Merge commit '154d141' into pickle_mllib
154d141 [Davies Liu] fix autobatchedpickler
44736d7 [Davies Liu] speed up pickling array in Python 2.7
e1d1bfc [Davies Liu] refactor
708dc02 [Davies Liu] fix tests
9dcfb63 [Davies Liu] fix style
88034f0 [Davies Liu] rafactor, address comments
46a501e [Davies Liu] choose batch size automatically
df19464 [Davies Liu] memorize the module and class name during pickleing
f3506c5 [Davies Liu] Merge branch 'master' into pickle_mllib
722dd96 [Davies Liu] cleanup _common.py
0ee1525 [Davies Liu] remove outdated tests
b02e34f [Davies Liu] remove _common.py
84c721d [Davies Liu] Merge branch 'master' into pickle_mllib
4d7963e [Davies Liu] remove muanlly serialization
6d26b03 [Davies Liu] fix tests
c383544 [Davies Liu] classification
f2a0856 [Davies Liu] mllib/regression
d9f691f [Davies Liu] mllib/util
cccb8b1 [Davies Liu] mllib/tree
8fe166a [Davies Liu] Merge branch 'pickle' into pickle_mllib
aa2287e [Davies Liu] random
f1544c4 [Davies Liu] refactor clustering
52d1350 [Davies Liu] use new protocol in mllib/stat
b30ef35 [Davies Liu] use pickle to serialize data for mllib/recommendation
f44f771 [Davies Liu] enable tests about array
3908f5c [Davies Liu] Merge branch 'master' into pickle
c77c87b [Davies Liu] cleanup debugging code
60e4e2f [Davies Liu] support unpickle array.array for Python 2.6


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fce5e251
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fce5e251
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fce5e251

Branch: refs/heads/master
Commit: fce5e251d636c788cda91345867e0294280c074d
Parents: a03e5b8
Author: Davies Liu <davies....@gmail.com>
Authored: Fri Sep 19 15:01:11 2014 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Fri Sep 19 15:01:11 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/api/python/PythonRDD.scala |  31 +-
 .../org/apache/spark/api/python/SerDeUtil.scala |   4 +-
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 487 ++++++----------
 .../apache/spark/mllib/linalg/Matrices.scala    |  10 +-
 .../MatrixFactorizationModel.scala              |  15 -
 .../mllib/api/python/PythonMLLibAPISuite.scala  |  44 +-
 python/epydoc.conf                              |   2 +-
 python/pyspark/context.py                       |   1 +
 python/pyspark/mllib/_common.py                 | 562 -------------------
 python/pyspark/mllib/classification.py          |  61 +-
 python/pyspark/mllib/clustering.py              |  38 +-
 python/pyspark/mllib/linalg.py                  | 256 +++++++--
 python/pyspark/mllib/random.py                  |  54 +-
 python/pyspark/mllib/recommendation.py          |  69 ++-
 python/pyspark/mllib/regression.py              | 105 ++--
 python/pyspark/mllib/stat.py                    |  63 ++-
 python/pyspark/mllib/tests.py                   |  99 ++--
 python/pyspark/mllib/tree.py                    | 167 +++---
 python/pyspark/mllib/util.py                    |  43 +-
 python/pyspark/rdd.py                           |  10 +-
 python/pyspark/serializers.py                   |  36 ++
 python/run-tests                                |   1 -
 22 files changed, 891 insertions(+), 1267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 12b345a..f9ff4ea 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -775,17 +775,36 @@ private[spark] object PythonRDD extends Logging {
     }.toJavaRDD()
   }
 
+  private class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
+    private val pickle = new Pickler()
+    private var batch = 1
+    private val buffer = new mutable.ArrayBuffer[Any]
+
+    override def hasNext(): Boolean = iter.hasNext
+
+    override def next(): Array[Byte] = {
+      while (iter.hasNext && buffer.length < batch) {
+        buffer += iter.next()
+      }
+      val bytes = pickle.dumps(buffer.toArray)
+      val size = bytes.length
+      // let  1M < size < 10M
+      if (size < 1024 * 1024) {
+        batch *= 2
+      } else if (size > 1024 * 1024 * 10 && batch > 1) {
+        batch /= 2
+      }
+      buffer.clear()
+      bytes
+    }
+  }
+
   /**
    * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
    * PySpark.
    */
   def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
-    jRDD.rdd.mapPartitions { iter =>
-      val pickle = new Pickler
-      iter.map { row =>
-        pickle.dumps(row)
-      }
-    }
+    jRDD.rdd.mapPartitions { iter => new AutoBatchedPickler(iter) }
   }
 
   /**

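(Illustration only: a standalone sketch of the batch-size policy used by
AutoBatchedPickler above. The batch grows while the pickled output stays under
1 MB and shrinks once it exceeds 10 MB.)

    // Illustrative sketch mirroring the thresholds used by AutoBatchedPickler.
    def nextBatchSize(batch: Int, pickledSize: Int): Int =
      if (pickledSize < 1024 * 1024) batch * 2
      else if (pickledSize > 1024 * 1024 * 10 && batch > 1) batch / 2
      else batch
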
http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 6668797..7903457 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -68,8 +68,8 @@ private[python] object SerDeUtil extends Logging {
         construct(args ++ Array(""))
       } else if (args.length == 2 && args(1).isInstanceOf[String]) {
         val typecode = args(0).asInstanceOf[String].charAt(0)
-        val data: String = args(1).asInstanceOf[String]
-        construct(typecode, machineCodes(typecode), data.getBytes("ISO-8859-1"))
+        val data: Array[Byte] = args(1).asInstanceOf[String].getBytes("ISO-8859-1")
+        construct(typecode, machineCodes(typecode), data)
       } else {
         super.construct(args)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index fa0fa69..9164c29 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -17,16 +17,20 @@
 
 package org.apache.spark.mllib.api.python
 
-import java.nio.{ByteBuffer, ByteOrder}
+import java.io.OutputStream
 
 import scala.collection.JavaConverters._
+import scala.language.existentials
+import scala.reflect.ClassTag
+
+import net.razorvine.pickle._
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
 import org.apache.spark.mllib.optimization._
-import org.apache.spark.mllib.linalg.{Matrix, SparseVector, Vector, Vectors}
+import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.random.{RandomRDDs => RG}
 import org.apache.spark.mllib.recommendation._
 import org.apache.spark.mllib.regression._
@@ -40,11 +44,10 @@ import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
 
+
 /**
  * :: DeveloperApi ::
  * The Java stubs necessary for the Python mllib bindings.
- *
- * See python/pyspark/mllib/_common.py for the mutually agreed upon data format.
  */
 @DeveloperApi
 class PythonMLLibAPI extends Serializable {
@@ -60,18 +63,17 @@ class PythonMLLibAPI extends Serializable {
   def loadLabeledPoints(
       jsc: JavaSparkContext,
       path: String,
-      minPartitions: Int): JavaRDD[Array[Byte]] =
-    MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(SerDe.serializeLabeledPoint)
+      minPartitions: Int): JavaRDD[LabeledPoint] =
+    MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions)
 
   private def trainRegressionModel(
       trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
-      dataBytesJRDD: JavaRDD[Array[Byte]],
+      data: JavaRDD[LabeledPoint],
       initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = {
-    val data = dataBytesJRDD.rdd.map(SerDe.deserializeLabeledPoint)
-    val initialWeights = SerDe.deserializeDoubleVector(initialWeightsBA)
-    val model = trainFunc(data, initialWeights)
+    val initialWeights = SerDe.loads(initialWeightsBA).asInstanceOf[Vector]
+    val model = trainFunc(data.rdd, initialWeights)
     val ret = new java.util.LinkedList[java.lang.Object]()
-    ret.add(SerDe.serializeDoubleVector(model.weights))
+    ret.add(SerDe.dumps(model.weights))
     ret.add(model.intercept: java.lang.Double)
     ret
   }
@@ -80,7 +82,7 @@ class PythonMLLibAPI extends Serializable {
    * Java stub for Python mllib LinearRegressionWithSGD.train()
    */
   def trainLinearRegressionModelWithSGD(
-      dataBytesJRDD: JavaRDD[Array[Byte]],
+      data: JavaRDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
       miniBatchFraction: Double,
@@ -106,7 +108,7 @@ class PythonMLLibAPI extends Serializable {
     trainRegressionModel(
       (data, initialWeights) =>
         lrAlg.run(data, initialWeights),
-      dataBytesJRDD,
+      data,
       initialWeightsBA)
   }
 
@@ -114,7 +116,7 @@ class PythonMLLibAPI extends Serializable {
    * Java stub for Python mllib LassoWithSGD.train()
    */
   def trainLassoModelWithSGD(
-      dataBytesJRDD: JavaRDD[Array[Byte]],
+      data: JavaRDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
       regParam: Double,
@@ -129,7 +131,7 @@ class PythonMLLibAPI extends Serializable {
           regParam,
           miniBatchFraction,
           initialWeights),
-      dataBytesJRDD,
+      data,
       initialWeightsBA)
   }
 
@@ -137,7 +139,7 @@ class PythonMLLibAPI extends Serializable {
    * Java stub for Python mllib RidgeRegressionWithSGD.train()
    */
   def trainRidgeModelWithSGD(
-      dataBytesJRDD: JavaRDD[Array[Byte]],
+      data: JavaRDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
       regParam: Double,
@@ -152,7 +154,7 @@ class PythonMLLibAPI extends Serializable {
           regParam,
           miniBatchFraction,
           initialWeights),
-      dataBytesJRDD,
+      data,
       initialWeightsBA)
   }
 
@@ -160,7 +162,7 @@ class PythonMLLibAPI extends Serializable {
    * Java stub for Python mllib SVMWithSGD.train()
    */
   def trainSVMModelWithSGD(
-      dataBytesJRDD: JavaRDD[Array[Byte]],
+      data: JavaRDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
       regParam: Double,
@@ -186,7 +188,7 @@ class PythonMLLibAPI extends Serializable {
     trainRegressionModel(
       (data, initialWeights) =>
         SVMAlg.run(data, initialWeights),
-      dataBytesJRDD,
+      data,
       initialWeightsBA)
   }
 
@@ -194,7 +196,7 @@ class PythonMLLibAPI extends Serializable {
    * Java stub for Python mllib LogisticRegressionWithSGD.train()
    */
   def trainLogisticRegressionModelWithSGD(
-      dataBytesJRDD: JavaRDD[Array[Byte]],
+      data: JavaRDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
       miniBatchFraction: Double,
@@ -220,7 +222,7 @@ class PythonMLLibAPI extends Serializable {
     trainRegressionModel(
       (data, initialWeights) =>
         LogRegAlg.run(data, initialWeights),
-      dataBytesJRDD,
+      data,
       initialWeightsBA)
   }
 
@@ -228,14 +230,13 @@ class PythonMLLibAPI extends Serializable {
    * Java stub for NaiveBayes.train()
    */
   def trainNaiveBayes(
-      dataBytesJRDD: JavaRDD[Array[Byte]],
+      data: JavaRDD[LabeledPoint],
       lambda: Double): java.util.List[java.lang.Object] = {
-    val data = dataBytesJRDD.rdd.map(SerDe.deserializeLabeledPoint)
-    val model = NaiveBayes.train(data, lambda)
+    val model = NaiveBayes.train(data.rdd, lambda)
     val ret = new java.util.LinkedList[java.lang.Object]()
-    ret.add(SerDe.serializeDoubleVector(Vectors.dense(model.labels)))
-    ret.add(SerDe.serializeDoubleVector(Vectors.dense(model.pi)))
-    ret.add(SerDe.serializeDoubleMatrix(model.theta))
+    ret.add(Vectors.dense(model.labels))
+    ret.add(Vectors.dense(model.pi))
+    ret.add(model.theta)
     ret
   }
 
@@ -243,16 +244,12 @@ class PythonMLLibAPI extends Serializable {
    * Java stub for Python mllib KMeans.train()
    */
   def trainKMeansModel(
-      dataBytesJRDD: JavaRDD[Array[Byte]],
+      data: JavaRDD[Vector],
       k: Int,
       maxIterations: Int,
       runs: Int,
-      initializationMode: String): java.util.List[java.lang.Object] = {
-    val data = dataBytesJRDD.rdd.map(bytes => SerDe.deserializeDoubleVector(bytes))
-    val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
-    val ret = new java.util.LinkedList[java.lang.Object]()
-    ret.add(SerDe.serializeDoubleMatrix(model.clusterCenters.map(_.toArray)))
-    ret
+      initializationMode: String): KMeansModel = {
+    KMeans.train(data.rdd, k, maxIterations, runs, initializationMode)
   }
 
   /**
@@ -262,13 +259,12 @@ class PythonMLLibAPI extends Serializable {
    * the Py4J documentation.
    */
   def trainALSModel(
-      ratingsBytesJRDD: JavaRDD[Array[Byte]],
+      ratings: JavaRDD[Rating],
       rank: Int,
       iterations: Int,
       lambda: Double,
       blocks: Int): MatrixFactorizationModel = {
-    val ratings = ratingsBytesJRDD.rdd.map(SerDe.unpackRating)
-    ALS.train(ratings, rank, iterations, lambda, blocks)
+    ALS.train(ratings.rdd, rank, iterations, lambda, blocks)
   }
 
   /**
@@ -278,14 +274,13 @@ class PythonMLLibAPI extends Serializable {
    * exit; see the Py4J documentation.
    */
   def trainImplicitALSModel(
-      ratingsBytesJRDD: JavaRDD[Array[Byte]],
+      ratingsJRDD: JavaRDD[Rating],
       rank: Int,
       iterations: Int,
       lambda: Double,
       blocks: Int,
       alpha: Double): MatrixFactorizationModel = {
-    val ratings = ratingsBytesJRDD.rdd.map(SerDe.unpackRating)
-    ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
+    ALS.trainImplicit(ratingsJRDD.rdd, rank, iterations, lambda, blocks, alpha)
   }
 
   /**
@@ -293,11 +288,11 @@ class PythonMLLibAPI extends Serializable {
    * This stub returns a handle to the Java object instead of the content of the Java object.
    * Extra care needs to be taken in the Python code to ensure it gets freed on exit;
    * see the Py4J documentation.
-   * @param dataBytesJRDD  Training data
+   * @param data  Training data
    * @param categoricalFeaturesInfoJMap  Categorical features info, as Java map
    */
   def trainDecisionTreeModel(
-      dataBytesJRDD: JavaRDD[Array[Byte]],
+      data: JavaRDD[LabeledPoint],
       algoStr: String,
       numClasses: Int,
       categoricalFeaturesInfoJMap: java.util.Map[Int, Int],
@@ -307,8 +302,6 @@ class PythonMLLibAPI extends Serializable {
       minInstancesPerNode: Int,
       minInfoGain: Double): DecisionTreeModel = {
 
-    val data = dataBytesJRDD.rdd.map(SerDe.deserializeLabeledPoint)
-
     val algo = Algo.fromString(algoStr)
     val impurity = Impurities.fromString(impurityStr)
 
@@ -322,44 +315,15 @@ class PythonMLLibAPI extends Serializable {
       minInstancesPerNode = minInstancesPerNode,
       minInfoGain = minInfoGain)
 
-    DecisionTree.train(data, strategy)
-  }
-
-  /**
-   * Predict the label of the given data point.
-   * This is a Java stub for python DecisionTreeModel.predict()
-   *
-   * @param featuresBytes Serialized feature vector for data point
-   * @return predicted label
-   */
-  def predictDecisionTreeModel(
-      model: DecisionTreeModel,
-      featuresBytes: Array[Byte]): Double = {
-    val features: Vector = SerDe.deserializeDoubleVector(featuresBytes)
-    model.predict(features)
-  }
-
-  /**
-   * Predict the labels of the given data points.
-   * This is a Java stub for python DecisionTreeModel.predict()
-   *
-   * @param dataJRDD A JavaRDD with serialized feature vectors
-   * @return JavaRDD of serialized predictions
-   */
-  def predictDecisionTreeModel(
-      model: DecisionTreeModel,
-      dataJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = {
-    val data = dataJRDD.rdd.map(xBytes => SerDe.deserializeDoubleVector(xBytes))
-    model.predict(data).map(SerDe.serializeDouble)
+    DecisionTree.train(data.rdd, strategy)
   }
 
   /**
    * Java stub for mllib Statistics.colStats(X: RDD[Vector]).
    * TODO figure out return type.
    */
-  def colStats(X: JavaRDD[Array[Byte]]): MultivariateStatisticalSummarySerialized = {
-    val cStats = Statistics.colStats(X.rdd.map(SerDe.deserializeDoubleVector(_)))
-    new MultivariateStatisticalSummarySerialized(cStats)
+  def colStats(rdd: JavaRDD[Vector]): MultivariateStatisticalSummary = {
+    Statistics.colStats(rdd.rdd)
   }
 
   /**
@@ -367,19 +331,15 @@ class PythonMLLibAPI extends Serializable {
    * Returns the correlation matrix serialized into a byte array understood by deserializers in
    * pyspark.
    */
-  def corr(X: JavaRDD[Array[Byte]], method: String): Array[Byte] = {
-    val inputMatrix = X.rdd.map(SerDe.deserializeDoubleVector(_))
-    val result = Statistics.corr(inputMatrix, getCorrNameOrDefault(method))
-    SerDe.serializeDoubleMatrix(SerDe.to2dArray(result))
+  def corr(x: JavaRDD[Vector], method: String): Matrix = {
+    Statistics.corr(x.rdd, getCorrNameOrDefault(method))
   }
 
   /**
    * Java stub for mllib Statistics.corr(x: RDD[Double], y: RDD[Double], method: String).
    */
-  def corr(x: JavaRDD[Array[Byte]], y: JavaRDD[Array[Byte]], method: String): Double = {
-    val xDeser = x.rdd.map(SerDe.deserializeDouble(_))
-    val yDeser = y.rdd.map(SerDe.deserializeDouble(_))
-    Statistics.corr(xDeser, yDeser, getCorrNameOrDefault(method))
+  def corr(x: JavaRDD[Double], y: JavaRDD[Double], method: String): Double = {
+    Statistics.corr(x.rdd, y.rdd, getCorrNameOrDefault(method))
   }
 
   // used by the corr methods to retrieve the name of the correlation method passed in via pyspark
@@ -411,10 +371,10 @@ class PythonMLLibAPI extends Serializable {
   def uniformRDD(jsc: JavaSparkContext,
       size: Long,
       numPartitions: java.lang.Integer,
-      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+      seed: java.lang.Long): JavaRDD[Double] = {
     val parts = getNumPartitionsOrDefault(numPartitions, jsc)
     val s = getSeedOrDefault(seed)
-    RG.uniformRDD(jsc.sc, size, parts, s).map(SerDe.serializeDouble)
+    RG.uniformRDD(jsc.sc, size, parts, s)
   }
 
   /**
@@ -423,10 +383,10 @@ class PythonMLLibAPI extends Serializable {
   def normalRDD(jsc: JavaSparkContext,
       size: Long,
       numPartitions: java.lang.Integer,
-      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+      seed: java.lang.Long): JavaRDD[Double] = {
     val parts = getNumPartitionsOrDefault(numPartitions, jsc)
     val s = getSeedOrDefault(seed)
-    RG.normalRDD(jsc.sc, size, parts, s).map(SerDe.serializeDouble)
+    RG.normalRDD(jsc.sc, size, parts, s)
   }
 
   /**
@@ -436,10 +396,10 @@ class PythonMLLibAPI extends Serializable {
       mean: Double,
       size: Long,
       numPartitions: java.lang.Integer,
-      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+      seed: java.lang.Long): JavaRDD[Double] = {
     val parts = getNumPartitionsOrDefault(numPartitions, jsc)
     val s = getSeedOrDefault(seed)
-    RG.poissonRDD(jsc.sc, mean, size, parts, s).map(SerDe.serializeDouble)
+    RG.poissonRDD(jsc.sc, mean, size, parts, s)
   }
 
   /**
@@ -449,10 +409,10 @@ class PythonMLLibAPI extends Serializable {
       numRows: Long,
       numCols: Int,
       numPartitions: java.lang.Integer,
-      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+      seed: java.lang.Long): JavaRDD[Vector] = {
     val parts = getNumPartitionsOrDefault(numPartitions, jsc)
     val s = getSeedOrDefault(seed)
-    RG.uniformVectorRDD(jsc.sc, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector)
+    RG.uniformVectorRDD(jsc.sc, numRows, numCols, parts, s)
   }
 
   /**
@@ -462,10 +422,10 @@ class PythonMLLibAPI extends Serializable {
       numRows: Long,
       numCols: Int,
       numPartitions: java.lang.Integer,
-      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+      seed: java.lang.Long): JavaRDD[Vector] = {
     val parts = getNumPartitionsOrDefault(numPartitions, jsc)
     val s = getSeedOrDefault(seed)
-    RG.normalVectorRDD(jsc.sc, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector)
+    RG.normalVectorRDD(jsc.sc, numRows, numCols, parts, s)
   }
 
   /**
@@ -476,259 +436,168 @@ class PythonMLLibAPI extends Serializable {
       numRows: Long,
       numCols: Int,
       numPartitions: java.lang.Integer,
-      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+      seed: java.lang.Long): JavaRDD[Vector] = {
     val parts = getNumPartitionsOrDefault(numPartitions, jsc)
     val s = getSeedOrDefault(seed)
-    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector)
+    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s)
   }
 
 }
 
 /**
- * :: DeveloperApi ::
- * MultivariateStatisticalSummary with Vector fields serialized.
+ * SerDe utility functions for PythonMLLibAPI.
  */
-@DeveloperApi
-class MultivariateStatisticalSummarySerialized(val summary: MultivariateStatisticalSummary)
-  extends Serializable {
+private[spark] object SerDe extends Serializable {
 
-  def mean: Array[Byte] = SerDe.serializeDoubleVector(summary.mean)
+  val PYSPARK_PACKAGE = "pyspark.mllib"
 
-  def variance: Array[Byte] = SerDe.serializeDoubleVector(summary.variance)
+  /**
+   * Base class used for pickle
+   */
+  private[python] abstract class BasePickler[T: ClassTag]
+    extends IObjectPickler with IObjectConstructor {
+
+    private val cls = implicitly[ClassTag[T]].runtimeClass
+    private val module = PYSPARK_PACKAGE + "." + cls.getName.split('.')(4)
+    private val name = cls.getSimpleName
+
+    // register this to Pickler and Unpickler
+    def register(): Unit = {
+      Pickler.registerCustomPickler(this.getClass, this)
+      Pickler.registerCustomPickler(cls, this)
+      Unpickler.registerConstructor(module, name, this)
+    }
 
-  def count: Long = summary.count
+    def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
+      if (obj == this) {
+        out.write(Opcodes.GLOBAL)
+        out.write((module + "\n" + name + "\n").getBytes())
+      } else {
+        pickler.save(this)  // it will be memorized by Pickler
+        saveState(obj, out, pickler)
+        out.write(Opcodes.REDUCE)
+      }
+    }
+
+    private[python] def saveObjects(out: OutputStream, pickler: Pickler, objects: Any*) = {
+      if (objects.length == 0 || objects.length > 3) {
+        out.write(Opcodes.MARK)
+      }
+      objects.foreach(pickler.save(_))
+      val code = objects.length match {
+        case 1 => Opcodes.TUPLE1
+        case 2 => Opcodes.TUPLE2
+        case 3 => Opcodes.TUPLE3
+        case _ => Opcodes.TUPLE
+      }
+      out.write(code)
+    }
 
-  def numNonzeros: Array[Byte] = SerDe.serializeDoubleVector(summary.numNonzeros)
+    private[python] def saveState(obj: Object, out: OutputStream, pickler: Pickler)
+  }
 
-  def max: Array[Byte] = SerDe.serializeDoubleVector(summary.max)
+  // Pickler for DenseVector
+  private[python] class DenseVectorPickler extends BasePickler[DenseVector] {
 
-  def min: Array[Byte] = SerDe.serializeDoubleVector(summary.min)
-}
+    def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
+      val vector: DenseVector = obj.asInstanceOf[DenseVector]
+      saveObjects(out, pickler, vector.toArray)
+    }
 
-/**
- * SerDe utility functions for PythonMLLibAPI.
- */
-private[spark] object SerDe extends Serializable {
-  private val DENSE_VECTOR_MAGIC: Byte = 1
-  private val SPARSE_VECTOR_MAGIC: Byte = 2
-  private val DENSE_MATRIX_MAGIC: Byte = 3
-  private val LABELED_POINT_MAGIC: Byte = 4
-
-  private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
-    require(bytes.length - offset >= 5, "Byte array too short")
-    val magic = bytes(offset)
-    if (magic == DENSE_VECTOR_MAGIC) {
-      deserializeDenseVector(bytes, offset)
-    } else if (magic == SPARSE_VECTOR_MAGIC) {
-      deserializeSparseVector(bytes, offset)
-    } else {
-      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
+    def construct(args: Array[Object]): Object = {
+      require(args.length == 1)
+      if (args.length != 1) {
+        throw new PickleException("should be 1")
+      }
+      new DenseVector(args(0).asInstanceOf[Array[Double]])
     }
   }
 
-  private[python] def deserializeDouble(bytes: Array[Byte], offset: Int = 0): Double = {
-    require(bytes.length - offset == 8, "Wrong size byte array for Double")
-    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
-    bb.order(ByteOrder.nativeOrder())
-    bb.getDouble
-  }
-
-  private[python] def deserializeDenseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
-    val packetLength = bytes.length - offset
-    require(packetLength >= 5, "Byte array too short")
-    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
-    bb.order(ByteOrder.nativeOrder())
-    val magic = bb.get()
-    require(magic == DENSE_VECTOR_MAGIC, "Invalid magic: " + magic)
-    val length = bb.getInt()
-    require (packetLength == 5 + 8 * length, "Invalid packet length: " + packetLength)
-    val db = bb.asDoubleBuffer()
-    val ans = new Array[Double](length.toInt)
-    db.get(ans)
-    Vectors.dense(ans)
-  }
-
-  private[python] def deserializeSparseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
-    val packetLength = bytes.length - offset
-    require(packetLength >= 9, "Byte array too short")
-    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
-    bb.order(ByteOrder.nativeOrder())
-    val magic = bb.get()
-    require(magic == SPARSE_VECTOR_MAGIC, "Invalid magic: " + magic)
-    val size = bb.getInt()
-    val nonZeros = bb.getInt()
-    require (packetLength == 9 + 12 * nonZeros, "Invalid packet length: " + packetLength)
-    val ib = bb.asIntBuffer()
-    val indices = new Array[Int](nonZeros)
-    ib.get(indices)
-    bb.position(bb.position() + 4 * nonZeros)
-    val db = bb.asDoubleBuffer()
-    val values = new Array[Double](nonZeros)
-    db.get(values)
-    Vectors.sparse(size, indices, values)
-  }
+  // Pickler for DenseMatrix
+  private[python] class DenseMatrixPickler extends BasePickler[DenseMatrix] {
 
-  /**
-   * Returns an 8-byte array for the input Double.
-   *
-   * Note: we currently do not use a magic byte for double for storage efficiency.
-   * This should be reconsidered when we add Ser/De for other 8-byte types (e.g. Long), for safety.
-   * The corresponding deserializer, deserializeDouble, needs to be modified as well if the
-   * serialization scheme changes.
-   */
-  private[python] def serializeDouble(double: Double): Array[Byte] = {
-    val bytes = new Array[Byte](8)
-    val bb = ByteBuffer.wrap(bytes)
-    bb.order(ByteOrder.nativeOrder())
-    bb.putDouble(double)
-    bytes
-  }
-
-  private[python] def serializeDenseVector(doubles: Array[Double]): Array[Byte] = {
-    val len = doubles.length
-    val bytes = new Array[Byte](5 + 8 * len)
-    val bb = ByteBuffer.wrap(bytes)
-    bb.order(ByteOrder.nativeOrder())
-    bb.put(DENSE_VECTOR_MAGIC)
-    bb.putInt(len)
-    val db = bb.asDoubleBuffer()
-    db.put(doubles)
-    bytes
-  }
-
-  private[python] def serializeSparseVector(vector: SparseVector): Array[Byte] = {
-    val nonZeros = vector.indices.length
-    val bytes = new Array[Byte](9 + 12 * nonZeros)
-    val bb = ByteBuffer.wrap(bytes)
-    bb.order(ByteOrder.nativeOrder())
-    bb.put(SPARSE_VECTOR_MAGIC)
-    bb.putInt(vector.size)
-    bb.putInt(nonZeros)
-    val ib = bb.asIntBuffer()
-    ib.put(vector.indices)
-    bb.position(bb.position() + 4 * nonZeros)
-    val db = bb.asDoubleBuffer()
-    db.put(vector.values)
-    bytes
-  }
-
-  private[python] def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
-    case s: SparseVector =>
-      serializeSparseVector(s)
-    case _ =>
-      serializeDenseVector(vector.toArray)
-  }
-
-  private[python] def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
-    val packetLength = bytes.length
-    if (packetLength < 9) {
-      throw new IllegalArgumentException("Byte array too short.")
+    def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
+      val m: DenseMatrix = obj.asInstanceOf[DenseMatrix]
+      saveObjects(out, pickler, m.numRows, m.numCols, m.values)
     }
-    val bb = ByteBuffer.wrap(bytes)
-    bb.order(ByteOrder.nativeOrder())
-    val magic = bb.get()
-    if (magic != DENSE_MATRIX_MAGIC) {
-      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
+
+    def construct(args: Array[Object]): Object = {
+      if (args.length != 3) {
+        throw new PickleException("should be 3")
+      }
+      new DenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int],
+        args(2).asInstanceOf[Array[Double]])
     }
-    val rows = bb.getInt()
-    val cols = bb.getInt()
-    if (packetLength != 9 + 8 * rows * cols) {
-      throw new IllegalArgumentException("Size " + rows + "x" + cols + " is 
wrong.")
+  }
+
+  // Pickler for SparseVector
+  private[python] class SparseVectorPickler extends BasePickler[SparseVector] {
+
+    def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
+      val v: SparseVector = obj.asInstanceOf[SparseVector]
+      saveObjects(out, pickler, v.size, v.indices, v.values)
     }
-    val db = bb.asDoubleBuffer()
-    val ans = new Array[Array[Double]](rows.toInt)
-    for (i <- 0 until rows.toInt) {
-      ans(i) = new Array[Double](cols.toInt)
-      db.get(ans(i))
+
+    def construct(args: Array[Object]): Object = {
+      if (args.length != 3) {
+        throw new PickleException("should be 3")
+      }
+      new SparseVector(args(0).asInstanceOf[Int], args(1).asInstanceOf[Array[Int]],
+        args(2).asInstanceOf[Array[Double]])
     }
-    ans
   }
 
-  private[python] def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
-    val rows = doubles.length
-    var cols = 0
-    if (rows > 0) {
-      cols = doubles(0).length
+  // Pickler for LabeledPoint
+  private[python] class LabeledPointPickler extends BasePickler[LabeledPoint] {
+
+    def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
+      val point: LabeledPoint = obj.asInstanceOf[LabeledPoint]
+      saveObjects(out, pickler, point.label, point.features)
     }
-    val bytes = new Array[Byte](9 + 8 * rows * cols)
-    val bb = ByteBuffer.wrap(bytes)
-    bb.order(ByteOrder.nativeOrder())
-    bb.put(DENSE_MATRIX_MAGIC)
-    bb.putInt(rows)
-    bb.putInt(cols)
-    val db = bb.asDoubleBuffer()
-    for (i <- 0 until rows) {
-      db.put(doubles(i))
+
+    def construct(args: Array[Object]): Object = {
+      if (args.length != 2) {
+        throw new PickleException("should be 2")
+      }
+      new LabeledPoint(args(0).asInstanceOf[Double], args(1).asInstanceOf[Vector])
     }
-    bytes
   }
 
-  private[python] def serializeLabeledPoint(p: LabeledPoint): Array[Byte] = {
-    val fb = serializeDoubleVector(p.features)
-    val bytes = new Array[Byte](1 + 8 + fb.length)
-    val bb = ByteBuffer.wrap(bytes)
-    bb.order(ByteOrder.nativeOrder())
-    bb.put(LABELED_POINT_MAGIC)
-    bb.putDouble(p.label)
-    bb.put(fb)
-    bytes
-  }
+  // Pickler for Rating
+  private[python] class RatingPickler extends BasePickler[Rating] {
 
-  private[python] def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
-    require(bytes.length >= 9, "Byte array too short")
-    val magic = bytes(0)
-    if (magic != LABELED_POINT_MAGIC) {
-      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
+    def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
+      val rating: Rating = obj.asInstanceOf[Rating]
+      saveObjects(out, pickler, rating.user, rating.product, rating.rating)
     }
-    val labelBytes = ByteBuffer.wrap(bytes, 1, 8)
-    labelBytes.order(ByteOrder.nativeOrder())
-    val label = labelBytes.asDoubleBuffer().get(0)
-    LabeledPoint(label, deserializeDoubleVector(bytes, 9))
-  }
 
-  // Reformat a Matrix into Array[Array[Double]] for serialization
-  private[python] def to2dArray(matrix: Matrix): Array[Array[Double]] = {
-    val values = matrix.toArray
-    Array.tabulate(matrix.numRows, matrix.numCols)((i, j) => values(i + j * matrix.numRows))
+    def construct(args: Array[Object]): Object = {
+      if (args.length != 3) {
+        throw new PickleException("should be 3")
+      }
+      new Rating(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int],
+        args(2).asInstanceOf[Double])
+    }
   }
 
+  def initialize(): Unit = {
+    new DenseVectorPickler().register()
+    new DenseMatrixPickler().register()
+    new SparseVectorPickler().register()
+    new LabeledPointPickler().register()
+    new RatingPickler().register()
+  }
 
-  /** Unpack a Rating object from an array of bytes */
-  private[python] def unpackRating(ratingBytes: Array[Byte]): Rating = {
-    val bb = ByteBuffer.wrap(ratingBytes)
-    bb.order(ByteOrder.nativeOrder())
-    val user = bb.getInt()
-    val product = bb.getInt()
-    val rating = bb.getDouble()
-    new Rating(user, product, rating)
+  def dumps(obj: AnyRef): Array[Byte] = {
+    new Pickler().dumps(obj)
   }
 
-  /** Unpack a tuple of Ints from an array of bytes */
-  def unpackTuple(tupleBytes: Array[Byte]): (Int, Int) = {
-    val bb = ByteBuffer.wrap(tupleBytes)
-    bb.order(ByteOrder.nativeOrder())
-    val v1 = bb.getInt()
-    val v2 = bb.getInt()
-    (v1, v2)
+  def loads(bytes: Array[Byte]): AnyRef = {
+    new Unpickler().loads(bytes)
   }
 
-  /**
-   * Serialize a Rating object into an array of bytes.
-   * It can be deserialized using RatingDeserializer().
-   *
-   * @param rate the Rating object to serialize
-   * @return
-   */
-  def serializeRating(rate: Rating): Array[Byte] = {
-    val len = 3
-    val bytes = new Array[Byte](4 + 8 * len)
-    val bb = ByteBuffer.wrap(bytes)
-    bb.order(ByteOrder.nativeOrder())
-    bb.putInt(len)
-    val db = bb.asDoubleBuffer()
-    db.put(rate.user.toDouble)
-    db.put(rate.product.toDouble)
-    db.put(rate.rating)
-    bytes
+  /* convert object into Tuple */
+  def asTupleRDD(rdd: RDD[Array[Any]]): RDD[(Int, Int)] = {
+    rdd.map(x => (x(0).asInstanceOf[Int], x(1).asInstanceOf[Int]))
   }
 }

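(Illustration only: once SerDe.initialize() has registered the picklers above,
a JVM-side round trip of a LabeledPoint looks roughly like this.)

    SerDe.initialize()
    val p = LabeledPoint(1.0, Vectors.dense(0.1, 0.2))
    val bytes = SerDe.dumps(p)                               // pickle bytes understood by pyspark
    val back = SerDe.loads(bytes).asInstanceOf[LabeledPoint]
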
http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
index 5711532..4e87fe0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -17,12 +17,12 @@
 
 package org.apache.spark.mllib.linalg
 
+import java.util.Arrays
+
 import breeze.linalg.{Matrix => BM, DenseMatrix => BDM, CSCMatrix => BSM}
 
 import org.apache.spark.util.random.XORShiftRandom
 
-import java.util.Arrays
-
 /**
  * Trait for a local matrix.
  */
@@ -106,6 +106,12 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double])
 
   override def toArray: Array[Double] = values
 
+  override def equals(o: Any) = o match {
+    case m: DenseMatrix =>
+      m.numRows == numRows && m.numCols == numCols && Arrays.equals(toArray, m.toArray)
+    case _ => false
+  }
+
   private[mllib] def toBreeze: BM[Double] = new BDM[Double](numRows, numCols, values)
 
   private[mllib] def apply(i: Int): Double = values(i)

http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 478c648..66b58ba 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -106,19 +106,4 @@ class MatrixFactorizationModel private[mllib] (
     }
     scored.top(num)(Ordering.by(_._2))
   }
-
-  /**
-   * :: DeveloperApi ::
-   * Predict the rating of many users for many products.
-   * This is a Java stub for python predictAll()
-   *
-   * @param usersProductsJRDD A JavaRDD with serialized tuples (user, product)
-   * @return JavaRDD of serialized Rating objects.
-   */
-  @DeveloperApi
-  def predict(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = {
-    val usersProducts = usersProductsJRDD.rdd.map(xBytes => SerDe.unpackTuple(xBytes))
-    predict(usersProducts).map(rate => SerDe.serializeRating(rate))
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
index 092d67b..db8ed62 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
@@ -19,12 +19,15 @@ package org.apache.spark.mllib.api.python
 
 import org.scalatest.FunSuite
 
-import org.apache.spark.mllib.linalg.{Matrices, Vectors}
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Vectors}
 import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.recommendation.Rating
 
 class PythonMLLibAPISuite extends FunSuite {
 
-  test("vector serialization") {
+  SerDe.initialize()
+
+  test("pickle vector") {
     val vectors = Seq(
       Vectors.dense(Array.empty[Double]),
       Vectors.dense(0.0),
@@ -33,14 +36,13 @@ class PythonMLLibAPISuite extends FunSuite {
       Vectors.sparse(1, Array.empty[Int], Array.empty[Double]),
       Vectors.sparse(2, Array(1), Array(-2.0)))
     vectors.foreach { v =>
-      val bytes = SerDe.serializeDoubleVector(v)
-      val u = SerDe.deserializeDoubleVector(bytes)
+      val u = SerDe.loads(SerDe.dumps(v))
       assert(u.getClass === v.getClass)
       assert(u === v)
     }
   }
 
-  test("labeled point serialization") {
+  test("pickle labeled point") {
     val points = Seq(
       LabeledPoint(0.0, Vectors.dense(Array.empty[Double])),
       LabeledPoint(1.0, Vectors.dense(0.0)),
@@ -49,34 +51,44 @@ class PythonMLLibAPISuite extends FunSuite {
       LabeledPoint(1.0, Vectors.sparse(1, Array.empty[Int], Array.empty[Double])),
       LabeledPoint(-0.5, Vectors.sparse(2, Array(1), Array(-2.0))))
     points.foreach { p =>
-      val bytes = SerDe.serializeLabeledPoint(p)
-      val q = SerDe.deserializeLabeledPoint(bytes)
+      val q = SerDe.loads(SerDe.dumps(p)).asInstanceOf[LabeledPoint]
       assert(q.label === p.label)
       assert(q.features.getClass === p.features.getClass)
       assert(q.features === p.features)
     }
   }
 
-  test("double serialization") {
+  test("pickle double") {
     for (x <- List(123.0, -10.0, 0.0, Double.MaxValue, Double.MinValue, Double.NaN)) {
-      val bytes = SerDe.serializeDouble(x)
-      val deser = SerDe.deserializeDouble(bytes)
+      val deser = SerDe.loads(SerDe.dumps(x.asInstanceOf[AnyRef])).asInstanceOf[Double]
       // We use `equals` here for comparison because we cannot use `==` for NaN
       assert(x.equals(deser))
     }
   }
 
-  test("matrix to 2D array") {
+  test("pickle matrix") {
     val values = Array[Double](0, 1.2, 3, 4.56, 7, 8)
     val matrix = Matrices.dense(2, 3, values)
-    val arr = SerDe.to2dArray(matrix)
-    val expected = Array(Array[Double](0, 3, 7), Array[Double](1.2, 4.56, 8))
-    assert(arr === expected)
+    val nm = SerDe.loads(SerDe.dumps(matrix)).asInstanceOf[DenseMatrix]
+    assert(matrix === nm)
 
     // Test conversion for empty matrix
     val empty = Array[Double]()
     val emptyMatrix = Matrices.dense(0, 0, empty)
-    val empty2D = SerDe.to2dArray(emptyMatrix)
-    assert(empty2D === Array[Array[Double]]())
+    val ne = SerDe.loads(SerDe.dumps(emptyMatrix)).asInstanceOf[DenseMatrix]
+    assert(emptyMatrix == ne)
+  }
+
+  test("pickle rating") {
+    val rat = new Rating(1, 2, 3.0)
+    val rat2 = SerDe.loads(SerDe.dumps(rat)).asInstanceOf[Rating]
+    assert(rat == rat2)
+
+    // Test name of class only occur once
+    val rats = (1 to 10).map(x => new Rating(x, x + 1, x + 3.0)).toArray
+    val bytes = SerDe.dumps(rats)
+    assert(bytes.toString.split("Rating").length == 1)
+    assert(bytes.length / 10 < 25) //  25 bytes per rating
+
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/python/epydoc.conf
----------------------------------------------------------------------
diff --git a/python/epydoc.conf b/python/epydoc.conf
index 51c0faf..8593e08 100644
--- a/python/epydoc.conf
+++ b/python/epydoc.conf
@@ -34,5 +34,5 @@ private: no
 
 exclude: pyspark.cloudpickle pyspark.worker pyspark.join
          pyspark.java_gateway pyspark.examples pyspark.shell pyspark.tests
-         pyspark.rddsampler pyspark.daemon pyspark.mllib._common
+         pyspark.rddsampler pyspark.daemon
          pyspark.mllib.tests pyspark.shuffle

http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index a17f2c1..064a24b 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -211,6 +211,7 @@ class SparkContext(object):
                 SparkContext._jvm = SparkContext._gateway.jvm
                 SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
                 SparkContext._jvm.SerDeUtil.initialize()
+                SparkContext._jvm.SerDe.initialize()
 
             if instance:
                 if (SparkContext._active_spark_context and

http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/python/pyspark/mllib/_common.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
deleted file mode 100644
index 68f6033..0000000
--- a/python/pyspark/mllib/_common.py
+++ /dev/null
@@ -1,562 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import struct
-import sys
-import numpy
-from numpy import ndarray, float64, int64, int32, array_equal, array
-from pyspark import SparkContext, RDD
-from pyspark.mllib.linalg import SparseVector
-from pyspark.serializers import FramedSerializer
-
-
-"""
-Common utilities shared throughout MLlib, primarily for dealing with
-different data types. These include:
-- Serialization utilities to / from byte arrays that Java can handle
-- Serializers for other data types, like ALS Rating objects
-- Common methods for linear models
-- Methods to deal with the different vector types we support, such as
-  SparseVector and scipy.sparse matrices.
-"""
-
-
-# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
-# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices.
-
-_have_scipy = False
-_scipy_issparse = None
-try:
-    import scipy.sparse
-    _have_scipy = True
-    _scipy_issparse = scipy.sparse.issparse
-except:
-    # No SciPy in environment, but that's okay
-    pass
-
-
-# Serialization functions to and from Scala. These use the following formats, understood
-# by the PythonMLLibAPI class in Scala:
-#
-# Dense double vector format:
-#
-# [1-byte 1] [4-byte length] [length*8 bytes of data]
-#
-# Sparse double vector format:
-#
-# [1-byte 2] [4-byte length] [4-byte nonzeros] [nonzeros*4 bytes of indices] \
-# [nonzeros*8 bytes of values]
-#
-# Double matrix format:
-#
-# [1-byte 3] [4-byte rows] [4-byte cols] [rows*cols*8 bytes of data]
-#
-# LabeledPoint format:
-#
-# [1-byte 4] [8-byte label] [dense or sparse vector]
-#
-# This is all in machine-endian.  That means that the Java interpreter and the
-# Python interpreter must agree on what endian the machine is.
-
-
-DENSE_VECTOR_MAGIC = 1
-SPARSE_VECTOR_MAGIC = 2
-DENSE_MATRIX_MAGIC = 3
-LABELED_POINT_MAGIC = 4
-
-
-# Workaround for SPARK-2954: before Python 2.7, struct.unpack couldn't unpack bytearray()s.
-if sys.version_info[:2] <= (2, 6):
-    def _unpack(fmt, string):
-        return struct.unpack(fmt, buffer(string))
-else:
-    _unpack = struct.unpack
-
-
-def _deserialize_numpy_array(shape, ba, offset, dtype=float64):
-    """
-    Deserialize a numpy array of the given type from an offset in
-    bytearray ba, assigning it the given shape.
-
-    >>> x = array([1.0, 2.0, 3.0, 4.0, 5.0])
-    >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0))
-    True
-    >>> x = array([1.0, 2.0, 3.0, 4.0]).reshape(2,2)
-    >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0))
-    True
-    >>> x = array([1, 2, 3], dtype=int32)
-    >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0, dtype=int32))
-    True
-    """
-    ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype=dtype, order='C')
-    return ar.copy()
-
-
-def _serialize_double(d):
-    """
-    Serialize a double (float or numpy.float64) into a mutually understood format.
-    """
-    if type(d) == float or type(d) == float64 or type(d) == int or type(d) == long:
-        d = float64(d)
-        ba = bytearray(8)
-        _copyto(d, buffer=ba, offset=0, shape=[1], dtype=float64)
-        return ba
-    else:
-        raise TypeError("_serialize_double called on non-float input")
-
-
-def _serialize_double_vector(v):
-    """
-    Serialize a double vector into a mutually understood format.
-
-    Note: we currently do not use a magic byte for double for storage
-    efficiency. This should be reconsidered when we add Ser/De for other
-    8-byte types (e.g. Long), for safety. The corresponding deserializer,
-    _deserialize_double, needs to be modified as well if the serialization
-    scheme changes.
-
-    >>> x = array([1,2,3])
-    >>> y = _deserialize_double_vector(_serialize_double_vector(x))
-    >>> array_equal(y, array([1.0, 2.0, 3.0]))
-    True
-    """
-    v = _convert_vector(v)
-    if type(v) == ndarray:
-        return _serialize_dense_vector(v)
-    elif type(v) == SparseVector:
-        return _serialize_sparse_vector(v)
-    else:
-        raise TypeError("_serialize_double_vector called on a %s; "
-                        "wanted ndarray or SparseVector" % type(v))
-
-
-def _serialize_dense_vector(v):
-    """Serialize a dense vector given as a NumPy array."""
-    if v.ndim != 1:
-        raise TypeError("_serialize_double_vector called on a %ddarray; "
-                        "wanted a 1darray" % v.ndim)
-    if v.dtype != float64:
-        if numpy.issubdtype(v.dtype, numpy.complex):
-            raise TypeError("_serialize_double_vector called on an ndarray of 
%s; "
-                            "wanted ndarray of float64" % v.dtype)
-        v = v.astype(float64)
-    length = v.shape[0]
-    ba = bytearray(5 + 8 * length)
-    ba[0] = DENSE_VECTOR_MAGIC
-    length_bytes = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)
-    length_bytes[0] = length
-    _copyto(v, buffer=ba, offset=5, shape=[length], dtype=float64)
-    return ba
-
-
-def _serialize_sparse_vector(v):
-    """Serialize a pyspark.mllib.linalg.SparseVector."""
-    nonzeros = len(v.indices)
-    ba = bytearray(9 + 12 * nonzeros)
-    ba[0] = SPARSE_VECTOR_MAGIC
-    header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
-    header[0] = v.size
-    header[1] = nonzeros
-    _copyto(v.indices, buffer=ba, offset=9, shape=[nonzeros], dtype=int32)
-    values_offset = 9 + 4 * nonzeros
-    _copyto(v.values, buffer=ba, offset=values_offset, shape=[nonzeros], dtype=float64)
-    return ba
-
-
-def _deserialize_double(ba, offset=0):
-    """Deserialize a double from a mutually understood format.
-
-    >>> import sys
-    >>> _deserialize_double(_serialize_double(123.0)) == 123.0
-    True
-    >>> _deserialize_double(_serialize_double(float64(0.0))) == 0.0
-    True
-    >>> _deserialize_double(_serialize_double(1)) == 1.0
-    True
-    >>> _deserialize_double(_serialize_double(1L)) == 1.0
-    True
-    >>> x = sys.float_info.max
-    >>> _deserialize_double(_serialize_double(sys.float_info.max)) == x
-    True
-    >>> y = float64(sys.float_info.max)
-    >>> _deserialize_double(_serialize_double(sys.float_info.max)) == y
-    True
-    """
-    if type(ba) != bytearray:
-        raise TypeError("_deserialize_double called on a %s; wanted bytearray" 
% type(ba))
-    if len(ba) - offset != 8:
-        raise TypeError("_deserialize_double called on a %d-byte array; wanted 
8 bytes." % nb)
-    return _unpack("d", ba[offset:])[0]
-
-
-def _deserialize_double_vector(ba, offset=0):
-    """Deserialize a double vector from a mutually understood format.
-
-    >>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
-    >>> array_equal(x, _deserialize_double_vector(_serialize_double_vector(x)))
-    True
-    >>> s = SparseVector(4, [1, 3], [3.0, 5.5])
-    >>> s == _deserialize_double_vector(_serialize_double_vector(s))
-    True
-    """
-    if type(ba) != bytearray:
-        raise TypeError("_deserialize_double_vector called on a %s; "
-                        "wanted bytearray" % type(ba))
-    nb = len(ba) - offset
-    if nb < 5:
-        raise TypeError("_deserialize_double_vector called on a %d-byte array, 
"
-                        "which is too short" % nb)
-    if ba[offset] == DENSE_VECTOR_MAGIC:
-        return _deserialize_dense_vector(ba, offset)
-    elif ba[offset] == SPARSE_VECTOR_MAGIC:
-        return _deserialize_sparse_vector(ba, offset)
-    else:
-        raise TypeError("_deserialize_double_vector called on bytearray "
-                        "with wrong magic")
-
-
-def _deserialize_dense_vector(ba, offset=0):
-    """Deserialize a dense vector into a numpy array."""
-    nb = len(ba) - offset
-    if nb < 5:
-        raise TypeError("_deserialize_dense_vector called on a %d-byte array, "
-                        "which is too short" % nb)
-    length = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=int32)[0]
-    if nb < 8 * length + 5:
-        raise TypeError("_deserialize_dense_vector called on bytearray "
-                        "with wrong length")
-    return _deserialize_numpy_array([length], ba, offset + 5)
-
-
-def _deserialize_sparse_vector(ba, offset=0):
-    """Deserialize a sparse vector into a MLlib SparseVector object."""
-    nb = len(ba) - offset
-    if nb < 9:
-        raise TypeError("_deserialize_sparse_vector called on a %d-byte array, 
"
-                        "which is too short" % nb)
-    header = ndarray(shape=[2], buffer=ba, offset=offset + 1, dtype=int32)
-    size = header[0]
-    nonzeros = header[1]
-    if nb < 9 + 12 * nonzeros:
-        raise TypeError("_deserialize_sparse_vector called on bytearray "
-                        "with wrong length")
-    indices = _deserialize_numpy_array([nonzeros], ba, offset + 9, dtype=int32)
-    values = _deserialize_numpy_array([nonzeros], ba, offset + 9 + 4 * nonzeros, dtype=float64)
-    return SparseVector(int(size), indices, values)
-
-
-def _serialize_double_matrix(m):
-    """Serialize a double matrix into a mutually understood format."""
-    if (type(m) == ndarray and m.ndim == 2):
-        if m.dtype != float64:
-            if numpy.issubdtype(m.dtype, numpy.complex):
-                raise TypeError("_serialize_double_matrix called on an ndarray 
of %s; "
-                                "wanted ndarray of float64" % m.dtype)
-            m = m.astype(float64)
-        rows = m.shape[0]
-        cols = m.shape[1]
-        ba = bytearray(9 + 8 * rows * cols)
-        ba[0] = DENSE_MATRIX_MAGIC
-        lengths = ndarray(shape=[3], buffer=ba, offset=1, dtype=int32)
-        lengths[0] = rows
-        lengths[1] = cols
-        _copyto(m, buffer=ba, offset=9, shape=[rows, cols], dtype=float64)
-        return ba
-    else:
-        raise TypeError("_serialize_double_matrix called on a "
-                        "non-double-matrix")
-
-
-def _deserialize_double_matrix(ba):
-    """Deserialize a double matrix from a mutually understood format."""
-    if type(ba) != bytearray:
-        raise TypeError("_deserialize_double_matrix called on a %s; "
-                        "wanted bytearray" % type(ba))
-    if len(ba) < 9:
-        raise TypeError("_deserialize_double_matrix called on a %d-byte array, 
"
-                        "which is too short" % len(ba))
-    if ba[0] != DENSE_MATRIX_MAGIC:
-        raise TypeError("_deserialize_double_matrix called on bytearray "
-                        "with wrong magic")
-    lengths = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
-    rows = lengths[0]
-    cols = lengths[1]
-    if (len(ba) != 8 * rows * cols + 9):
-        raise TypeError("_deserialize_double_matrix called on bytearray "
-                        "with wrong length")
-    return _deserialize_numpy_array([rows, cols], ba, 9)
-
-
-def _serialize_labeled_point(p):
-    """
-    Serialize a LabeledPoint with a features vector of any type.
-
-    >>> from pyspark.mllib.regression import LabeledPoint
-    >>> dp0 = LabeledPoint(0.5, array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0]))
-    >>> dp1 = _deserialize_labeled_point(_serialize_labeled_point(dp0))
-    >>> dp1.label == dp0.label
-    True
-    >>> array_equal(dp1.features, dp0.features)
-    True
-    >>> sp0 = LabeledPoint(0.0, SparseVector(4, [1, 3], [3.0, 5.5]))
-    >>> sp1 = _deserialize_labeled_point(_serialize_labeled_point(sp0))
-    >>> sp1.label == sp1.label
-    True
-    >>> sp1.features == sp0.features
-    True
-    """
-    from pyspark.mllib.regression import LabeledPoint
-    serialized_features = _serialize_double_vector(p.features)
-    header = bytearray(9)
-    header[0] = LABELED_POINT_MAGIC
-    header_float = ndarray(shape=[1], buffer=header, offset=1, dtype=float64)
-    header_float[0] = p.label
-    return header + serialized_features
-
-
-def _deserialize_labeled_point(ba, offset=0):
-    """Deserialize a LabeledPoint from a mutually understood format."""
-    from pyspark.mllib.regression import LabeledPoint
-    if type(ba) != bytearray:
-        raise TypeError("Expecting a bytearray but got %s" % type(ba))
-    if ba[offset] != LABELED_POINT_MAGIC:
-        raise TypeError("Expecting magic number %d but got %d" % 
(LABELED_POINT_MAGIC, ba[0]))
-    label = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=float64)[0]
-    features = _deserialize_double_vector(ba, offset + 9)
-    return LabeledPoint(label, features)
-
-
-def _copyto(array, buffer, offset, shape, dtype):
-    """
-    Copy the contents of a vector to a destination bytearray at the
-    given offset.
-
-    TODO: In the future this could use numpy.copyto on NumPy 1.7+, but
-    we should benchmark that to see whether it provides a benefit.
-    """
-    temp_array = ndarray(shape=shape, buffer=buffer, offset=offset, dtype=dtype, order='C')
-    temp_array[...] = array
-
-
-def _get_unmangled_rdd(data, serializer, cache=True):
-    """
-    :param cache:  If True, the serialized RDD is cached.  (default = True)
-                   WARNING: Users should unpersist() this later!
-    """
-    dataBytes = data.map(serializer)
-    dataBytes._bypass_serializer = True
-    if cache:
-        dataBytes.cache()
-    return dataBytes
-
-
-def _get_unmangled_double_vector_rdd(data, cache=True):
-    """
-    Map a pickled Python RDD of Python dense or sparse vectors to a Java RDD of
-    _serialized_double_vectors.
-    :param cache:  If True, the serialized RDD is cached.  (default = True)
-                   WARNING: Users should unpersist() this later!
-    """
-    return _get_unmangled_rdd(data, _serialize_double_vector, cache)
-
-
-def _get_unmangled_labeled_point_rdd(data, cache=True):
-    """
-    Map a pickled Python RDD of LabeledPoint to a Java RDD of _serialized_labeled_points.
-    :param cache:  If True, the serialized RDD is cached.  (default = True)
-                   WARNING: Users should unpersist() this later!
-    """
-    return _get_unmangled_rdd(data, _serialize_labeled_point, cache)
-
-
-# Common functions for dealing with and training linear models
-
-def _linear_predictor_typecheck(x, coeffs):
-    """
-    Check that x is a one-dimensional vector of the right shape.
-    This is a temporary hackaround until we actually implement bulk predict.
-    """
-    x = _convert_vector(x)
-    if type(x) == ndarray:
-        if x.ndim == 1:
-            if x.shape != coeffs.shape:
-                raise RuntimeError("Got array of %d elements; wanted %d" % (
-                    numpy.shape(x)[0], coeffs.shape[0]))
-        else:
-            raise RuntimeError("Bulk predict not yet supported.")
-    elif type(x) == SparseVector:
-        if x.size != coeffs.shape[0]:
-            raise RuntimeError("Got sparse vector of size %d; wanted %d" % (
-                x.size, coeffs.shape[0]))
-    elif isinstance(x, RDD):
-        raise RuntimeError("Bulk predict not yet supported.")
-    else:
-        raise TypeError("Argument of type " + type(x).__name__ + " 
unsupported")
-
-
-# If we weren't given initial weights, take a zero vector of the appropriate
-# length.
-def _get_initial_weights(initial_weights, data):
-    if initial_weights is None:
-        initial_weights = _convert_vector(data.first().features)
-        if type(initial_weights) == ndarray:
-            if initial_weights.ndim != 1:
-                raise TypeError("At least one data element has "
-                                + initial_weights.ndim + " dimensions, which is not 1")
-            initial_weights = numpy.zeros([initial_weights.shape[0]])
-        elif type(initial_weights) == SparseVector:
-            initial_weights = numpy.zeros([initial_weights.size])
-    return initial_weights
-
-
-# train_func should take two parameters, namely data and initial_weights, and
-# return the result of a call to the appropriate JVM stub.
-# _regression_train_wrapper is responsible for setup and error checking.
-def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
-    initial_weights = _get_initial_weights(initial_weights, data)
-    dataBytes = _get_unmangled_labeled_point_rdd(data)
-    ans = train_func(dataBytes, _serialize_double_vector(initial_weights))
-    if len(ans) != 2:
-        raise RuntimeError("JVM call result had unexpected length")
-    elif type(ans[0]) != bytearray:
-        raise RuntimeError("JVM call result had first element of type "
-                           + type(ans[0]).__name__ + " which is not bytearray")
-    elif type(ans[1]) != float:
-        raise RuntimeError("JVM call result had second element of type "
-                           + type(ans[0]).__name__ + " which is not float")
-    return klass(_deserialize_double_vector(ans[0]), ans[1])
-
-
-# Functions for serializing ALS Rating objects and tuples
-
-def _serialize_rating(r):
-    ba = bytearray(16)
-    intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
-    doublepart = ndarray(shape=[1], buffer=ba, dtype=float64, offset=8)
-    intpart[0], intpart[1], doublepart[0] = r
-    return ba
-
-
-class RatingDeserializer(FramedSerializer):
-
-    def loads(self, string):
-        res = ndarray(shape=(3, ), buffer=string, dtype=float64, offset=4)
-        return int(res[0]), int(res[1]), res[2]
-
-    def load_stream(self, stream):
-        while True:
-            try:
-                yield self._read_with_length(stream)
-            except struct.error:
-                return
-            except EOFError:
-                return
-
-
-def _serialize_tuple(t):
-    ba = bytearray(8)
-    intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
-    intpart[0], intpart[1] = t
-    return ba
-
-
-# Vector math functions that support all of our vector types
-
-def _convert_vector(vec):
-    """
-    Convert a vector to a format we support internally. This does
-    the following:
-
-    * For dense NumPy vectors (ndarray), returns them as is
-    * For our SparseVector class, returns that as is
-    * For Python lists, converts them to NumPy vectors
-    * For scipy.sparse.*_matrix column vectors, converts them to
-      our own SparseVector type.
-
-    This should be called before passing any data to our algorithms
-    or attempting to serialize it to Java.
-    """
-    if type(vec) == ndarray or type(vec) == SparseVector:
-        return vec
-    elif type(vec) == list:
-        return array(vec, dtype=float64)
-    elif _have_scipy:
-        if _scipy_issparse(vec):
-            assert vec.shape[1] == 1, "Expected column vector"
-            csc = vec.tocsc()
-            return SparseVector(vec.shape[0], csc.indices, csc.data)
-    raise TypeError("Expected NumPy array, SparseVector, or scipy.sparse 
matrix")
-
-
-def _squared_distance(v1, v2):
-    """
-    Squared distance of two NumPy or sparse vectors.
-
-    >>> dense1 = array([1., 2.])
-    >>> sparse1 = SparseVector(2, [0, 1], [1., 2.])
-    >>> dense2 = array([2., 1.])
-    >>> sparse2 = SparseVector(2, [0, 1], [2., 1.])
-    >>> _squared_distance(dense1, dense2)
-    2.0
-    >>> _squared_distance(dense1, sparse2)
-    2.0
-    >>> _squared_distance(sparse1, dense2)
-    2.0
-    >>> _squared_distance(sparse1, sparse2)
-    2.0
-    """
-    v1 = _convert_vector(v1)
-    v2 = _convert_vector(v2)
-    if type(v1) == ndarray and type(v2) == ndarray:
-        diff = v1 - v2
-        return numpy.dot(diff, diff)
-    elif type(v1) == ndarray:
-        return v2.squared_distance(v1)
-    else:
-        return v1.squared_distance(v2)
-
-
-def _dot(vec, target):
-    """
-    Compute the dot product of a vector of the types we support
-    (Numpy array, list, SparseVector, or SciPy sparse) and a target
-    NumPy array that is either 1- or 2-dimensional. Equivalent to
-    calling numpy.dot of the two vectors, but for SciPy ones, we
-    have to transpose them because they're column vectors.
-    """
-    if type(vec) == ndarray:
-        return numpy.dot(vec, target)
-    elif type(vec) == SparseVector:
-        return vec.dot(target)
-    elif type(vec) == list:
-        return numpy.dot(_convert_vector(vec), target)
-    else:
-        return vec.transpose().dot(target)[0]
-
-
-def _test():
-    import doctest
-    globs = globals().copy()
-    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
-    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
-    globs['sc'].stop()
-    if failure_count:
-        exit(-1)
-
-
-if __name__ == "__main__":
-    _test()
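
The block of _common.py removed above is the manual framing this patch
retires: every value carried a magic byte plus int32/float64 headers, and
each new API needed its own _serialize_*/_deserialize_* pair. The pickle
protocol only asks each class to describe how to rebuild itself, which
Pyrolite mirrors on the JVM side. A minimal, illustrative sketch of that
hook (the Rating class below is a stand-in, not the patch's code):

    import pickle

    class Rating(object):
        """Toy value class standing in for MLlib's Rating tuple."""
        def __init__(self, user, product, rating):
            self.user, self.product, self.rating = user, product, rating

        def __reduce__(self):
            # pickle rebuilds the object from (callable, args); a JVM-side
            # unpickler registered through Pyrolite can follow the same recipe.
            return Rating, (self.user, self.product, self.rating)

    r = pickle.loads(pickle.dumps(Rating(1, 2, 5.0)))
    assert (r.user, r.product, r.rating) == (1, 2, 5.0)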

http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/python/pyspark/mllib/classification.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 71ab46b..ac142fb 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -15,19 +15,14 @@
 # limitations under the License.
 #
 
+from math import exp
+
 import numpy
+from numpy import array
 
-from numpy import array, shape
-from pyspark import SparkContext
-from pyspark.mllib._common import \
-    _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
-    _serialize_double_matrix, _deserialize_double_matrix, \
-    _serialize_double_vector, _deserialize_double_vector, \
-    _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
-    _linear_predictor_typecheck, _get_unmangled_labeled_point_rdd
-from pyspark.mllib.linalg import SparseVector
-from pyspark.mllib.regression import LabeledPoint, LinearModel
-from math import exp, log
+from pyspark import SparkContext, PickleSerializer
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper
 
 
 __all__ = ['LogisticRegressionModel', 'LogisticRegressionWithSGD', 'SVMModel',
@@ -67,8 +62,7 @@ class LogisticRegressionModel(LinearModel):
     """
 
     def predict(self, x):
-        _linear_predictor_typecheck(x, self._coeff)
-        margin = _dot(x, self._coeff) + self._intercept
+        margin = self.weights.dot(x) + self._intercept
         if margin > 0:
             prob = 1 / (1 + exp(-margin))
         else:
@@ -81,7 +75,7 @@ class LogisticRegressionWithSGD(object):
 
     @classmethod
     def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
-              initialWeights=None, regParam=1.0, regType=None, intercept=False):
+              initialWeights=None, regParam=1.0, regType="none", intercept=False):
         """
         Train a logistic regression model on the given data.
 
@@ -106,11 +100,12 @@ class LogisticRegressionWithSGD(object):
                                   are activated or not).
         """
         sc = data.context
-        if regType is None:
-            regType = "none"
-        train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(
-            d._jrdd, iterations, step, miniBatchFraction, i, regParam, regType, intercept)
-        return _regression_train_wrapper(sc, train_func, LogisticRegressionModel, data,
+
+        def train(jdata, i):
+            return sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(
+                jdata, iterations, step, miniBatchFraction, i, regParam, regType, intercept)
+
+        return _regression_train_wrapper(sc, train, LogisticRegressionModel, data,
                                          initialWeights)
 
 
@@ -141,8 +136,7 @@ class SVMModel(LinearModel):
     """
 
     def predict(self, x):
-        _linear_predictor_typecheck(x, self._coeff)
-        margin = _dot(x, self._coeff) + self._intercept
+        margin = self.weights.dot(x) + self.intercept
         return 1 if margin >= 0 else 0
 
 
@@ -150,7 +144,7 @@ class SVMWithSGD(object):
 
     @classmethod
     def train(cls, data, iterations=100, step=1.0, regParam=1.0,
-              miniBatchFraction=1.0, initialWeights=None, regType=None, intercept=False):
+              miniBatchFraction=1.0, initialWeights=None, regType="none", intercept=False):
         """
         Train a support vector machine on the given data.
 
@@ -175,11 +169,12 @@ class SVMWithSGD(object):
                                   are activated or not).
         """
         sc = data.context
-        if regType is None:
-            regType = "none"
-        train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(
-            d._jrdd, iterations, step, regParam, miniBatchFraction, i, regType, intercept)
-        return _regression_train_wrapper(sc, train_func, SVMModel, data, initialWeights)
+
+        def train(jrdd, i):
+            return sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(
+                jrdd, iterations, step, regParam, miniBatchFraction, i, regType, intercept)
+
+        return _regression_train_wrapper(sc, train, SVMModel, data, initialWeights)
 
 
 class NaiveBayesModel(object):
@@ -220,7 +215,8 @@ class NaiveBayesModel(object):
 
     def predict(self, x):
         """Return the most likely class for a data vector x"""
-        return self.labels[numpy.argmax(self.pi + _dot(x, self.theta.transpose()))]
+        x = _convert_to_vector(x)
+        return self.labels[numpy.argmax(self.pi + x.dot(self.theta.transpose()))]
 
 
 class NaiveBayes(object):
@@ -242,12 +238,9 @@ class NaiveBayes(object):
         @param lambda_: The smoothing parameter
         """
         sc = data.context
-        dataBytes = _get_unmangled_labeled_point_rdd(data)
-        ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
-        return NaiveBayesModel(
-            _deserialize_double_vector(ans[0]),
-            _deserialize_double_vector(ans[1]),
-            _deserialize_double_matrix(ans[2]))
+        jlist = sc._jvm.PythonMLLibAPI().trainNaiveBayes(data._to_java_object_rdd(), lambda_)
+        labels, pi, theta = PickleSerializer().loads(str(sc._jvm.SerDe.dumps(jlist)))
+        return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
 
 
 def _test():
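
After this change LogisticRegressionWithSGD, SVMWithSGD and NaiveBayes all
follow the same pattern: the Python RDD reaches the JVM as pickled objects
(via _to_java_object_rdd) and the result comes back through SerDe.dumps plus
PickleSerializer, with no hand-rolled byte format in between. A usage
sketch, assuming a live SparkContext named sc (not created here) and purely
illustrative data:

    from pyspark.mllib.classification import LogisticRegressionWithSGD, NaiveBayes
    from pyspark.mllib.regression import LabeledPoint

    points = [LabeledPoint(0.0, [0.0, 1.0]),
              LabeledPoint(1.0, [1.0, 0.0])]
    rdd = sc.parallelize(points)

    # Both trainers ship `rdd` to the JVM via pickle, so there is no longer a
    # _serialize_labeled_point step on the Python side.
    lr = LogisticRegressionWithSGD.train(rdd, iterations=10)
    nb = NaiveBayes.train(rdd, lambda_=1.0)
    print(lr.predict([1.0, 0.0]))
    print(nb.predict([1.0, 0.0]))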

http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/python/pyspark/mllib/clustering.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index f3e952a..12c5602 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -15,15 +15,9 @@
 # limitations under the License.
 #
 
-from numpy import array, dot
-from math import sqrt
 from pyspark import SparkContext
-from pyspark.mllib._common import \
-    _get_unmangled_rdd, _get_unmangled_double_vector_rdd, _squared_distance, \
-    _serialize_double_matrix, _deserialize_double_matrix, \
-    _serialize_double_vector, _deserialize_double_vector, \
-    _get_initial_weights, _serialize_rating, _regression_train_wrapper
-from pyspark.mllib.linalg import SparseVector
+from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector
 
 __all__ = ['KMeansModel', 'KMeans']
 
@@ -32,6 +26,7 @@ class KMeansModel(object):
 
     """A clustering model derived from the k-means method.
 
+    >>> from numpy import array
     >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
     >>> model = KMeans.train(
     ...     sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
@@ -71,8 +66,9 @@ class KMeansModel(object):
         """Find the cluster to which x belongs in this model."""
         best = 0
         best_distance = float("inf")
-        for i in range(0, len(self.centers)):
-            distance = _squared_distance(x, self.centers[i])
+        x = _convert_to_vector(x)
+        for i in xrange(len(self.centers)):
+            distance = x.squared_distance(self.centers[i])
             if distance < best_distance:
                 best = i
                 best_distance = distance
@@ -82,19 +78,17 @@ class KMeansModel(object):
 class KMeans(object):
 
     @classmethod
-    def train(cls, data, k, maxIterations=100, runs=1, initializationMode="k-means||"):
+    def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||"):
         """Train a k-means clustering model."""
-        sc = data.context
-        dataBytes = _get_unmangled_double_vector_rdd(data)
-        ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(
-            dataBytes._jrdd, k, maxIterations, runs, initializationMode)
-        if len(ans) != 1:
-            raise RuntimeError("JVM call result had unexpected length")
-        elif type(ans[0]) != bytearray:
-            raise RuntimeError("JVM call result had first element of type "
-                               + type(ans[0]) + " which is not bytearray")
-        matrix = _deserialize_double_matrix(ans[0])
-        return KMeansModel([row for row in matrix])
+        sc = rdd.context
+        ser = PickleSerializer()
+        # cache serialized data to avoid object overhead in the JVM
+        cached = rdd.map(_convert_to_vector)._reserialize(AutoBatchedSerializer(ser)).cache()
+        model = sc._jvm.PythonMLLibAPI().trainKMeansModel(
+            cached._to_java_object_rdd(), k, maxIterations, runs, initializationMode)
+        bytes = sc._jvm.SerDe.dumps(model.clusterCenters())
+        centers = ser.loads(str(bytes))
+        return KMeansModel([c.toArray() for c in centers])
 
 
 def _test():
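
KMeans.train now pickles the input vectors itself (AutoBatchedSerializer
picks the batch size), caches that serialized RDD to avoid per-object
overhead in the JVM, and unpickles the returned cluster centers. A usage
sketch, again assuming a live SparkContext named sc; the data mirrors the
doctest above:

    from numpy import array
    from pyspark.mllib.clustering import KMeans

    data = sc.parallelize(array([0.0, 0.0, 1.0, 1.0,
                                 9.0, 8.0, 8.0, 9.0]).reshape(4, 2))

    # The serialized, cached copy of the RDD lives inside train(); callers just
    # pass ordinary vectors and get plain NumPy cluster centers back.
    model = KMeans.train(data, 2, maxIterations=10, runs=30,
                         initializationMode="random")
    print(model.predict(array([0.0, 0.0])))  # index of the nearest center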

http://git-wip-us.apache.org/repos/asf/spark/blob/fce5e251/python/pyspark/mllib/linalg.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index e69051c..0a5dcaa 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -23,14 +23,148 @@ object from MLlib or pass SciPy C{scipy.sparse} column vectors if
 SciPy is available in their environment.
 """
 
-import numpy
-from numpy import array, array_equal, ndarray, float64, int32
+import sys
+import array
+import copy_reg
 
+import numpy as np
 
-__all__ = ['SparseVector', 'Vectors']
+__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors']
 
 
-class SparseVector(object):
+if sys.version_info[:2] == (2, 7):
+    # speed up pickling array in Python 2.7
+    def fast_pickle_array(ar):
+        return array.array, (ar.typecode, ar.tostring())
+    copy_reg.pickle(array.array, fast_pickle_array)
+
+
+# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
+# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices.
+
+try:
+    import scipy.sparse
+    _have_scipy = True
+except:
+    # No SciPy in environment, but that's okay
+    _have_scipy = False
+
+
+def _convert_to_vector(l):
+    if isinstance(l, Vector):
+        return l
+    elif type(l) in (array.array, np.array, np.ndarray, list, tuple):
+        return DenseVector(l)
+    elif _have_scipy and scipy.sparse.issparse(l):
+        assert l.shape[1] == 1, "Expected column vector"
+        csc = l.tocsc()
+        return SparseVector(l.shape[0], csc.indices, csc.data)
+    else:
+        raise TypeError("Cannot convert type %s into Vector" % type(l))
+
+
+class Vector(object):
+    """
+    Abstract class for DenseVector and SparseVector
+    """
+    def toArray(self):
+        """
+        Convert the vector into a numpy.ndarray
+        :return: numpy.ndarray
+        """
+        raise NotImplementedError
+
+
+class DenseVector(Vector):
+    def __init__(self, ar):
+        if not isinstance(ar, array.array):
+            ar = array.array('d', ar)
+        self.array = ar
+
+    def __reduce__(self):
+        return DenseVector, (self.array,)
+
+    def dot(self, other):
+        """
+        Compute the dot product of two Vectors. We support
+        (Numpy array, list, SparseVector, or SciPy sparse)
+        and a target NumPy array that is either 1- or 2-dimensional.
+        Equivalent to calling numpy.dot of the two vectors.
+
+        >>> dense = DenseVector(array.array('d', [1., 2.]))
+        >>> dense.dot(dense)
+        5.0
+        >>> dense.dot(SparseVector(2, [0, 1], [2., 1.]))
+        4.0
+        >>> dense.dot(range(1, 3))
+        5.0
+        >>> dense.dot(np.array(range(1, 3)))
+        5.0
+        """
+        if isinstance(other, SparseVector):
+            return other.dot(self)
+        elif _have_scipy and scipy.sparse.issparse(other):
+            return other.transpose().dot(self.toArray())[0]
+        elif isinstance(other, Vector):
+            return np.dot(self.toArray(), other.toArray())
+        else:
+            return np.dot(self.toArray(), other)
+
+    def squared_distance(self, other):
+        """
+        Squared distance of two Vectors.
+
+        >>> dense1 = DenseVector(array.array('d', [1., 2.]))
+        >>> dense1.squared_distance(dense1)
+        0.0
+        >>> dense2 = np.array([2., 1.])
+        >>> dense1.squared_distance(dense2)
+        2.0
+        >>> dense3 = [2., 1.]
+        >>> dense1.squared_distance(dense3)
+        2.0
+        >>> sparse1 = SparseVector(2, [0, 1], [2., 1.])
+        >>> dense1.squared_distance(sparse1)
+        2.0
+        """
+        if isinstance(other, SparseVector):
+            return other.squared_distance(self)
+        elif _have_scipy and scipy.sparse.issparse(other):
+            return _convert_to_vector(other).squared_distance(self)
+
+        if isinstance(other, Vector):
+            other = other.toArray()
+        elif not isinstance(other, np.ndarray):
+            other = np.array(other)
+        diff = self.toArray() - other
+        return np.dot(diff, diff)
+
+    def toArray(self):
+        return np.array(self.array)
+
+    def __getitem__(self, item):
+        return self.array[item]
+
+    def __len__(self):
+        return len(self.array)
+
+    def __str__(self):
+        return "[" + ",".join([str(v) for v in self.array]) + "]"
+
+    def __repr__(self):
+        return "DenseVector(%r)" % self.array
+
+    def __eq__(self, other):
+        return isinstance(other, DenseVector) and self.array == other.array
+
+    def __ne__(self, other):
+        return not self == other
+
+    def __getattr__(self, item):
+        return getattr(self.array, item)
+
+
+class SparseVector(Vector):
 
     """
     A simple sparse vector class for passing data to MLlib. Users may
@@ -61,16 +195,19 @@ class SparseVector(object):
             if type(pairs) == dict:
                 pairs = pairs.items()
             pairs = sorted(pairs)
-            self.indices = array([p[0] for p in pairs], dtype=int32)
-            self.values = array([p[1] for p in pairs], dtype=float64)
+            self.indices = array.array('i', [p[0] for p in pairs])
+            self.values = array.array('d', [p[1] for p in pairs])
         else:
             assert len(args[0]) == len(args[1]), "index and value arrays not same length"
-            self.indices = array(args[0], dtype=int32)
-            self.values = array(args[1], dtype=float64)
+            self.indices = array.array('i', args[0])
+            self.values = array.array('d', args[1])
             for i in xrange(len(self.indices) - 1):
                 if self.indices[i] >= self.indices[i + 1]:
                     raise TypeError("indices array must be sorted")
 
+    def __reduce__(self):
+        return (SparseVector, (self.size, self.indices, self.values))
+
     def dot(self, other):
         """
         Dot product with a SparseVector or 1- or 2-dimensional Numpy array.
@@ -78,15 +215,15 @@ class SparseVector(object):
         >>> a = SparseVector(4, [1, 3], [3.0, 4.0])
         >>> a.dot(a)
         25.0
-        >>> a.dot(array([1., 2., 3., 4.]))
+        >>> a.dot(array.array('d', [1., 2., 3., 4.]))
         22.0
         >>> b = SparseVector(4, [2, 4], [1.0, 2.0])
         >>> a.dot(b)
         0.0
-        >>> a.dot(array([[1, 1], [2, 2], [3, 3], [4, 4]]))
+        >>> a.dot(np.array([[1, 1], [2, 2], [3, 3], [4, 4]]))
         array([ 22.,  22.])
         """
-        if type(other) == ndarray:
+        if type(other) == np.ndarray:
             if other.ndim == 1:
                 result = 0.0
                 for i in xrange(len(self.indices)):
@@ -94,10 +231,17 @@ class SparseVector(object):
                 return result
             elif other.ndim == 2:
-                results = [self.dot(other[:, i]) for i in xrange(other.shape[1])]
-                return array(results)
+                return np.array(results)
             else:
                 raise Exception("Cannot call dot with %d-dimensional array" % 
other.ndim)
-        else:
+
+        elif type(other) in (array.array, DenseVector):
+            result = 0.0
+            for i in xrange(len(self.indices)):
+                result += self.values[i] * other[self.indices[i]]
+            return result
+
+        elif type(other) is SparseVector:
             result = 0.0
             i, j = 0, 0
             while i < len(self.indices) and j < len(other.indices):
@@ -110,6 +254,8 @@ class SparseVector(object):
                 else:
                     j += 1
             return result
+        else:
+            return self.dot(_convert_to_vector(other))
 
     def squared_distance(self, other):
         """
@@ -118,7 +264,9 @@ class SparseVector(object):
         >>> a = SparseVector(4, [1, 3], [3.0, 4.0])
         >>> a.squared_distance(a)
         0.0
-        >>> a.squared_distance(array([1., 2., 3., 4.]))
+        >>> a.squared_distance(array.array('d', [1., 2., 3., 4.]))
+        11.0
+        >>> a.squared_distance(np.array([1., 2., 3., 4.]))
         11.0
         >>> b = SparseVector(4, [2, 4], [1.0, 2.0])
         >>> a.squared_distance(b)
@@ -126,22 +274,22 @@ class SparseVector(object):
         >>> b.squared_distance(a)
         30.0
         """
-        if type(other) == ndarray:
-            if other.ndim == 1:
-                result = 0.0
-                j = 0   # index into our own array
-                for i in xrange(other.shape[0]):
-                    if j < len(self.indices) and self.indices[j] == i:
-                        diff = self.values[j] - other[i]
-                        result += diff * diff
-                        j += 1
-                    else:
-                        result += other[i] * other[i]
-                return result
-            else:
+        if type(other) in (list, array.array, DenseVector, np.array, np.ndarray):
+            if type(other) is np.array and other.ndim != 1:
                 raise Exception("Cannot call squared_distance with %d-dimensional array" %
                                 other.ndim)
-        else:
+            result = 0.0
+            j = 0   # index into our own array
+            for i in xrange(len(other)):
+                if j < len(self.indices) and self.indices[j] == i:
+                    diff = self.values[j] - other[i]
+                    result += diff * diff
+                    j += 1
+                else:
+                    result += other[i] * other[i]
+            return result
+
+        elif type(other) is SparseVector:
             result = 0.0
             i, j = 0, 0
             while i < len(self.indices) and j < len(other.indices):
@@ -163,16 +311,21 @@ class SparseVector(object):
                 result += other.values[j] * other.values[j]
                 j += 1
             return result
+        else:
+            return self.squared_distance(_convert_to_vector(other))
 
     def toArray(self):
         """
         Returns a copy of this SparseVector as a 1-dimensional NumPy array.
         """
-        arr = numpy.zeros(self.size)
+        arr = np.zeros((self.size,), dtype=np.float64)
         for i in xrange(self.indices.size):
             arr[self.indices[i]] = self.values[i]
         return arr
 
+    def __len__(self):
+        return self.size
+
     def __str__(self):
         inds = "[" + ",".join([str(i) for i in self.indices]) + "]"
         vals = "[" + ",".join([str(v) for v in self.values]) + "]"
@@ -198,8 +351,8 @@ class SparseVector(object):
 
         return (isinstance(other, self.__class__)
                 and other.size == self.size
-                and array_equal(other.indices, self.indices)
-                and array_equal(other.values, self.values))
+                and other.indices == self.indices
+                and other.values == self.values)
 
     def __ne__(self, other):
         return not self.__eq__(other)
@@ -242,9 +395,9 @@ class Vectors(object):
         returns a NumPy array.
 
         >>> Vectors.dense([1, 2, 3])
-        array([ 1.,  2.,  3.])
+        DenseVector(array('d', [1.0, 2.0, 3.0]))
         """
-        return array(elements, dtype=float64)
+        return DenseVector(elements)
 
     @staticmethod
     def stringify(vector):
@@ -257,10 +410,39 @@ class Vectors(object):
         >>> Vectors.stringify(Vectors.dense([0.0, 1.0]))
         '[0.0,1.0]'
         """
-        if type(vector) == SparseVector:
-            return str(vector)
-        else:
-            return "[" + ",".join([str(v) for v in vector]) + "]"
+        return str(vector)
+
+
+class Matrix(object):
+    """ the Matrix """
+    def __init__(self, nRow, nCol):
+        self.nRow = nRow
+        self.nCol = nCol
+
+    def toArray(self):
+        raise NotImplementedError
+
+
+class DenseMatrix(Matrix):
+    def __init__(self, nRow, nCol, values):
+        Matrix.__init__(self, nRow, nCol)
+        assert len(values) == nRow * nCol
+        self.values = values
+
+    def __reduce__(self):
+        return DenseMatrix, (self.nRow, self.nCol, self.values)
+
+    def toArray(self):
+        """
+        Return a numpy.ndarray
+
+        >>> arr = array.array('d', [float(i) for i in range(4)])
+        >>> m = DenseMatrix(2, 2, arr)
+        >>> m.toArray()
+        array([[ 0.,  1.],
+               [ 2.,  3.]])
+        """
+        return np.ndarray((self.nRow, self.nCol), np.float64, buffer=self.values.tostring())
 
 
 def _test():
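
The new linalg classes are the other half of the protocol: DenseVector and
SparseVector implement __reduce__, so the standard pickler (and therefore
Pyrolite and the JVM-side SerDe) can move them across the language boundary
without any custom byte layout. A small illustrative check that needs no
SparkContext, assuming a pyspark build containing this patch is on the
Python path:

    import pickle
    from pyspark.mllib.linalg import Vectors, SparseVector

    dense = Vectors.dense([1.0, 0.0, 3.0])        # now a DenseVector, not an ndarray
    sparse = SparseVector(3, [0, 2], [1.0, 3.0])

    print(dense.dot(sparse))                      # 10.0
    print(dense.squared_distance(sparse))         # 0.0

    # __reduce__ lets both classes round-trip through plain pickle, which is
    # exactly what the JVM-side SerDe relies on.
    assert pickle.loads(pickle.dumps(sparse)) == sparse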

