Repository: spark
Updated Branches:
  refs/heads/master 6235132a8 -> db2fb84b4


[SPARK-6227][MLLIB][PYSPARK] Implement PySpark wrappers for SVD and PCA (v2)

Add PCA and SVD to PySpark's wrappers for `RowMatrix` and `IndexedRowMatrix` (SVD only).

Based on #7963, updated.
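
For reference, a minimal sketch of the new Python API surface (method and
property names are taken from the patch below; the input data and app name are
illustrative only):

    from pyspark import SparkContext
    from pyspark.mllib.linalg.distributed import RowMatrix

    sc = SparkContext(appName="SVDAndPCASketch")
    mat = RowMatrix(sc.parallelize([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]))

    # SVD: computeSVD returns a SingularValueDecomposition wrapper with
    # U (distributed), s (local DenseVector) and V (local DenseMatrix).
    svd = mat.computeSVD(2, computeU=True)
    U, s, V = svd.U, svd.s, svd.V

    # PCA: computePrincipalComponents returns a local DenseMatrix whose
    # columns are the principal components; multiply projects the rows.
    pc = mat.computePrincipalComponents(1)
    projected = mat.multiply(pc)

    sc.stop()

`IndexedRowMatrix` gains the same `computeSVD` and `multiply` methods
(PCA is `RowMatrix`-only in this patch).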

## How was this patch tested?

New doc tests and unit tests. Ran all examples locally.

Author: MechCoder <manojkumarsivaraj...@gmail.com>
Author: Nick Pentreath <ni...@za.ibm.com>

Closes #17621 from MLnick/SPARK-6227-pyspark-svd-pca.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/db2fb84b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/db2fb84b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/db2fb84b

Branch: refs/heads/master
Commit: db2fb84b4a3c45daa449cc9232340193ce8eb37d
Parents: 6235132
Author: MechCoder <manojkumarsivaraj...@gmail.com>
Authored: Wed May 3 10:58:05 2017 +0200
Committer: Nick Pentreath <ni...@za.ibm.com>
Committed: Wed May 3 10:58:05 2017 +0200

----------------------------------------------------------------------
 docs/mllib-dimensionality-reduction.md          |  29 +--
 .../spark/examples/mllib/JavaPCAExample.java    |  27 ++-
 .../spark/examples/mllib/JavaSVDExample.java    |  27 +--
 .../main/python/mllib/pca_rowmatrix_example.py  |  46 +++++
 examples/src/main/python/mllib/svd_example.py   |  48 +++++
 .../examples/mllib/PCAOnRowMatrixExample.scala  |   4 +-
 .../spark/examples/mllib/SVDExample.scala       |  11 +-
 python/pyspark/mllib/linalg/distributed.py      | 199 ++++++++++++++++++-
 python/pyspark/mllib/tests.py                   |  63 ++++++
 9 files changed, 408 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/db2fb84b/docs/mllib-dimensionality-reduction.md
----------------------------------------------------------------------
diff --git a/docs/mllib-dimensionality-reduction.md b/docs/mllib-dimensionality-reduction.md
index 539cbc1..a72680d 100644
--- a/docs/mllib-dimensionality-reduction.md
+++ b/docs/mllib-dimensionality-reduction.md
@@ -76,13 +76,14 @@ Refer to the [`SingularValueDecomposition` Java docs](api/java/org/apache/spark/
 
 The same code applies to `IndexedRowMatrix` if `U` is defined as an
 `IndexedRowMatrix`.
+</div>
+<div data-lang="python" markdown="1">
+Refer to the [`SingularValueDecomposition` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.SingularValueDecomposition)
 for details on the API.
 
-In order to run the above application, follow the instructions
-provided in the [Self-Contained
-Applications](quick-start.html#self-contained-applications) section of the Spark
-quick-start guide. Be sure to also include *spark-mllib* to your build file as
-a dependency.
+{% include_example python/mllib/svd_example.py %}
 
+The same code applies to `IndexedRowMatrix` if `U` is defined as an
+`IndexedRowMatrix`.
 </div>
 </div>
 
@@ -118,17 +119,21 @@ Refer to the [`PCA` Scala docs](api/scala/index.html#org.apache.spark.mllib.feat
 
 The following code demonstrates how to compute principal components on a `RowMatrix`
 and use them to project the vectors into a low-dimensional space.
-The number of columns should be small, e.g, less than 1000.
 
 Refer to the [`RowMatrix` Java docs](api/java/org/apache/spark/mllib/linalg/distributed/RowMatrix.html) for details on the API.
 
 {% include_example java/org/apache/spark/examples/mllib/JavaPCAExample.java %}
 
 </div>
-</div>
 
-In order to run the above application, follow the instructions
-provided in the [Self-Contained Applications](quick-start.html#self-contained-applications)
-section of the Spark
-quick-start guide. Be sure to also include *spark-mllib* to your build file as
-a dependency.
+<div data-lang="python" markdown="1">
+
+The following code demonstrates how to compute principal components on a `RowMatrix`
+and use them to project the vectors into a low-dimensional space.
+
+Refer to the [`RowMatrix` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.RowMatrix) for details on the API.
+
+{% include_example python/mllib/pca_rowmatrix_example.py %}
+
+</div>
+</div>

http://git-wip-us.apache.org/repos/asf/spark/blob/db2fb84b/examples/src/main/java/org/apache/spark/examples/mllib/JavaPCAExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaPCAExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaPCAExample.java
index 3077f55..0a7dc62 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaPCAExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaPCAExample.java
@@ -18,7 +18,8 @@
 package org.apache.spark.examples.mllib;
 
 // $example on$
-import java.util.LinkedList;
+import java.util.Arrays;
+import java.util.List;
 // $example off$
 
 import org.apache.spark.SparkConf;
@@ -39,21 +40,25 @@ public class JavaPCAExample {
   public static void main(String[] args) {
     SparkConf conf = new SparkConf().setAppName("PCA Example");
     SparkContext sc = new SparkContext(conf);
+    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
 
     // $example on$
-    double[][] array = {{1.12, 2.05, 3.12}, {5.56, 6.28, 8.94}, {10.2, 8.0, 20.5}};
-    LinkedList<Vector> rowsList = new LinkedList<>();
-    for (int i = 0; i < array.length; i++) {
-      Vector currentRow = Vectors.dense(array[i]);
-      rowsList.add(currentRow);
-    }
-    JavaRDD<Vector> rows = JavaSparkContext.fromSparkContext(sc).parallelize(rowsList);
+    List<Vector> data = Arrays.asList(
+            Vectors.sparse(5, new int[] {1, 3}, new double[] {1.0, 7.0}),
+            Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
+            Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
+    );
+
+    JavaRDD<Vector> rows = jsc.parallelize(data);
 
     // Create a RowMatrix from JavaRDD<Vector>.
     RowMatrix mat = new RowMatrix(rows.rdd());
 
-    // Compute the top 3 principal components.
-    Matrix pc = mat.computePrincipalComponents(3);
+    // Compute the top 4 principal components.
+    // Principal components are stored in a local dense matrix.
+    Matrix pc = mat.computePrincipalComponents(4);
+
+    // Project the rows to the linear space spanned by the top 4 principal components.
     RowMatrix projected = mat.multiply(pc);
     // $example off$
     Vector[] collectPartitions = (Vector[])projected.rows().collect();
@@ -61,6 +66,6 @@ public class JavaPCAExample {
     for (Vector vector : collectPartitions) {
       System.out.println("\t" + vector);
     }
-    sc.stop();
+    jsc.stop();
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/db2fb84b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVDExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVDExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVDExample.java
index 3730e60..802be39 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVDExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVDExample.java
@@ -18,7 +18,8 @@
 package org.apache.spark.examples.mllib;
 
 // $example on$
-import java.util.LinkedList;
+import java.util.Arrays;
+import java.util.List;
 // $example off$
 
 import org.apache.spark.SparkConf;
@@ -43,22 +44,22 @@ public class JavaSVDExample {
     JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
 
     // $example on$
-    double[][] array = {{1.12, 2.05, 3.12}, {5.56, 6.28, 8.94}, {10.2, 8.0, 20.5}};
-    LinkedList<Vector> rowsList = new LinkedList<>();
-    for (int i = 0; i < array.length; i++) {
-      Vector currentRow = Vectors.dense(array[i]);
-      rowsList.add(currentRow);
-    }
-    JavaRDD<Vector> rows = jsc.parallelize(rowsList);
+    List<Vector> data = Arrays.asList(
+            Vectors.sparse(5, new int[] {1, 3}, new double[] {1.0, 7.0}),
+            Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
+            Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
+    );
+
+    JavaRDD<Vector> rows = jsc.parallelize(data);
 
     // Create a RowMatrix from JavaRDD<Vector>.
     RowMatrix mat = new RowMatrix(rows.rdd());
 
-    // Compute the top 3 singular values and corresponding singular vectors.
-    SingularValueDecomposition<RowMatrix, Matrix> svd = mat.computeSVD(3, true, 1.0E-9d);
-    RowMatrix U = svd.U();
-    Vector s = svd.s();
-    Matrix V = svd.V();
+    // Compute the top 5 singular values and corresponding singular vectors.
+    SingularValueDecomposition<RowMatrix, Matrix> svd = mat.computeSVD(5, true, 1.0E-9d);
+    RowMatrix U = svd.U();  // The U factor is a RowMatrix.
+    Vector s = svd.s();     // The singular values are stored in a local dense vector.
+    Matrix V = svd.V();     // The V factor is a local dense matrix.
     // $example off$
     Vector[] collectPartitions = (Vector[]) U.rows().collect();
     System.out.println("U factor is:");

http://git-wip-us.apache.org/repos/asf/spark/blob/db2fb84b/examples/src/main/python/mllib/pca_rowmatrix_example.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/pca_rowmatrix_example.py b/examples/src/main/python/mllib/pca_rowmatrix_example.py
new file mode 100644
index 0000000..49b9b1b
--- /dev/null
+++ b/examples/src/main/python/mllib/pca_rowmatrix_example.py
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark import SparkContext
+# $example on$
+from pyspark.mllib.linalg import Vectors
+from pyspark.mllib.linalg.distributed import RowMatrix
+# $example off$
+
+if __name__ == "__main__":
+    sc = SparkContext(appName="PythonPCAOnRowMatrixExample")
+
+    # $example on$
+    rows = sc.parallelize([
+        Vectors.sparse(5, {1: 1.0, 3: 7.0}),
+        Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
+        Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
+    ])
+
+    mat = RowMatrix(rows)
+    # Compute the top 4 principal components.
+    # Principal components are stored in a local dense matrix.
+    pc = mat.computePrincipalComponents(4)
+
+    # Project the rows to the linear space spanned by the top 4 principal components.
+    projected = mat.multiply(pc)
+    # $example off$
+    collected = projected.rows.collect()
+    print("Projected Row Matrix of principal component:")
+    for vector in collected:
+        print(vector)
+    sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/db2fb84b/examples/src/main/python/mllib/svd_example.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/svd_example.py b/examples/src/main/python/mllib/svd_example.py
new file mode 100644
index 0000000..5b220fd
--- /dev/null
+++ b/examples/src/main/python/mllib/svd_example.py
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark import SparkContext
+# $example on$
+from pyspark.mllib.linalg import Vectors
+from pyspark.mllib.linalg.distributed import RowMatrix
+# $example off$
+
+if __name__ == "__main__":
+    sc = SparkContext(appName="PythonSVDExample")
+
+    # $example on$
+    rows = sc.parallelize([
+        Vectors.sparse(5, {1: 1.0, 3: 7.0}),
+        Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
+        Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
+    ])
+
+    mat = RowMatrix(rows)
+
+    # Compute the top 5 singular values and corresponding singular vectors.
+    svd = mat.computeSVD(5, computeU=True)
+    U = svd.U       # The U factor is a RowMatrix.
+    s = svd.s       # The singular values are stored in a local dense vector.
+    V = svd.V       # The V factor is a local dense matrix.
+    # $example off$
+    collected = U.rows.collect()
+    print("U factor is:")
+    for vector in collected:
+        print(vector)
+    print("Singular values are: %s" % s)
+    print("V factor is:\n%s" % V)
+    sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/db2fb84b/examples/src/main/scala/org/apache/spark/examples/mllib/PCAOnRowMatrixExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PCAOnRowMatrixExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PCAOnRowMatrixExample.scala
index a137ba2..da43a8d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/PCAOnRowMatrixExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PCAOnRowMatrixExample.scala
@@ -39,9 +39,9 @@ object PCAOnRowMatrixExample {
       Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
       Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
 
-    val dataRDD = sc.parallelize(data, 2)
+    val rows = sc.parallelize(data)
 
-    val mat: RowMatrix = new RowMatrix(dataRDD)
+    val mat: RowMatrix = new RowMatrix(rows)
 
     // Compute the top 4 principal components.
     // Principal components are stored in a local dense matrix.

http://git-wip-us.apache.org/repos/asf/spark/blob/db2fb84b/examples/src/main/scala/org/apache/spark/examples/mllib/SVDExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SVDExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SVDExample.scala
index b286a3f..769ae2a 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/SVDExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SVDExample.scala
@@ -28,6 +28,9 @@ import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.linalg.distributed.RowMatrix
 // $example off$
 
+/**
+ * Example for SingularValueDecomposition.
+ */
 object SVDExample {
 
   def main(args: Array[String]): Unit = {
@@ -41,15 +44,15 @@ object SVDExample {
       Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
       Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
 
-    val dataRDD = sc.parallelize(data, 2)
+    val rows = sc.parallelize(data)
 
-    val mat: RowMatrix = new RowMatrix(dataRDD)
+    val mat: RowMatrix = new RowMatrix(rows)
 
     // Compute the top 5 singular values and corresponding singular vectors.
     val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(5, computeU = true)
     val U: RowMatrix = svd.U  // The U factor is a RowMatrix.
-    val s: Vector = svd.s  // The singular values are stored in a local dense vector.
-    val V: Matrix = svd.V  // The V factor is a local dense matrix.
+    val s: Vector = svd.s     // The singular values are stored in a local dense vector.
+    val V: Matrix = svd.V     // The V factor is a local dense matrix.
     // $example off$
     val collect = U.rows.collect()
     println("U factor is:")

http://git-wip-us.apache.org/repos/asf/spark/blob/db2fb84b/python/pyspark/mllib/linalg/distributed.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/linalg/distributed.py b/python/pyspark/mllib/linalg/distributed.py
index 600655c..4cb8025 100644
--- a/python/pyspark/mllib/linalg/distributed.py
+++ b/python/pyspark/mllib/linalg/distributed.py
@@ -28,14 +28,13 @@ from py4j.java_gateway import JavaObject
 
 from pyspark import RDD, since
 from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
-from pyspark.mllib.linalg import _convert_to_vector, Matrix, QRDecomposition
+from pyspark.mllib.linalg import _convert_to_vector, DenseMatrix, Matrix, QRDecomposition
 from pyspark.mllib.stat import MultivariateStatisticalSummary
 from pyspark.storagelevel import StorageLevel
 
 
-__all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow',
-           'IndexedRowMatrix', 'MatrixEntry', 'CoordinateMatrix',
-           'BlockMatrix']
+__all__ = ['BlockMatrix', 'CoordinateMatrix', 'DistributedMatrix', 'IndexedRow',
+           'IndexedRowMatrix', 'MatrixEntry', 'RowMatrix', 'SingularValueDecomposition']
 
 
 class DistributedMatrix(object):
@@ -301,6 +300,136 @@ class RowMatrix(DistributedMatrix):
         R = decomp.call("R")
         return QRDecomposition(Q, R)
 
+    @since('2.2.0')
+    def computeSVD(self, k, computeU=False, rCond=1e-9):
+        """
+        Computes the singular value decomposition of the RowMatrix.
+
+        The given row matrix A of dimension (m X n) is decomposed into
+        U * s * V^T, where
+
+        * U: (m X k) (left singular vectors) is a RowMatrix whose
+             columns are the eigenvectors of (A X A')
+        * s: DenseVector consisting of square root of the eigenvalues
+             (singular values) in descending order.
+        * V: (n X k) (right singular vectors) is a Matrix whose columns
+             are the eigenvectors of (A' X A)
+
+        For more specific details on the implementation, please refer
+        to the Scala documentation.
+
+        :param k: Number of leading singular values to keep (`0 < k <= n`).
+                  It might return fewer than k if there are numerically zero singular values
+                  or there are not enough Ritz values converged before the maximum number of
+                  Arnoldi update iterations is reached (in case that matrix A is ill-conditioned).
+        :param computeU: Whether or not to compute U. If set to
+                         True, then U is computed by A * V * s^-1.
+        :param rCond: Reciprocal condition number. All singular values
+                      smaller than rCond * s[0] are treated as zero
+                      where s[0] is the largest singular value.
+        :returns: :py:class:`SingularValueDecomposition`
+
+        >>> rows = sc.parallelize([[3, 1, 1], [-1, 3, 1]])
+        >>> rm = RowMatrix(rows)
+
+        >>> svd_model = rm.computeSVD(2, True)
+        >>> svd_model.U.rows.collect()
+        [DenseVector([-0.7071, 0.7071]), DenseVector([-0.7071, -0.7071])]
+        >>> svd_model.s
+        DenseVector([3.4641, 3.1623])
+        >>> svd_model.V
+        DenseMatrix(3, 2, [-0.4082, -0.8165, -0.4082, 0.8944, -0.4472, 0.0], 0)
+        """
+        j_model = self._java_matrix_wrapper.call(
+            "computeSVD", int(k), bool(computeU), float(rCond))
+        return SingularValueDecomposition(j_model)
+
+    @since('2.2.0')
+    def computePrincipalComponents(self, k):
+        """
+        Computes the k principal components of the given row matrix.
+
+        .. note:: This cannot be computed on matrices with more than 65535 columns.
+
+        :param k: Number of principal components to keep.
+        :returns: :py:class:`pyspark.mllib.linalg.DenseMatrix`
+
+        >>> rows = sc.parallelize([[1, 2, 3], [2, 4, 5], [3, 6, 1]])
+        >>> rm = RowMatrix(rows)
+
+        >>> # Returns the two principal components of rm
+        >>> pca = rm.computePrincipalComponents(2)
+        >>> pca
+        DenseMatrix(3, 2, [-0.349, -0.6981, 0.6252, -0.2796, -0.5592, -0.7805], 0)
+
+        >>> # Transform into new dimensions with the greatest variance.
+        >>> rm.multiply(pca).rows.collect() # doctest: +NORMALIZE_WHITESPACE
+        [DenseVector([0.1305, -3.7394]), DenseVector([-0.3642, -6.6983]), \
+        DenseVector([-4.6102, -4.9745])]
+        """
+        return self._java_matrix_wrapper.call("computePrincipalComponents", k)
+
+    @since('2.2.0')
+    def multiply(self, matrix):
+        """
+        Multiply this matrix by a local dense matrix on the right.
+
+        :param matrix: a local dense matrix whose number of rows must match the number of columns
+                       of this matrix
+        :returns: :py:class:`RowMatrix`
+
+        >>> rm = RowMatrix(sc.parallelize([[0, 1], [2, 3]]))
+        >>> rm.multiply(DenseMatrix(2, 2, [0, 2, 1, 3])).rows.collect()
+        [DenseVector([2.0, 3.0]), DenseVector([6.0, 11.0])]
+        """
+        if not isinstance(matrix, DenseMatrix):
+            raise ValueError("Only multiplication with DenseMatrix "
+                             "is supported.")
+        j_model = self._java_matrix_wrapper.call("multiply", matrix)
+        return RowMatrix(j_model)
+
+
+class SingularValueDecomposition(JavaModelWrapper):
+    """
+    Represents singular value decomposition (SVD) factors.
+
+    .. versionadded:: 2.2.0
+    """
+
+    @property
+    @since('2.2.0')
+    def U(self):
+        """
+        Returns a distributed matrix whose columns are the left singular
+        vectors of the SingularValueDecomposition if computeU was set to True.
+        """
+        u = self.call("U")
+        if u is not None:
+            mat_name = u.getClass().getSimpleName()
+            if mat_name == "RowMatrix":
+                return RowMatrix(u)
+            elif mat_name == "IndexedRowMatrix":
+                return IndexedRowMatrix(u)
+            else:
+                raise TypeError("Expected RowMatrix/IndexedRowMatrix got %s" % mat_name)
+
+    @property
+    @since('2.2.0')
+    def s(self):
+        """
+        Returns a DenseVector with singular values in descending order.
+        """
+        return self.call("s")
+
+    @property
+    @since('2.2.0')
+    def V(self):
+        """
+        Returns a DenseMatrix whose columns are the right singular
+        vectors of the SingularValueDecomposition.
+        """
+        return self.call("V")
+
 
 class IndexedRow(object):
     """
@@ -528,6 +657,68 @@ class IndexedRowMatrix(DistributedMatrix):
                                                            colsPerBlock)
         return BlockMatrix(java_block_matrix, rowsPerBlock, colsPerBlock)
 
+    @since('2.2.0')
+    def computeSVD(self, k, computeU=False, rCond=1e-9):
+        """
+        Computes the singular value decomposition of the IndexedRowMatrix.
+
+        The given row matrix A of dimension (m X n) is decomposed into
+        U * s * V^T, where
+
+        * U: (m X k) (left singular vectors) is an IndexedRowMatrix
+             whose columns are the eigenvectors of (A X A')
+        * s: DenseVector consisting of square root of the eigenvalues
+             (singular values) in descending order.
+        * V: (n X k) (right singular vectors) is a Matrix whose columns
+             are the eigenvectors of (A' X A)
+
+        For more specific details on the implementation, please refer
+        to the Scala documentation.
+
+        :param k: Number of leading singular values to keep (`0 < k <= n`).
+                  It might return fewer than k if there are numerically zero singular values
+                  or there are not enough Ritz values converged before the maximum number of
+                  Arnoldi update iterations is reached (in case that matrix A is ill-conditioned).
+        :param computeU: Whether or not to compute U. If set to
+                         True, then U is computed by A * V * s^-1.
+        :param rCond: Reciprocal condition number. All singular values
+                      smaller than rCond * s[0] are treated as zero
+                      where s[0] is the largest singular value.
+        :returns: :py:class:`SingularValueDecomposition`
+
+        >>> rows = [(0, (3, 1, 1)), (1, (-1, 3, 1))]
+        >>> irm = IndexedRowMatrix(sc.parallelize(rows))
+        >>> svd_model = irm.computeSVD(2, True)
+        >>> svd_model.U.rows.collect() # doctest: +NORMALIZE_WHITESPACE
+        [IndexedRow(0, [-0.707106781187,0.707106781187]),\
+        IndexedRow(1, [-0.707106781187,-0.707106781187])]
+        >>> svd_model.s
+        DenseVector([3.4641, 3.1623])
+        >>> svd_model.V
+        DenseMatrix(3, 2, [-0.4082, -0.8165, -0.4082, 0.8944, -0.4472, 0.0], 0)
+        """
+        j_model = self._java_matrix_wrapper.call(
+            "computeSVD", int(k), bool(computeU), float(rCond))
+        return SingularValueDecomposition(j_model)
+
+    @since('2.2.0')
+    def multiply(self, matrix):
+        """
+        Multiply this matrix by a local dense matrix on the right.
+
+        :param matrix: a local dense matrix whose number of rows must match the number of columns
+                       of this matrix
+        :returns: :py:class:`IndexedRowMatrix`
+
+        >>> mat = IndexedRowMatrix(sc.parallelize([(0, (0, 1)), (1, (2, 3))]))
+        >>> mat.multiply(DenseMatrix(2, 2, [0, 2, 1, 3])).rows.collect()
+        [IndexedRow(0, [2.0,3.0]), IndexedRow(1, [6.0,11.0])]
+        """
+        if not isinstance(matrix, DenseMatrix):
+            raise ValueError("Only multiplication with DenseMatrix "
+                             "is supported.")
+        return IndexedRowMatrix(self._java_matrix_wrapper.call("multiply", matrix))
+
 
 class MatrixEntry(object):
     """

http://git-wip-us.apache.org/repos/asf/spark/blob/db2fb84b/python/pyspark/mllib/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 523b3f1..1037bab 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -23,6 +23,7 @@ import os
 import sys
 import tempfile
 import array as pyarray
+from math import sqrt
 from time import time, sleep
 from shutil import rmtree
 
@@ -54,6 +55,7 @@ from pyspark.mllib.common import _to_java_object_rdd
 from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
 from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
     DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT
+from pyspark.mllib.linalg.distributed import RowMatrix
 from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
 from pyspark.mllib.recommendation import Rating
 from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD
@@ -1699,6 +1701,67 @@ class HashingTFTest(MLlibTestCase):
                                    ": expected " + str(expected[i]) + ", got " 
+ str(output[i]))
 
 
+class DimensionalityReductionTests(MLlibTestCase):
+
+    denseData = [
+        Vectors.dense([0.0, 1.0, 2.0]),
+        Vectors.dense([3.0, 4.0, 5.0]),
+        Vectors.dense([6.0, 7.0, 8.0]),
+        Vectors.dense([9.0, 0.0, 1.0])
+    ]
+    sparseData = [
+        Vectors.sparse(3, [(1, 1.0), (2, 2.0)]),
+        Vectors.sparse(3, [(0, 3.0), (1, 4.0), (2, 5.0)]),
+        Vectors.sparse(3, [(0, 6.0), (1, 7.0), (2, 8.0)]),
+        Vectors.sparse(3, [(0, 9.0), (2, 1.0)])
+    ]
+
+    def assertEqualUpToSign(self, vecA, vecB):
+        eq1 = vecA - vecB
+        eq2 = vecA + vecB
+        self.assertTrue(sum(abs(eq1)) < 1e-6 or sum(abs(eq2)) < 1e-6)
+
+    def test_svd(self):
+        denseMat = RowMatrix(self.sc.parallelize(self.denseData))
+        sparseMat = RowMatrix(self.sc.parallelize(self.sparseData))
+        m = 4
+        n = 3
+        for mat in [denseMat, sparseMat]:
+            for k in range(1, 4):
+                rm = mat.computeSVD(k, computeU=True)
+                self.assertEqual(rm.s.size, k)
+                self.assertEqual(rm.U.numRows(), m)
+                self.assertEqual(rm.U.numCols(), k)
+                self.assertEqual(rm.V.numRows, n)
+                self.assertEqual(rm.V.numCols, k)
+
+        # Test that U returned is None if computeU is set to False.
+        self.assertEqual(mat.computeSVD(1).U, None)
+
+        # Test that low rank matrices cannot have number of singular values
+        # greater than a limit.
+        rm = RowMatrix(self.sc.parallelize(tile([1, 2, 3], (3, 1))))
+        self.assertEqual(rm.computeSVD(3, False, 1e-6).s.size, 1)
+
+    def test_pca(self):
+        expected_pcs = array([
+            [0.0, 1.0, 0.0],
+            [sqrt(2.0) / 2.0, 0.0, sqrt(2.0) / 2.0],
+            [sqrt(2.0) / 2.0, 0.0, -sqrt(2.0) / 2.0]
+        ])
+        n = 3
+        denseMat = RowMatrix(self.sc.parallelize(self.denseData))
+        sparseMat = RowMatrix(self.sc.parallelize(self.sparseData))
+        for mat in [denseMat, sparseMat]:
+            for k in range(1, 4):
+                pcs = mat.computePrincipalComponents(k)
+                self.assertEqual(pcs.numRows, n)
+                self.assertEqual(pcs.numCols, k)
+
+                # We can just test the updated principal component for equality.
+                self.assertEqualUpToSign(pcs.toArray()[:, k - 1], expected_pcs[:, k - 1])
+
+
 if __name__ == "__main__":
     from pyspark.mllib.tests import *
     if not _have_scipy:

