Repository: spark Updated Branches: refs/heads/master 1144df3b5 -> 9fdc7a840
[SPARK-26158][MLLIB] fix covariance accuracy problem for DenseVector ## What changes were proposed in this pull request? Enhance accuracy of the covariance logic in RowMatrix for function computeCovariance ## How was this patch tested? Unit test Accuracy test Closes #23126 from KyleLi1985/master. Authored-by: æ亮 <liang.li.w...@outlook.com> Signed-off-by: Sean Owen <sean.o...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fdc7a84 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fdc7a84 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fdc7a84 Branch: refs/heads/master Commit: 9fdc7a840daa64d1302d12027fd84ea9894110a1 Parents: 1144df3 Author: æ亮 <liang.li.w...@outlook.com> Authored: Thu Nov 29 13:08:53 2018 -0600 Committer: Sean Owen <sean.o...@databricks.com> Committed: Thu Nov 29 13:08:53 2018 -0600 ---------------------------------------------------------------------- .../mllib/linalg/distributed/RowMatrix.scala | 97 +++++++++++++++----- .../apache/spark/ml/feature/JavaPCASuite.java | 3 +- .../linalg/distributed/RowMatrixSuite.scala | 14 +++ 3 files changed, 90 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9fdc7a84/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index c12b751..ff02e5d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -128,6 +128,77 @@ class RowMatrix @Since("1.0.0") ( RowMatrix.triuToFull(n, GU.data) } + private def computeDenseVectorCovariance(mean: Vector, n: Int, m: Long): Matrix = { + + val bc = rows.context.broadcast(mean) + + // Computes n*(n+1)/2, avoiding overflow in the multiplication. + // This succeeds when n <= 65535, which is checked above + val nt = if (n % 2 == 0) ((n / 2) * (n + 1)) else (n * ((n + 1) / 2)) + + val MU = rows.treeAggregate(new BDV[Double](nt))( + seqOp = (U, v) => { + + val n = v.size + val na = Array.ofDim[Double](n) + val means = bc.value + + val ta = v.toArray + for (index <- 0 until n) { + na(index) = ta(index) - means(index) + } + + BLAS.spr(1.0, new DenseVector(na), U.data) + U + }, combOp = (U1, U2) => U1 += U2) + + bc.destroy() + + val M = RowMatrix.triuToFull(n, MU.data).asBreeze + + var i = 0 + var j = 0 + val m1 = m - 1.0 + while (i < n) { + j = i + while (j < n) { + val Mij = M(i, j) / m1 + M(i, j) = Mij + M(j, i) = Mij + j += 1 + } + i += 1 + } + + Matrices.fromBreeze(M) + } + + private def computeSparseVectorCovariance(mean: Vector, n: Int, m: Long): Matrix = { + + // We use the formula Cov(X, Y) = E[X * Y] - E[X] E[Y], which is not accurate if E[X * Y] is + // large but Cov(X, Y) is small, but it is good for sparse computation. + // TODO: find a fast and stable way for sparse data. + val G = computeGramianMatrix().asBreeze + + var i = 0 + var j = 0 + val m1 = m - 1.0 + var alpha = 0.0 + while (i < n) { + alpha = m / m1 * mean(i) + j = i + while (j < n) { + val Gij = G(i, j) / m1 - alpha * mean(j) + G(i, j) = Gij + G(j, i) = Gij + j += 1 + } + i += 1 + } + + Matrices.fromBreeze(G) + } + private def checkNumColumns(cols: Int): Unit = { if (cols > 65535) { throw new IllegalArgumentException(s"Argument with more than 65535 cols: $cols") @@ -337,29 +408,11 @@ class RowMatrix @Since("1.0.0") ( " Cannot compute the covariance of a RowMatrix with <= 1 row.") val mean = summary.mean - // We use the formula Cov(X, Y) = E[X * Y] - E[X] E[Y], which is not accurate if E[X * Y] is - // large but Cov(X, Y) is small, but it is good for sparse computation. - // TODO: find a fast and stable way for sparse data. - - val G = computeGramianMatrix().asBreeze - - var i = 0 - var j = 0 - val m1 = m - 1.0 - var alpha = 0.0 - while (i < n) { - alpha = m / m1 * mean(i) - j = i - while (j < n) { - val Gij = G(i, j) / m1 - alpha * mean(j) - G(i, j) = Gij - G(j, i) = Gij - j += 1 - } - i += 1 + if (rows.first().isInstanceOf[DenseVector]) { + computeDenseVectorCovariance(mean, n, m) + } else { + computeSparseVectorCovariance(mean, n, m) } - - Matrices.fromBreeze(G) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/9fdc7a84/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java b/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java index 683ceff..2e177ed 100644 --- a/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java +++ b/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java @@ -28,7 +28,6 @@ import org.apache.spark.SharedSparkSession; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.ml.linalg.Vector; import org.apache.spark.ml.linalg.Vectors; -import org.apache.spark.mllib.linalg.DenseVector; import org.apache.spark.mllib.linalg.Matrix; import org.apache.spark.mllib.linalg.distributed.RowMatrix; import org.apache.spark.sql.Dataset; @@ -67,7 +66,7 @@ public class JavaPCASuite extends SharedSparkSession { JavaRDD<Vector> dataRDD = jsc.parallelize(points, 2); RowMatrix mat = new RowMatrix(dataRDD.map( - (Vector vector) -> (org.apache.spark.mllib.linalg.Vector) new DenseVector(vector.toArray()) + (Vector vector) -> org.apache.spark.mllib.linalg.Vectors.fromML(vector) ).rdd()); Matrix pc = mat.computePrincipalComponents(3); http://git-wip-us.apache.org/repos/asf/spark/blob/9fdc7a84/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 7c9e14f..a4ca4f0 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -266,6 +266,20 @@ class RowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext { } } + test("dense vector covariance accuracy (SPARK-26158)") { + val denseData = Seq( + Vectors.dense(100000.000004, 199999.999999), + Vectors.dense(100000.000012, 200000.000002), + Vectors.dense(99999.9999931, 200000.000003), + Vectors.dense(99999.9999977, 200000.000001) + ) + val denseMat = new RowMatrix(sc.parallelize(denseData, 2)) + + val result = denseMat.computeCovariance() + val expected = breeze.linalg.cov(denseMat.toBreeze()) + assert(closeToZero(abs(expected) - abs(result.asBreeze.asInstanceOf[BDM[Double]]))) + } + test("compute covariance") { for (mat <- Seq(denseMat, sparseMat)) { val result = mat.computeCovariance() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org