Repository: spark
Updated Branches:
  refs/heads/master 327456b48 -> ce40efa20


[SPARK-25790][MLLIB] PCA: Support more than 65535 column matrix

## What changes were proposed in this pull request?
Spark PCA currently supports matrices with at most ~65,535 columns. This is because 
it first computes the covariance matrix and then derives the principal components 
from it, and computing the **covariance matrix** is the main bottleneck. The limit 
comes from the integer size limit: the upper-triangular covariance is passed to the 
Breeze library as an array of size n*(n+1)/2, and that size cannot exceed 
Int.MaxValue, so the largest column count we can support is 65,535.
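
For reference, here is a minimal standalone sketch (not part of the patch; the object and method names are illustrative) of where the 65,535 figure comes from:

```scala
// The upper-triangular covariance matrix is packed into an Int-indexed array
// of n * (n + 1) / 2 doubles, so that count must stay at or below Int.MaxValue.
object CovarianceSizeLimit {
  def main(args: Array[String]): Unit = {
    def packedSize(n: Long): Long = n * (n + 1) / 2
    println(Int.MaxValue)        // 2147483647
    println(packedSize(65535L))  // 2147450880 -> still fits in an Int-indexed array
    println(packedSize(65536L))  // 2147516416 -> exceeds Int.MaxValue
  }
}
```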

We currently have no such limitation for computing the SVD in Spark, so we can use 
Spark's SVD to compute the PCA when the number of columns exceeds this limit.

PCA can be computed directly from the SVD of the (mean-centered) matrix, without 
forming the covariance matrix; see the sketch after the references below.
The following papers/links are given for reference:

https://arxiv.org/pdf/1404.1100.pdf
https://en.wikipedia.org/wiki/Principal_component_analysis#Singular_value_decomposition
http://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-16-17/Web-Mining-Agents/PCA-SVD.pdf
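
As a rough illustration of the SVD route (a sketch written against the public MLlib API, independent of the patch; the helper name `pcaViaSvd` and the explicit mean-centering loop are only for exposition):

```scala
import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD

// PCA of a wide matrix via SVD of the mean-centered rows: the columns of V are
// the principal components, and s_i^2 / (m - 1) (m = number of rows) are the
// eigenvalues of the covariance matrix, which is never materialized, so the
// n * (n + 1) / 2 size limit does not apply.
def pcaViaSvd(rows: RDD[Vector], k: Int): (Matrix, Array[Double]) = {
  val mean = Statistics.colStats(rows).mean.toArray
  val centered = rows.map { v =>
    val arr = v.toArray.clone()
    var i = 0
    while (i < arr.length) { arr(i) -= mean(i); i += 1 }
    Vectors.dense(arr)
  }
  val mat = new RowMatrix(centered)
  val svd = mat.computeSVD(k)
  val m = mat.numRows()
  val eigenvalues = svd.s.toArray.map(s => s * s / (m - 1))
  val total = eigenvalues.sum
  (svd.V, eigenvalues.map(_ / total)) // principal components, explained-variance ratios
}
```

Since the explained-variance ratios are normalized by their sum, the constant 1/(m - 1) factor cancels out of the reported ratios.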

## How was this patch tested?
Added a unit test; also manually verified against the existing PCA tests by removing 
the limit condition in the `fit` method.

Closes #22784 from shahidki31/PCA.

Authored-by: Shahid <shahidk...@gmail.com>
Signed-off-by: Sean Owen <sean.o...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce40efa2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce40efa2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce40efa2

Branch: refs/heads/master
Commit: ce40efa200e6cb6f10a289e2ab00f711b0ebd379
Parents: 327456b
Author: Shahid <shahidk...@gmail.com>
Authored: Tue Oct 30 08:39:30 2018 -0500
Committer: Sean Owen <sean.o...@databricks.com>
Committed: Tue Oct 30 08:39:30 2018 -0500

----------------------------------------------------------------------
 .../org/apache/spark/mllib/feature/PCA.scala    | 20 +++++++++---
 .../mllib/linalg/distributed/RowMatrix.scala    | 33 +++++++++++++-------
 .../apache/spark/mllib/feature/PCASuite.scala   | 23 +++++++++++++-
 3 files changed, 58 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ce40efa2/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
index a01503f..2fc517c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
@@ -21,6 +21,7 @@ import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.mllib.stat.Statistics
 import org.apache.spark.rdd.RDD
 
 /**
@@ -44,12 +45,21 @@ class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) {
     require(k <= numFeatures,
       s"source vector size $numFeatures must be no less than k=$k")
 
-    require(PCAUtil.memoryCost(k, numFeatures) < Int.MaxValue,
-      "The param k and numFeatures is too large for SVD computation. " +
-      "Try reducing the parameter k for PCA, or reduce the input feature " +
-      "vector dimension to make this tractable.")
+    val mat = if (numFeatures > 65535) {
+      val meanVector = Statistics.colStats(sources).mean.asBreeze
+      val meanCentredRdd = sources.map { rowVector =>
+        Vectors.fromBreeze(rowVector.asBreeze - meanVector)
+      }
+      new RowMatrix(meanCentredRdd)
+    } else {
+      require(PCAUtil.memoryCost(k, numFeatures) < Int.MaxValue,
+        "The param k and numFeatures is too large for SVD computation. " +
+          "Try reducing the parameter k for PCA, or reduce the input feature " +
+          "vector dimension to make this tractable.")
+
+      new RowMatrix(sources)
+    }
 
-    val mat = new RowMatrix(sources)
     val (pc, explainedVariance) = mat.computePrincipalComponentsAndExplainedVariance(k)
     val densePC = pc match {
       case dm: DenseMatrix =>

http://git-wip-us.apache.org/repos/asf/spark/blob/ce40efa2/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 78a8810..82ab716 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -370,32 +370,41 @@ class RowMatrix @Since("1.0.0") (
    * Each column corresponds for one principal component,
    * and the columns are in descending order of component variance.
    * The row data do not need to be "centered" first; it is not necessary for
-   * the mean of each column to be 0.
+   * the mean of each column to be 0. But, if the number of columns are more than
+   * 65535, then the data need to be "centered".
    *
    * @param k number of top principal components.
   * @return a matrix of size n-by-k, whose columns are principal components, and
   * a vector of values which indicate how much variance each principal component
    * explains
-   *
-   * @note This cannot be computed on matrices with more than 65535 columns.
    */
   @Since("1.6.0")
   def computePrincipalComponentsAndExplainedVariance(k: Int): (Matrix, Vector) = {
     val n = numCols().toInt
     require(k > 0 && k <= n, s"k = $k out of range (0, n = $n]")
 
-    val Cov = computeCovariance().asBreeze.asInstanceOf[BDM[Double]]
+    if (n > 65535) {
+      val svd = computeSVD(k)
+      val s = svd.s.toArray.map(eigValue => eigValue * eigValue / (n - 1))
+      val eigenSum = s.sum
+      val explainedVariance = s.map(_ / eigenSum)
 
-    val brzSvd.SVD(u: BDM[Double], s: BDV[Double], _) = brzSvd(Cov)
+      (svd.V, Vectors.dense(explainedVariance))
+    } else {
 
-    val eigenSum = s.data.sum
-    val explainedVariance = s.data.map(_ / eigenSum)
+      val Cov = computeCovariance().asBreeze.asInstanceOf[BDM[Double]]
 
-    if (k == n) {
-      (Matrices.dense(n, k, u.data), Vectors.dense(explainedVariance))
-    } else {
-      (Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k)),
-        Vectors.dense(Arrays.copyOfRange(explainedVariance, 0, k)))
+      val brzSvd.SVD(u: BDM[Double], s: BDV[Double], _) = brzSvd(Cov)
+
+      val eigenSum = s.data.sum
+      val explainedVariance = s.data.map(_ / eigenSum)
+
+      if (k == n) {
+        (Matrices.dense(n, k, u.data), Vectors.dense(explainedVariance))
+      } else {
+        (Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k)),
+          Vectors.dense(Arrays.copyOfRange(explainedVariance, 0, k)))
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ce40efa2/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
index 8eab124..fe49162 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.mllib.feature
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.linalg.distributed.RowMatrix
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
@@ -54,4 +54,25 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext {
     // check overflowing
     assert(PCAUtil.memoryCost(40000, 60000) > Int.MaxValue)
   }
+
+  test("number of features more than 65535") {
+    val data1 = sc.parallelize(Array(
+      Vectors.dense((1 to 100000).map(_ => 2.0).to[scala.Vector].toArray),
+      Vectors.dense((1 to 100000).map(_ => 0.0).to[scala.Vector].toArray)
+    ), 2)
+
+    val pca = new PCA(2).fit(data1)
+    // Eigen values should not be negative
+    assert(pca.explainedVariance.values.forall(_ >= 0))
+    // Norm of the principal component should be 1.0
+    assert(Math.sqrt(pca.pc.values.slice(0, 100000)
+      .map(Math.pow(_, 2)).sum) ~== 1.0 relTol 1e-8)
+    // Leading explainedVariance is 1.0
+    assert(pca.explainedVariance(0) ~== 1.0 relTol 1e-12)
+
+    // Leading principal component is '1' vector
+    val firstValue = pca.pc.values(0)
+    pca.pc.values.slice(0, 100000).map(values =>
+      assert(values ~== firstValue relTol 1e-12))
+  }
 }

