Repository: spark
Updated Branches:
  refs/heads/master d341b17c2 -> 189df165b


[SPARK-1752][MLLIB] Standardize text format for vectors and labeled points

We should standardize the text format used to represent vectors and labeled 
points. The proposed formats are the following:

1. dense vector: `[v0,v1,..]`
2. sparse vector: `(size,[i0,i1],[v0,v1])`
3. labeled point: `(label,vector)`

where "(..)" indicates a tuple and "[...]" indicate an array. 
`loadLabeledPoints` is added to pyspark's `MLUtils`. I didn't add `loadVectors` 
to pyspark because `RDD.saveAsTextFile` cannot stringify dense vectors in the 
proposed format automatically.

`MLUtils#saveLabeledData` and `MLUtils#loadLabeledData` are deprecated. Users 
should use `RDD#saveAsTextFile` and `MLUtils#loadLabeledPoints` instead. In 
Scala, `MLUtils#loadLabeledPoints` is compatible with the format used by 
`MLUtils#loadLabeledData`.

CC: @mateiz, @srowen

Author: Xiangrui Meng <m...@databricks.com>

Closes #685 from mengxr/labeled-io and squashes the following commits:

2d1116a [Xiangrui Meng] make loadLabeledData/saveLabeledData deprecated since 
1.0.1
297be75 [Xiangrui Meng] change LabeledPoint.parse to LabeledPointParser.parse 
to maintain binary compatibility
d6b1473 [Xiangrui Meng] Merge branch 'master' into labeled-io
56746ea [Xiangrui Meng] replace # by .
623a5f0 [Xiangrui Meng] merge master
f06d5ba [Xiangrui Meng] add docs and minor updates
640fe0c [Xiangrui Meng] throw SparkException
5bcfbc4 [Xiangrui Meng] update test to add scientific notations
e86bf38 [Xiangrui Meng] remove NumericTokenizer
050fca4 [Xiangrui Meng] use StringTokenizer
6155b75 [Xiangrui Meng] merge master
f644438 [Xiangrui Meng] remove parse methods based on eval from pyspark
a41675a [Xiangrui Meng] python loadLabeledPoint uses Scala's implementation
ce9a475 [Xiangrui Meng] add deserialize_labeled_point to pyspark with tests
e9fcd49 [Xiangrui Meng] add serializeLabeledPoint and tests
aea4ae3 [Xiangrui Meng] minor updates
810d6df [Xiangrui Meng] update tokenizer/parser implementation
7aac03a [Xiangrui Meng] remove Scala parsers
c1885c1 [Xiangrui Meng] add headers and minor changes
b0c50cb [Xiangrui Meng] add customized parser
d731817 [Xiangrui Meng] style update
63dc396 [Xiangrui Meng] add loadLabeledPoints to pyspark
ea122b5 [Xiangrui Meng] Merge branch 'master' into labeled-io
cd6c78f [Xiangrui Meng] add __str__ and parse to LabeledPoint
a7a178e [Xiangrui Meng] add stringify to pyspark's Vectors
5c2dbfa [Xiangrui Meng] add parse to pyspark's Vectors
7853f88 [Xiangrui Meng] update pyspark's SparseVector.__str__
e761d32 [Xiangrui Meng] make LabelPoint.parse compatible with the dense format 
used before v1.0 and deprecate loadLabeledData and saveLabeledData
9e63a02 [Xiangrui Meng] add loadVectors and loadLabeledPoints
19aa523 [Xiangrui Meng] update toString and add parsers for Vectors and 
LabeledPoint


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/189df165
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/189df165
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/189df165

Branch: refs/heads/master
Commit: 189df165bb7cb8bc8ede48d0e7f8d8b5cd31d299
Parents: d341b17
Author: Xiangrui Meng <m...@databricks.com>
Authored: Wed Jun 4 12:56:56 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Wed Jun 4 12:56:56 2014 -0700

----------------------------------------------------------------------
 .../examples/mllib/DecisionTreeRunner.scala     |   2 +-
 .../spark/mllib/api/python/PythonMLLibAPI.scala |  33 ++++-
 .../org/apache/spark/mllib/linalg/Vectors.scala |  33 ++++-
 .../spark/mllib/regression/LabeledPoint.scala   |  31 ++++-
 .../spark/mllib/util/LinearDataGenerator.scala  |   3 +-
 .../util/LogisticRegressionDataGenerator.scala  |   3 +-
 .../org/apache/spark/mllib/util/MLUtils.scala   |  47 ++++++-
 .../apache/spark/mllib/util/NumericParser.scala | 121 +++++++++++++++++++
 .../spark/mllib/util/SVMDataGenerator.scala     |   2 +-
 .../mllib/api/python/PythonMLLibAPISuite.scala  |  60 +++++++++
 .../spark/mllib/linalg/VectorsSuite.scala       |  25 ++++
 .../mllib/regression/LabeledPointSuite.scala    |  39 ++++++
 .../apache/spark/mllib/util/MLUtilsSuite.scala  |  30 ++++-
 .../spark/mllib/util/NumericParserSuite.scala   |  42 +++++++
 python/pyspark/mllib/_common.py                 |  72 +++++++----
 python/pyspark/mllib/linalg.py                  |  34 ++++--
 python/pyspark/mllib/regression.py              |   5 +-
 python/pyspark/mllib/util.py                    |  69 ++++++++---
 18 files changed, 579 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/189df165/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
 
b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
index 9832bec..b3cc361 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
@@ -99,7 +99,7 @@ object DecisionTreeRunner {
     val sc = new SparkContext(conf)
 
     // Load training data and cache it.
-    val examples = MLUtils.loadLabeledData(sc, params.input).cache()
+    val examples = MLUtils.loadLabeledPoints(sc, params.input).cache()
 
     val splits = examples.randomSplit(Array(0.8, 0.2))
     val training = splits(0).cache()

http://git-wip-us.apache.org/repos/asf/spark/blob/189df165/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 7c65b0d..c441737 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -20,12 +20,13 @@ package org.apache.spark.mllib.api.python
 import java.nio.{ByteBuffer, ByteOrder}
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
 import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
 import org.apache.spark.mllib.recommendation._
 import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 
 /**
@@ -41,7 +42,7 @@ class PythonMLLibAPI extends Serializable {
   private val DENSE_MATRIX_MAGIC: Byte = 3
   private val LABELED_POINT_MAGIC: Byte = 4
 
-  private def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): 
Vector = {
+  private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int 
= 0): Vector = {
     require(bytes.length - offset >= 5, "Byte array too short")
     val magic = bytes(offset)
     if (magic == DENSE_VECTOR_MAGIC) {
@@ -116,7 +117,7 @@ class PythonMLLibAPI extends Serializable {
     bytes
   }
 
-  private def serializeDoubleVector(vector: Vector): Array[Byte] = vector 
match {
+  private[python] def serializeDoubleVector(vector: Vector): Array[Byte] = 
vector match {
     case s: SparseVector =>
       serializeSparseVector(s)
     case _ =>
@@ -167,7 +168,18 @@ class PythonMLLibAPI extends Serializable {
     bytes
   }
 
-  private def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
+  private[python] def serializeLabeledPoint(p: LabeledPoint): Array[Byte] = {
+    val fb = serializeDoubleVector(p.features)
+    val bytes = new Array[Byte](1 + 8 + fb.length)
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    bb.put(LABELED_POINT_MAGIC)
+    bb.putDouble(p.label)
+    bb.put(fb)
+    bytes
+  }
+
+  private[python] def deserializeLabeledPoint(bytes: Array[Byte]): 
LabeledPoint = {
     require(bytes.length >= 9, "Byte array too short")
     val magic = bytes(0)
     if (magic != LABELED_POINT_MAGIC) {
@@ -179,6 +191,19 @@ class PythonMLLibAPI extends Serializable {
     LabeledPoint(label, deserializeDoubleVector(bytes, 9))
   }
 
+  /**
+   * Loads and serializes labeled points saved with `RDD#saveAsTextFile`.
+   * @param jsc Java SparkContext
+   * @param path file or directory path in any Hadoop-supported file system URI
+   * @param minPartitions min number of partitions
+   * @return serialized labeled points stored in a JavaRDD of byte array
+   */
+  def loadLabeledPoints(
+      jsc: JavaSparkContext,
+      path: String,
+      minPartitions: Int): JavaRDD[Array[Byte]] =
+    MLUtils.loadLabeledPoints(jsc.sc, path, 
minPartitions).map(serializeLabeledPoint).toJavaRDD()
+
   private def trainRegressionModel(
       trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
       dataBytesJRDD: JavaRDD[Array[Byte]],

http://git-wip-us.apache.org/repos/asf/spark/blob/189df165/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 84d2239..c818a0b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -17,13 +17,16 @@
 
 package org.apache.spark.mllib.linalg
 
-import java.lang.{Iterable => JavaIterable, Integer => JavaInteger, Double => 
JavaDouble}
+import java.lang.{Double => JavaDouble, Integer => JavaInteger, Iterable => 
JavaIterable}
 import java.util.Arrays
 
 import scala.annotation.varargs
 import scala.collection.JavaConverters._
 
-import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV}
+import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
+
+import org.apache.spark.mllib.util.NumericParser
+import org.apache.spark.SparkException
 
 /**
  * Represents a numeric vector, whose index type is Int and value type is 
Double.
@@ -125,6 +128,25 @@ object Vectors {
   }
 
   /**
+   * Parses a string resulted from `Vector#toString` into
+   * an [[org.apache.spark.mllib.linalg.Vector]].
+   */
+  def parse(s: String): Vector = {
+    parseNumeric(NumericParser.parse(s))
+  }
+
+  private[mllib] def parseNumeric(any: Any): Vector = {
+    any match {
+      case values: Array[Double] =>
+        Vectors.dense(values)
+      case Seq(size: Double, indices: Array[Double], values: Array[Double]) =>
+        Vectors.sparse(size.toInt, indices.map(_.toInt), values)
+      case other =>
+       throw new SparkException(s"Cannot parse $other.")
+    }
+  }
+
+  /**
    * Creates a vector instance from a breeze vector.
    */
   private[mllib] def fromBreeze(breezeVector: BV[Double]): Vector = {
@@ -175,9 +197,10 @@ class SparseVector(
     val indices: Array[Int],
     val values: Array[Double]) extends Vector {
 
-  override def toString: String = {
-    "(" + size + "," + indices.zip(values).mkString("[", "," ,"]") + ")"
-  }
+  require(indices.length == values.length)
+
+  override def toString: String =
+    "(%s,%s,%s)".format(size, indices.mkString("[", ",", "]"), 
values.mkString("[", ",", "]"))
 
   override def toArray: Array[Double] = {
     val data = new Array[Double](size)

http://git-wip-us.apache.org/repos/asf/spark/blob/189df165/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
index 3deab1a..62a03af 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.mllib.regression
 
-import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
+import org.apache.spark.mllib.util.NumericParser
+import org.apache.spark.SparkException
 
 /**
  * Class that represents the features and labels of a data point.
@@ -27,6 +29,31 @@ import org.apache.spark.mllib.linalg.Vector
  */
 case class LabeledPoint(label: Double, features: Vector) {
   override def toString: String = {
-    "LabeledPoint(%s, %s)".format(label, features)
+    "(%s,%s)".format(label, features)
+  }
+}
+
+/**
+ * Parser for [[org.apache.spark.mllib.regression.LabeledPoint]].
+ */
+private[mllib] object LabeledPointParser {
+  /**
+   * Parses a string resulted from `LabeledPoint#toString` into
+   * an [[org.apache.spark.mllib.regression.LabeledPoint]].
+   */
+  def parse(s: String): LabeledPoint = {
+    if (s.startsWith("(")) {
+      NumericParser.parse(s) match {
+        case Seq(label: Double, numeric: Any) =>
+          LabeledPoint(label, Vectors.parseNumeric(numeric))
+        case other =>
+          throw new SparkException(s"Cannot parse $other.")
+      }
+    } else { // dense format used before v1.0
+      val parts = s.split(',')
+      val label = java.lang.Double.parseDouble(parts(0))
+      val features = Vectors.dense(parts(1).trim().split(' 
').map(java.lang.Double.parseDouble))
+      LabeledPoint(label, features)
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/189df165/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
index c8e160d..69299c2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
@@ -129,7 +129,8 @@ object LinearDataGenerator {
     val sc = new SparkContext(sparkMaster, "LinearDataGenerator")
     val data = generateLinearRDD(sc, nexamples, nfeatures, eps, nparts = parts)
 
-    MLUtils.saveLabeledData(data, outputPath)
+    data.saveAsTextFile(outputPath)
+
     sc.stop()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/189df165/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
index c82cd8f..9d80267 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
@@ -79,7 +79,8 @@ object LogisticRegressionDataGenerator {
     val sc = new SparkContext(sparkMaster, "LogisticRegressionDataGenerator")
     val data = generateLogisticRDD(sc, nexamples, nfeatures, eps, parts)
 
-    MLUtils.saveLabeledData(data, outputPath)
+    data.saveAsTextFile(outputPath)
+
     sc.stop()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/189df165/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index e598b6c..aaf92a1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -27,7 +27,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.PartitionwiseSampledRDD
 import org.apache.spark.util.random.BernoulliSampler
-import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint}
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.storage.StorageLevel
 
@@ -180,7 +180,39 @@ object MLUtils {
   }
 
   /**
-   * :: Experimental ::
+   * Loads vectors saved using `RDD[Vector].saveAsTextFile`.
+   * @param sc Spark context
+   * @param path file or directory path in any Hadoop-supported file system URI
+   * @param minPartitions min number of partitions
+   * @return vectors stored as an RDD[Vector]
+   */
+  def loadVectors(sc: SparkContext, path: String, minPartitions: Int): 
RDD[Vector] =
+    sc.textFile(path, minPartitions).map(Vectors.parse)
+
+  /**
+   * Loads vectors saved using `RDD[Vector].saveAsTextFile` with the default 
number of partitions.
+   */
+  def loadVectors(sc: SparkContext, path: String): RDD[Vector] =
+    sc.textFile(path, sc.defaultMinPartitions).map(Vectors.parse)
+
+  /**
+   * Loads labeled points saved using `RDD[LabeledPoint].saveAsTextFile`.
+   * @param sc Spark context
+   * @param path file or directory path in any Hadoop-supported file system URI
+   * @param minPartitions min number of partitions
+   * @return labeled points stored as an RDD[LabeledPoint]
+   */
+  def loadLabeledPoints(sc: SparkContext, path: String, minPartitions: Int): 
RDD[LabeledPoint] =
+    sc.textFile(path, minPartitions).map(LabeledPointParser.parse)
+
+  /**
+   * Loads labeled points saved using `RDD[LabeledPoint].saveAsTextFile` with 
the default number of
+   * partitions.
+   */
+  def loadLabeledPoints(sc: SparkContext, dir: String): RDD[LabeledPoint] =
+    loadLabeledPoints(sc, dir, sc.defaultMinPartitions)
+
+  /**
    * Load labeled data from a file. The data format used here is
    * <L>, <f1> <f2> ...
    * where <f1>, <f2> are feature values in Double and <L> is the 
corresponding label as Double.
@@ -189,8 +221,11 @@ object MLUtils {
    * @param dir Directory to the input data files.
    * @return An RDD of LabeledPoint. Each labeled point has two elements: the 
first element is
    *         the label, and the second element represents the feature values 
(an array of Double).
+   *
+   * @deprecated Should use [[org.apache.spark.rdd.RDD#saveAsTextFile]] for 
saving and
+   *            [[org.apache.spark.mllib.util.MLUtils#loadLabeledPoints]] for 
loading.
    */
-  @Experimental
+  @deprecated("Should use MLUtils.loadLabeledPoints instead.", "1.0.1")
   def loadLabeledData(sc: SparkContext, dir: String): RDD[LabeledPoint] = {
     sc.textFile(dir).map { line =>
       val parts = line.split(',')
@@ -201,15 +236,17 @@ object MLUtils {
   }
 
   /**
-   * :: Experimental ::
    * Save labeled data to a file. The data format used here is
    * <L>, <f1> <f2> ...
    * where <f1>, <f2> are feature values in Double and <L> is the 
corresponding label as Double.
    *
    * @param data An RDD of LabeledPoints containing data to be saved.
    * @param dir Directory to save the data.
+   *
+   * @deprecated Should use [[org.apache.spark.rdd.RDD#saveAsTextFile]] for 
saving and
+   *            [[org.apache.spark.mllib.util.MLUtils#loadLabeledPoints]] for 
loading.
    */
-  @Experimental
+  @deprecated("Should use RDD[LabeledPoint].saveAsTextFile instead.", "1.0.1")
   def saveLabeledData(data: RDD[LabeledPoint], dir: String) {
     val dataStr = data.map(x => x.label + "," + x.features.toArray.mkString(" 
"))
     dataStr.saveAsTextFile(dir)

http://git-wip-us.apache.org/repos/asf/spark/blob/189df165/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala
new file mode 100644
index 0000000..f7cba6c
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.util
+
+import java.util.StringTokenizer
+
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+
+import org.apache.spark.SparkException
+
+/**
+ * Simple parser for a numeric structure consisting of three types:
+ *
+ *  - number: a double in Java's floating number format
+ *  - array: an array of numbers stored as `[v0,v1,...,vn]`
+ *  - tuple: a list of numbers, arrays, or tuples stored as `(...)`
+ */
+private[mllib] object NumericParser {
+
+  /** Parses a string into a Double, an Array[Double], or a Seq[Any]. */
+  def parse(s: String): Any = {
+    val tokenizer = new StringTokenizer(s, "()[],", true)
+    if (tokenizer.hasMoreTokens()) {
+      val token = tokenizer.nextToken()
+      if (token == "(") {
+        parseTuple(tokenizer)
+      } else if (token == "[") {
+        parseArray(tokenizer)
+      } else {
+        // expecting a number
+        parseDouble(token)
+      }
+    } else {
+      throw new SparkException(s"Cannot find any token from the input string.")
+    }
+  }
+
+  private def parseArray(tokenizer: StringTokenizer): Array[Double] = {
+    val values = ArrayBuffer.empty[Double]
+    var parsing = true
+    var allowComma = false
+    var token: String = null
+    while (parsing && tokenizer.hasMoreTokens()) {
+      token = tokenizer.nextToken()
+      if (token == "]") {
+        parsing = false
+      } else if (token == ",") {
+        if (allowComma) {
+          allowComma = false
+        } else {
+          throw new SparkException("Found a ',' at a wrong position.")
+        }
+      } else {
+        // expecting a number
+        values.append(parseDouble(token))
+        allowComma = true
+      }
+    }
+    if (parsing) {
+      throw new SparkException(s"An array must end with ']'.")
+    }
+    values.toArray
+  }
+
+  private def parseTuple(tokenizer: StringTokenizer): Seq[_] = {
+    val items = ListBuffer.empty[Any]
+    var parsing = true
+    var allowComma = false
+    var token: String = null
+    while (parsing && tokenizer.hasMoreTokens()) {
+      token = tokenizer.nextToken()
+      if (token == "(") {
+        items.append(parseTuple(tokenizer))
+        allowComma = true
+      } else if (token == "[") {
+        items.append(parseArray(tokenizer))
+        allowComma = true
+      } else if (token == ",") {
+        if (allowComma) {
+          allowComma = false
+        } else {
+          throw new SparkException("Found a ',' at a wrong position.")
+        }
+      } else if (token == ")") {
+        parsing = false
+      } else {
+        // expecting a number
+        items.append(parseDouble(token))
+        allowComma = true
+      }
+    }
+    if (parsing) {
+      throw new SparkException(s"A tuple must end with ')'.")
+    }
+    items
+  }
+
+  private def parseDouble(s: String): Double = {
+    try {
+      java.lang.Double.parseDouble(s)
+    } catch {
+      case e: Throwable =>
+        throw new SparkException(s"Cannot parse a double from: $s", e)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/189df165/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
index ba8190b..7db97e6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
@@ -65,7 +65,7 @@ object SVMDataGenerator {
       LabeledPoint(y, Vectors.dense(x))
     }
 
-    MLUtils.saveLabeledData(data, outputPath)
+    data.saveAsTextFile(outputPath)
 
     sc.stop()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/189df165/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
 
b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
new file mode 100644
index 0000000..642843f
--- /dev/null
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.api.python
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.LabeledPoint
+
+class PythonMLLibAPISuite extends FunSuite {
+  val py = new PythonMLLibAPI
+
+  test("vector serialization") {
+    val vectors = Seq(
+      Vectors.dense(Array.empty[Double]),
+      Vectors.dense(0.0),
+      Vectors.dense(0.0, -2.0),
+      Vectors.sparse(0, Array.empty[Int], Array.empty[Double]),
+      Vectors.sparse(1, Array.empty[Int], Array.empty[Double]),
+      Vectors.sparse(2, Array(1), Array(-2.0)))
+    vectors.foreach { v =>
+      val bytes = py.serializeDoubleVector(v)
+      val u = py.deserializeDoubleVector(bytes)
+      assert(u.getClass === v.getClass)
+      assert(u === v)
+    }
+  }
+
+  test("labeled point serialization") {
+    val points = Seq(
+      LabeledPoint(0.0, Vectors.dense(Array.empty[Double])),
+      LabeledPoint(1.0, Vectors.dense(0.0)),
+      LabeledPoint(-0.5, Vectors.dense(0.0, -2.0)),
+      LabeledPoint(0.0, Vectors.sparse(0, Array.empty[Int], 
Array.empty[Double])),
+      LabeledPoint(1.0, Vectors.sparse(1, Array.empty[Int], 
Array.empty[Double])),
+      LabeledPoint(-0.5, Vectors.sparse(2, Array(1), Array(-2.0))))
+    points.foreach { p =>
+      val bytes = py.serializeLabeledPoint(p)
+      val q = py.deserializeLabeledPoint(bytes)
+      assert(q.label === p.label)
+      assert(q.features.getClass === p.features.getClass)
+      assert(q.features === p.features)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/189df165/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
index cfe8a27..7972cee 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.mllib.linalg
 
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkException
+
 class VectorsSuite extends FunSuite {
 
   val arr = Array(0.1, 0.0, 0.3, 0.4)
@@ -100,4 +102,27 @@ class VectorsSuite extends FunSuite {
     assert(vec2(6) === 4.0)
     assert(vec2(7) === 0.0)
   }
+
+  test("parse vectors") {
+    val vectors = Seq(
+      Vectors.dense(Array.empty[Double]),
+      Vectors.dense(1.0),
+      Vectors.dense(1.0E6, 0.0, -2.0e-7),
+      Vectors.sparse(0, Array.empty[Int], Array.empty[Double]),
+      Vectors.sparse(1, Array(0), Array(1.0)),
+      Vectors.sparse(3, Array(0, 2), Array(1.0, -2.0)))
+    vectors.foreach { v =>
+      val v1 = Vectors.parse(v.toString)
+      assert(v.getClass === v1.getClass)
+      assert(v === v1)
+    }
+
+    val malformatted = Seq("1", "[1,,]", "[1,2b]", "(1,[1,2])", 
"([1],[2.0,1.0])")
+    malformatted.foreach { s =>
+      intercept[SparkException] {
+        Vectors.parse(s)
+        println(s"Didn't detect malformatted string $s.")
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/189df165/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala
new file mode 100644
index 0000000..d9308aa
--- /dev/null
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.regression
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.linalg.Vectors
+
+class LabeledPointSuite extends FunSuite {
+
+  test("parse labeled points") {
+    val points = Seq(
+      LabeledPoint(1.0, Vectors.dense(1.0, 0.0)),
+      LabeledPoint(0.0, Vectors.sparse(2, Array(1), Array(-1.0))))
+    points.foreach { p =>
+      assert(p === LabeledPointParser.parse(p.toString))
+    }
+  }
+
+  test("parse labeled points with v0.9 format") {
+    val point = LabeledPointParser.parse("1.0,1.0 0.0 -2.0")
+    assert(point === LabeledPoint(1.0, Vectors.dense(1.0, 0.0, -2.0)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/189df165/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index 3d05fb6..c14870f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -160,5 +160,33 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext 
{
     }
   }
 
-}
+  test("loadVectors") {
+    val vectors = sc.parallelize(Seq(
+      Vectors.dense(1.0, 2.0),
+      Vectors.sparse(2, Array(1), Array(-1.0)),
+      Vectors.dense(0.0, 1.0)
+    ), 2)
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "vectors")
+    val path = outputDir.toURI.toString
+    vectors.saveAsTextFile(path)
+    val loaded = loadVectors(sc, path)
+    assert(vectors.collect().toSet === loaded.collect().toSet)
+    Utils.deleteRecursively(tempDir)
+  }
 
+  test("loadLabeledPoints") {
+    val points = sc.parallelize(Seq(
+      LabeledPoint(1.0, Vectors.dense(1.0, 2.0)),
+      LabeledPoint(0.0, Vectors.sparse(2, Array(1), Array(-1.0))),
+      LabeledPoint(1.0, Vectors.dense(0.0, 1.0))
+    ), 2)
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "points")
+    val path = outputDir.toURI.toString
+    points.saveAsTextFile(path)
+    val loaded = loadLabeledPoints(sc, path)
+    assert(points.collect().toSet === loaded.collect().toSet)
+    Utils.deleteRecursively(tempDir)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/189df165/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala
new file mode 100644
index 0000000..f68fb95
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.util
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkException
+
+class NumericParserSuite extends FunSuite {
+
+  test("parser") {
+    val s = "((1.0,2e3),-4,[5e-6,7.0E8],+9)"
+    val parsed = NumericParser.parse(s).asInstanceOf[Seq[_]]
+    assert(parsed(0).asInstanceOf[Seq[_]] === Seq(1.0, 2.0e3))
+    assert(parsed(1).asInstanceOf[Double] === -4.0)
+    assert(parsed(2).asInstanceOf[Array[Double]] === Array(5.0e-6, 7.0e8))
+    assert(parsed(3).asInstanceOf[Double] === 9.0)
+
+    val malformatted = Seq("a", "[1,,]", "0.123.4", "1 2", "3+4")
+    malformatted.foreach { s =>
+      intercept[SparkException] {
+        NumericParser.parse(s)
+        println(s"Didn't detect malformatted string $s.")
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/189df165/python/pyspark/mllib/_common.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index 802a27a..a411a5d 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -22,6 +22,7 @@ from pyspark import SparkContext, RDD
 from pyspark.mllib.linalg import SparseVector
 from pyspark.serializers import Serializer
 
+
 """
 Common utilities shared throughout MLlib, primarily for dealing with
 different data types. These include:
@@ -147,7 +148,7 @@ def _serialize_sparse_vector(v):
     return ba
 
 
-def _deserialize_double_vector(ba):
+def _deserialize_double_vector(ba, offset=0):
     """Deserialize a double vector from a mutually understood format.
 
     >>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
@@ -160,43 +161,46 @@ def _deserialize_double_vector(ba):
     if type(ba) != bytearray:
         raise TypeError("_deserialize_double_vector called on a %s; "
                         "wanted bytearray" % type(ba))
-    if len(ba) < 5:
+    nb = len(ba) - offset
+    if nb < 5:
         raise TypeError("_deserialize_double_vector called on a %d-byte array, 
"
-                        "which is too short" % len(ba))
-    if ba[0] == DENSE_VECTOR_MAGIC:
-        return _deserialize_dense_vector(ba)
-    elif ba[0] == SPARSE_VECTOR_MAGIC:
-        return _deserialize_sparse_vector(ba)
+                "which is too short" % nb)
+    if ba[offset] == DENSE_VECTOR_MAGIC:
+        return _deserialize_dense_vector(ba, offset)
+    elif ba[offset] == SPARSE_VECTOR_MAGIC:
+        return _deserialize_sparse_vector(ba, offset)
     else:
         raise TypeError("_deserialize_double_vector called on bytearray "
                         "with wrong magic")
 
 
-def _deserialize_dense_vector(ba):
+def _deserialize_dense_vector(ba, offset=0):
     """Deserialize a dense vector into a numpy array."""
-    if len(ba) < 5:
+    nb = len(ba) - offset
+    if nb < 5:
         raise TypeError("_deserialize_dense_vector called on a %d-byte array, "
-                        "which is too short" % len(ba))
-    length = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)[0]
-    if len(ba) != 8 * length + 5:
+                        "which is too short" % nb)
+    length = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=int32)[0]
+    if nb < 8 * length + 5:
         raise TypeError("_deserialize_dense_vector called on bytearray "
                         "with wrong length")
-    return _deserialize_numpy_array([length], ba, 5)
+    return _deserialize_numpy_array([length], ba, offset + 5)
 
 
-def _deserialize_sparse_vector(ba):
+def _deserialize_sparse_vector(ba, offset=0):
     """Deserialize a sparse vector into a MLlib SparseVector object."""
-    if len(ba) < 9:
+    nb = len(ba) - offset
+    if nb < 9:
         raise TypeError("_deserialize_sparse_vector called on a %d-byte array, 
"
-                        "which is too short" % len(ba))
-    header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
+                        "which is too short" % nb)
+    header = ndarray(shape=[2], buffer=ba, offset=offset + 1, dtype=int32)
     size = header[0]
     nonzeros = header[1]
-    if len(ba) != 9 + 12 * nonzeros:
+    if nb < 9 + 12 * nonzeros:
         raise TypeError("_deserialize_sparse_vector called on bytearray "
                         "with wrong length")
-    indices = _deserialize_numpy_array([nonzeros], ba, 9, dtype=int32)
-    values = _deserialize_numpy_array([nonzeros], ba, 9 + 4 * nonzeros, 
dtype=float64)
+    indices = _deserialize_numpy_array([nonzeros], ba, offset + 9, dtype=int32)
+    values = _deserialize_numpy_array([nonzeros], ba, offset + 9 + 4 * 
nonzeros, dtype=float64)
     return SparseVector(int(size), indices, values)
 
 
@@ -243,7 +247,23 @@ def _deserialize_double_matrix(ba):
 
 
 def _serialize_labeled_point(p):
-    """Serialize a LabeledPoint with a features vector of any type."""
+    """
+    Serialize a LabeledPoint with a features vector of any type.
+
+    >>> from pyspark.mllib.regression import LabeledPoint
+    >>> dp0 = LabeledPoint(0.5, array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0]))
+    >>> dp1 = _deserialize_labeled_point(_serialize_labeled_point(dp0))
+    >>> dp1.label == dp0.label
+    True
+    >>> array_equal(dp1.features, dp0.features)
+    True
+    >>> sp0 = LabeledPoint(0.0, SparseVector(4, [1, 3], [3.0, 5.5]))
+    >>> sp1 = _deserialize_labeled_point(_serialize_labeled_point(sp0))
+    >>> sp1.label == sp1.label
+    True
+    >>> sp1.features == sp0.features
+    True
+    """
     from pyspark.mllib.regression import LabeledPoint
     serialized_features = _serialize_double_vector(p.features)
     header = bytearray(9)
@@ -252,6 +272,16 @@ def _serialize_labeled_point(p):
     header_float[0] = p.label
     return header + serialized_features
 
+def _deserialize_labeled_point(ba, offset=0):
+    """Deserialize a LabeledPoint from a mutually understood format."""
+    from pyspark.mllib.regression import LabeledPoint
+    if type(ba) != bytearray:
+        raise TypeError("Expecting a bytearray but got %s" % type(ba))
+    if ba[offset] != LABELED_POINT_MAGIC:
+        raise TypeError("Expecting magic number %d but got %d" % 
(LABELED_POINT_MAGIC, ba[0]))
+    label = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=float64)[0]
+    features = _deserialize_double_vector(ba, offset + 9)
+    return LabeledPoint(label, features)
 
 def _copyto(array, buffer, offset, shape, dtype):
     """

http://git-wip-us.apache.org/repos/asf/spark/blob/189df165/python/pyspark/mllib/linalg.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 2766842..db39ed0 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -43,11 +43,11 @@ class SparseVector(object):
                or two sorted lists containing indices and values.
 
         >>> print SparseVector(4, {1: 1.0, 3: 5.5})
-        [1: 1.0, 3: 5.5]
+        (4,[1,3],[1.0,5.5])
         >>> print SparseVector(4, [(1, 1.0), (3, 5.5)])
-        [1: 1.0, 3: 5.5]
+        (4,[1,3],[1.0,5.5])
         >>> print SparseVector(4, [1, 3], [1.0, 5.5])
-        [1: 1.0, 3: 5.5]
+        (4,[1,3],[1.0,5.5])
         """
         self.size = int(size)
         assert 1 <= len(args) <= 2, "must pass either 2 or 3 arguments"
@@ -160,10 +160,9 @@ class SparseVector(object):
             return result
 
     def __str__(self):
-        inds = self.indices
-        vals = self.values
-        entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in 
xrange(len(inds))])
-        return "[" + entries + "]"
+        inds = "[" + ",".join([str(i) for i in self.indices]) + "]"
+        vals = "[" + ",".join([str(v) for v in self.values]) + "]"
+        return "(" + ",".join((str(self.size), inds, vals)) + ")"
 
     def __repr__(self):
         inds = self.indices
@@ -213,11 +212,11 @@ class Vectors(object):
                      or two sorted lists containing indices and values.
 
         >>> print Vectors.sparse(4, {1: 1.0, 3: 5.5})
-        [1: 1.0, 3: 5.5]
+        (4,[1,3],[1.0,5.5])
         >>> print Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
-        [1: 1.0, 3: 5.5]
+        (4,[1,3],[1.0,5.5])
         >>> print Vectors.sparse(4, [1, 3], [1.0, 5.5])
-        [1: 1.0, 3: 5.5]
+        (4,[1,3],[1.0,5.5])
         """
         return SparseVector(size, *args)
 
@@ -232,6 +231,21 @@ class Vectors(object):
         """
         return array(elements, dtype=float64)
 
+    @staticmethod
+    def stringify(vector):
+        """
+        Converts a vector into a string, which can be recognized by
+        Vectors.parse().
+
+        >>> Vectors.stringify(Vectors.sparse(2, [1], [1.0]))
+        '(2,[1],[1.0])'
+        >>> Vectors.stringify(Vectors.dense([0.0, 1.0]))
+        '[0.0,1.0]'
+        """
+        if type(vector) == SparseVector:
+            return str(vector)
+        else:
+            return "[" + ",".join([str(v) for v in vector]) + "]"
 
 def _test():
     import doctest

http://git-wip-us.apache.org/repos/asf/spark/blob/189df165/python/pyspark/mllib/regression.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/regression.py 
b/python/pyspark/mllib/regression.py
index bc7de6d..b84bc53 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -23,7 +23,7 @@ from pyspark.mllib._common import \
     _serialize_double_vector, _deserialize_double_vector, \
     _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
     _linear_predictor_typecheck, _have_scipy, _scipy_issparse
-from pyspark.mllib.linalg import SparseVector
+from pyspark.mllib.linalg import SparseVector, Vectors
 
 
 class LabeledPoint(object):
@@ -44,6 +44,9 @@ class LabeledPoint(object):
         else:
             raise TypeError("Expected NumPy array, list, SparseVector, or 
scipy.sparse matrix")
 
+    def __str__(self):
+        return "(" + ",".join((str(self.label), 
Vectors.stringify(self.features))) + ")"
+
 
 class LinearModel(object):
     """A linear model that has a vector of coefficients and an intercept."""

http://git-wip-us.apache.org/repos/asf/spark/blob/189df165/python/pyspark/mllib/util.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 0e5f452..e24c144 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -19,7 +19,10 @@ import numpy as np
 
 from pyspark.mllib.linalg import Vectors, SparseVector
 from pyspark.mllib.regression import LabeledPoint
-from pyspark.mllib._common import _convert_vector
+from pyspark.mllib._common import _convert_vector, _deserialize_labeled_point
+from pyspark.rdd import RDD
+from pyspark.serializers import NoOpSerializer
+
 
 
 class MLUtils:
@@ -105,24 +108,18 @@ class MLUtils:
         >>> examples = MLUtils.loadLibSVMFile(sc, tempFile.name).collect()
         >>> multiclass_examples = MLUtils.loadLibSVMFile(sc, tempFile.name, 
True).collect()
         >>> tempFile.close()
-        >>> examples[0].label
-        1.0
-        >>> examples[0].features.size
-        6
-        >>> print examples[0].features
-        [0: 1.0, 2: 2.0, 4: 3.0]
-        >>> examples[1].label
-        0.0
-        >>> examples[1].features.size
-        6
-        >>> print examples[1].features
-        []
-        >>> examples[2].label
-        0.0
-        >>> examples[2].features.size
-        6
-        >>> print examples[2].features
-        [1: 4.0, 3: 5.0, 5: 6.0]
+        >>> type(examples[0]) == LabeledPoint
+        True
+        >>> print examples[0]
+        (1.0,(6,[0,2,4],[1.0,2.0,3.0]))
+        >>> type(examples[1]) == LabeledPoint
+        True
+        >>> print examples[1]
+        (0.0,(6,[],[]))
+        >>> type(examples[2]) == LabeledPoint
+        True
+        >>> print examples[2]
+        (0.0,(6,[1,3,5],[4.0,5.0,6.0]))
         >>> multiclass_examples[1].label
         -1.0
         """
@@ -158,6 +155,40 @@ class MLUtils:
         lines.saveAsTextFile(dir)
 
 
+    @staticmethod
+    def loadLabeledPoints(sc, path, minPartitions=None):
+        """
+        Load labeled points saved using RDD.saveAsTextFile.
+
+        @param sc: Spark context
+        @param path: file or directory path in any Hadoop-supported file
+                     system URI
+        @param minPartitions: min number of partitions
+        @return: labeled data stored as an RDD of LabeledPoint
+
+        >>> from tempfile import NamedTemporaryFile
+        >>> from pyspark.mllib.util import MLUtils
+        >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 
4.56e-7)])), \
+                        LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
+        >>> tempFile = NamedTemporaryFile(delete=True)
+        >>> tempFile.close()
+        >>> sc.parallelize(examples, 1).saveAsTextFile(tempFile.name)
+        >>> loaded = MLUtils.loadLabeledPoints(sc, tempFile.name).collect()
+        >>> type(loaded[0]) == LabeledPoint
+        True
+        >>> print examples[0]
+        (1.1,(3,[0,2],[-1.23,4.56e-07]))
+        >>> type(examples[1]) == LabeledPoint
+        True
+        >>> print examples[1]
+        (0.0,[1.01,2.02,3.03])
+        """
+        minPartitions = minPartitions or min(sc.defaultParallelism, 2)
+        jSerialized = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, 
path, minPartitions)
+        serialized = RDD(jSerialized, sc, NoOpSerializer())
+        return serialized.map(lambda bytes: 
_deserialize_labeled_point(bytearray(bytes)))
+
+
 def _test():
     import doctest
     from pyspark.context import SparkContext

Reply via email to