Repository: spark Updated Branches: refs/heads/master 327456b48 -> ce40efa20
[SPARK-25790][MLLIB] PCA: Support more than 65535 column matrix ## What changes were proposed in this pull request? Spark PCA supports maximum only ~65,535 columns matrix. This is due to the fact that, it computes the Covariance matrix first, then compute principle components. The main bottle neck was computing **covariance matrix.** The limit 65,500 came due to the integer size limit. Because we are passing an array of size n*(n+1)/2 to the breeze library and the size cannot be more than INT_MAX. so, the maximum column size we can give is 65,500. Currently we don't have such limitation for computing SVD in spark. So, we can make use of Spark SVD to compute the PCA, if the number of columns exceeds the limit. Computation of PCA can be done directly using SVD of matrix, instead of finding the covariance matrix. Following are the papers/links for the reference. https://arxiv.org/pdf/1404.1100.pdf https://en.wikipedia.org/wiki/Principal_component_analysis#Singular_value_decomposition http://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-16-17/Web-Mining-Agents/PCA-SVD.pdf ## How was this patch tested? added UT, also manually verified with the existing test for pca, by removing the limit condition in the fit method. Closes #22784 from shahidki31/PCA. Authored-by: Shahid <shahidk...@gmail.com> Signed-off-by: Sean Owen <sean.o...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce40efa2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce40efa2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce40efa2 Branch: refs/heads/master Commit: ce40efa200e6cb6f10a289e2ab00f711b0ebd379 Parents: 327456b Author: Shahid <shahidk...@gmail.com> Authored: Tue Oct 30 08:39:30 2018 -0500 Committer: Sean Owen <sean.o...@databricks.com> Committed: Tue Oct 30 08:39:30 2018 -0500 ---------------------------------------------------------------------- .../org/apache/spark/mllib/feature/PCA.scala | 20 +++++++++--- .../mllib/linalg/distributed/RowMatrix.scala | 33 +++++++++++++------- .../apache/spark/mllib/feature/PCASuite.scala | 23 +++++++++++++- 3 files changed, 58 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ce40efa2/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala index a01503f..2fc517c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala @@ -21,6 +21,7 @@ import org.apache.spark.annotation.Since import org.apache.spark.api.java.JavaRDD import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.linalg.distributed.RowMatrix +import org.apache.spark.mllib.stat.Statistics import org.apache.spark.rdd.RDD /** @@ -44,12 +45,21 @@ class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) { require(k <= numFeatures, s"source vector size $numFeatures must be no less than k=$k") - require(PCAUtil.memoryCost(k, numFeatures) < Int.MaxValue, - "The param k and numFeatures is too large for SVD computation. " + - "Try reducing the parameter k for PCA, or reduce the input feature " + - "vector dimension to make this tractable.") + val mat = if (numFeatures > 65535) { + val meanVector = Statistics.colStats(sources).mean.asBreeze + val meanCentredRdd = sources.map { rowVector => + Vectors.fromBreeze(rowVector.asBreeze - meanVector) + } + new RowMatrix(meanCentredRdd) + } else { + require(PCAUtil.memoryCost(k, numFeatures) < Int.MaxValue, + "The param k and numFeatures is too large for SVD computation. " + + "Try reducing the parameter k for PCA, or reduce the input feature " + + "vector dimension to make this tractable.") + + new RowMatrix(sources) + } - val mat = new RowMatrix(sources) val (pc, explainedVariance) = mat.computePrincipalComponentsAndExplainedVariance(k) val densePC = pc match { case dm: DenseMatrix => http://git-wip-us.apache.org/repos/asf/spark/blob/ce40efa2/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 78a8810..82ab716 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -370,32 +370,41 @@ class RowMatrix @Since("1.0.0") ( * Each column corresponds for one principal component, * and the columns are in descending order of component variance. * The row data do not need to be "centered" first; it is not necessary for - * the mean of each column to be 0. + * the mean of each column to be 0. But, if the number of columns are more than + * 65535, then the data need to be "centered". * * @param k number of top principal components. * @return a matrix of size n-by-k, whose columns are principal components, and * a vector of values which indicate how much variance each principal component * explains - * - * @note This cannot be computed on matrices with more than 65535 columns. */ @Since("1.6.0") def computePrincipalComponentsAndExplainedVariance(k: Int): (Matrix, Vector) = { val n = numCols().toInt require(k > 0 && k <= n, s"k = $k out of range (0, n = $n]") - val Cov = computeCovariance().asBreeze.asInstanceOf[BDM[Double]] + if (n > 65535) { + val svd = computeSVD(k) + val s = svd.s.toArray.map(eigValue => eigValue * eigValue / (n - 1)) + val eigenSum = s.sum + val explainedVariance = s.map(_ / eigenSum) - val brzSvd.SVD(u: BDM[Double], s: BDV[Double], _) = brzSvd(Cov) + (svd.V, Vectors.dense(explainedVariance)) + } else { - val eigenSum = s.data.sum - val explainedVariance = s.data.map(_ / eigenSum) + val Cov = computeCovariance().asBreeze.asInstanceOf[BDM[Double]] - if (k == n) { - (Matrices.dense(n, k, u.data), Vectors.dense(explainedVariance)) - } else { - (Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k)), - Vectors.dense(Arrays.copyOfRange(explainedVariance, 0, k))) + val brzSvd.SVD(u: BDM[Double], s: BDV[Double], _) = brzSvd(Cov) + + val eigenSum = s.data.sum + val explainedVariance = s.data.map(_ / eigenSum) + + if (k == n) { + (Matrices.dense(n, k, u.data), Vectors.dense(explainedVariance)) + } else { + (Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k)), + Vectors.dense(Arrays.copyOfRange(explainedVariance, 0, k))) + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/ce40efa2/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala index 8eab124..fe49162 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.mllib.feature import org.apache.spark.SparkFunSuite -import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.distributed.RowMatrix import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ @@ -54,4 +54,25 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext { // check overflowing assert(PCAUtil.memoryCost(40000, 60000) > Int.MaxValue) } + + test("number of features more than 65535") { + val data1 = sc.parallelize(Array( + Vectors.dense((1 to 100000).map(_ => 2.0).to[scala.Vector].toArray), + Vectors.dense((1 to 100000).map(_ => 0.0).to[scala.Vector].toArray) + ), 2) + + val pca = new PCA(2).fit(data1) + // Eigen values should not be negative + assert(pca.explainedVariance.values.forall(_ >= 0)) + // Norm of the principal component should be 1.0 + assert(Math.sqrt(pca.pc.values.slice(0, 100000) + .map(Math.pow(_, 2)).sum) ~== 1.0 relTol 1e-8) + // Leading explainedVariance is 1.0 + assert(pca.explainedVariance(0) ~== 1.0 relTol 1e-12) + + // Leading principal component is '1' vector + val firstValue = pca.pc.values(0) + pca.pc.values.slice(0, 100000).map(values => + assert(values ~== firstValue relTol 1e-12)) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org