Repository: spark
Updated Branches:
  refs/heads/master 1144df3b5 -> 9fdc7a840


[SPARK-26158][MLLIB] fix covariance accuracy problem for DenseVector

## What changes were proposed in this pull request?
Enhance the accuracy of the covariance logic in RowMatrix for the computeCovariance function.
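
For context, here is a minimal, self-contained sketch (not part of this patch) of why the previous formula `Cov(X, Y) = E[X * Y] - E[X] E[Y]` loses precision when the entries are large relative to their spread. The data values below are illustrative only:

```scala
object CovarianceAccuracySketch {
  def main(args: Array[String]): Unit = {
    // Values near 1e5 whose spread is only ~1e-5, similar in spirit to the new test data.
    val xs = Array(100000.000004, 100000.000012, 99999.9999931, 99999.9999977)
    val m = xs.length
    val mean = xs.sum / m

    // Old style: E[X^2] - E[X]^2, rescaled to the unbiased (m - 1) estimator.
    // The subtraction cancels essentially all significant digits, leaving rounding error.
    val naive = (xs.map(x => x * x).sum - m * mean * mean) / (m - 1)

    // New style for dense input: center first, then accumulate squared deviations.
    val twoPass = xs.map(x => (x - mean) * (x - mean)).sum / (m - 1)

    println(s"naive variance    = $naive")    // dominated by cancellation error
    println(s"two-pass variance = $twoPass")  // close to the true value, ~6.7e-11
  }
}
```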

## How was this patch tested?
Unit test
Accuracy test

Closes #23126 from KyleLi1985/master.

Authored-by: 李亮 <liang.li.w...@outlook.com>
Signed-off-by: Sean Owen <sean.o...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fdc7a84
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fdc7a84
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fdc7a84

Branch: refs/heads/master
Commit: 9fdc7a840daa64d1302d12027fd84ea9894110a1
Parents: 1144df3
Author: 李亮 <liang.li.w...@outlook.com>
Authored: Thu Nov 29 13:08:53 2018 -0600
Committer: Sean Owen <sean.o...@databricks.com>
Committed: Thu Nov 29 13:08:53 2018 -0600

----------------------------------------------------------------------
 .../mllib/linalg/distributed/RowMatrix.scala    | 97 +++++++++++++++-----
 .../apache/spark/ml/feature/JavaPCASuite.java   |  3 +-
 .../linalg/distributed/RowMatrixSuite.scala     | 14 +++
 3 files changed, 90 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9fdc7a84/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index c12b751..ff02e5d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -128,6 +128,77 @@ class RowMatrix @Since("1.0.0") (
     RowMatrix.triuToFull(n, GU.data)
   }
 
+  private def computeDenseVectorCovariance(mean: Vector, n: Int, m: Long): Matrix = {
+
+    val bc = rows.context.broadcast(mean)
+
+    // Computes n*(n+1)/2, avoiding overflow in the multiplication.
+    // This succeeds when n <= 65535, which is checked above
+    val nt = if (n % 2 == 0) ((n / 2) * (n + 1)) else (n * ((n + 1) / 2))
+
+    val MU = rows.treeAggregate(new BDV[Double](nt))(
+      seqOp = (U, v) => {
+
+        val n = v.size
+        val na = Array.ofDim[Double](n)
+        val means = bc.value
+
+        val ta = v.toArray
+        for (index <- 0 until n) {
+          na(index) = ta(index) - means(index)
+        }
+
+        BLAS.spr(1.0, new DenseVector(na), U.data)
+        U
+      }, combOp = (U1, U2) => U1 += U2)
+
+    bc.destroy()
+
+    val M = RowMatrix.triuToFull(n, MU.data).asBreeze
+
+    var i = 0
+    var j = 0
+    val m1 = m - 1.0
+    while (i < n) {
+      j = i
+      while (j < n) {
+        val Mij = M(i, j) / m1
+        M(i, j) = Mij
+        M(j, i) = Mij
+        j += 1
+      }
+      i += 1
+    }
+
+    Matrices.fromBreeze(M)
+  }
+
+  private def computeSparseVectorCovariance(mean: Vector, n: Int, m: Long): Matrix = {
+
+    // We use the formula Cov(X, Y) = E[X * Y] - E[X] E[Y], which is not accurate if E[X * Y] is
+    // large but Cov(X, Y) is small, but it is good for sparse computation.
+    // TODO: find a fast and stable way for sparse data.
+    val G = computeGramianMatrix().asBreeze
+
+    var i = 0
+    var j = 0
+    val m1 = m - 1.0
+    var alpha = 0.0
+    while (i < n) {
+      alpha = m / m1 * mean(i)
+      j = i
+      while (j < n) {
+        val Gij = G(i, j) / m1 - alpha * mean(j)
+        G(i, j) = Gij
+        G(j, i) = Gij
+        j += 1
+      }
+      i += 1
+    }
+
+    Matrices.fromBreeze(G)
+  }
+
   private def checkNumColumns(cols: Int): Unit = {
     if (cols > 65535) {
       throw new IllegalArgumentException(s"Argument with more than 65535 cols: $cols")
@@ -337,29 +408,11 @@ class RowMatrix @Since("1.0.0") (
       "  Cannot compute the covariance of a RowMatrix with <= 1 row.")
     val mean = summary.mean
 
-    // We use the formula Cov(X, Y) = E[X * Y] - E[X] E[Y], which is not accurate if E[X * Y] is
-    // large but Cov(X, Y) is small, but it is good for sparse computation.
-    // TODO: find a fast and stable way for sparse data.
-
-    val G = computeGramianMatrix().asBreeze
-
-    var i = 0
-    var j = 0
-    val m1 = m - 1.0
-    var alpha = 0.0
-    while (i < n) {
-      alpha = m / m1 * mean(i)
-      j = i
-      while (j < n) {
-        val Gij = G(i, j) / m1 - alpha * mean(j)
-        G(i, j) = Gij
-        G(j, i) = Gij
-        j += 1
-      }
-      i += 1
+    if (rows.first().isInstanceOf[DenseVector]) {
+      computeDenseVectorCovariance(mean, n, m)
+    } else {
+      computeSparseVectorCovariance(mean, n, m)
     }
-
-    Matrices.fromBreeze(G)
   }
 
   /**

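To make the new dense code path easier to follow outside of Spark, here is a hedged, Spark-free sketch of the same centered accumulation: subtract the column means from every row, accumulate each centered row's outer product into a packed upper-triangular buffer (the role BLAS.spr plays above), then divide by m - 1 and mirror into a full symmetric matrix. The helper name and plain-array layout are illustrative, not Spark internals:

```scala
// Local sketch of the centered covariance accumulation used by computeDenseVectorCovariance.
def denseCovarianceSketch(rows: Seq[Array[Double]]): Array[Array[Double]] = {
  val m = rows.length
  val n = rows.head.length
  val means = Array.tabulate(n)(j => rows.map(_(j)).sum / m)

  // Packed upper triangle: the (i, j) entry with i <= j lives at j * (j + 1) / 2 + i.
  val packed = new Array[Double](n * (n + 1) / 2)
  for (row <- rows) {
    val centered = Array.tabulate(n)(j => row(j) - means(j))
    var j = 0
    while (j < n) {
      var i = 0
      while (i <= j) {
        packed(j * (j + 1) / 2 + i) += centered(i) * centered(j)
        i += 1
      }
      j += 1
    }
  }

  // Scale by 1 / (m - 1) and mirror into a full symmetric matrix.
  val cov = Array.ofDim[Double](n, n)
  val m1 = m - 1.0
  for (j <- 0 until n; i <- 0 to j) {
    val c = packed(j * (j + 1) / 2 + i) / m1
    cov(i)(j) = c
    cov(j)(i) = c
  }
  cov
}
```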
http://git-wip-us.apache.org/repos/asf/spark/blob/9fdc7a84/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java b/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java
index 683ceff..2e177ed 100644
--- a/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java
+++ b/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java
@@ -28,7 +28,6 @@ import org.apache.spark.SharedSparkSession;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.ml.linalg.Vector;
 import org.apache.spark.ml.linalg.Vectors;
-import org.apache.spark.mllib.linalg.DenseVector;
 import org.apache.spark.mllib.linalg.Matrix;
 import org.apache.spark.mllib.linalg.distributed.RowMatrix;
 import org.apache.spark.sql.Dataset;
@@ -67,7 +66,7 @@ public class JavaPCASuite extends SharedSparkSession {
     JavaRDD<Vector> dataRDD = jsc.parallelize(points, 2);
 
     RowMatrix mat = new RowMatrix(dataRDD.map(
-        (Vector vector) -> (org.apache.spark.mllib.linalg.Vector) new DenseVector(vector.toArray())
+        (Vector vector) -> org.apache.spark.mllib.linalg.Vectors.fromML(vector)
     ).rdd());
 
     Matrix pc = mat.computePrincipalComponents(3);

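A side note on the Java test change above: `Vectors.fromML` converts an `org.apache.spark.ml.linalg.Vector` to the older `org.apache.spark.mllib.linalg.Vector` while preserving whether it is dense or sparse, which matters now that computeCovariance picks its code path from the first row's concrete type. A small Scala illustration (the variable names are illustrative):

```scala
import org.apache.spark.ml.{linalg => newlinalg}
import org.apache.spark.mllib.{linalg => oldlinalg}

val mlDense: newlinalg.Vector = newlinalg.Vectors.dense(1.0, 2.0, 3.0)
val mllibDense: oldlinalg.Vector = oldlinalg.Vectors.fromML(mlDense)
// mllibDense is an org.apache.spark.mllib.linalg.DenseVector, so a RowMatrix
// built from such rows takes the new dense covariance path.
```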
http://git-wip-us.apache.org/repos/asf/spark/blob/9fdc7a84/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
index 7c9e14f..a4ca4f0 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
@@ -266,6 +266,20 @@ class RowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
     }
   }
 
+  test("dense vector covariance accuracy (SPARK-26158)") {
+    val denseData = Seq(
+      Vectors.dense(100000.000004, 199999.999999),
+      Vectors.dense(100000.000012, 200000.000002),
+      Vectors.dense(99999.9999931, 200000.000003),
+      Vectors.dense(99999.9999977, 200000.000001)
+    )
+    val denseMat = new RowMatrix(sc.parallelize(denseData, 2))
+
+    val result = denseMat.computeCovariance()
+    val expected = breeze.linalg.cov(denseMat.toBreeze())
+    assert(closeToZero(abs(expected) - abs(result.asBreeze.asInstanceOf[BDM[Double]])))
+  }
+
   test("compute covariance") {
     for (mat <- Seq(denseMat, sparseMat)) {
       val result = mat.computeCovariance()

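For anyone wanting to try this end to end, a hedged spark-shell style usage sketch, assuming an active SparkContext named `sc`; the data mirrors the new test case above:

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rowsRdd = sc.parallelize(Seq(
  Vectors.dense(100000.000004, 199999.999999),
  Vectors.dense(100000.000012, 200000.000002),
  Vectors.dense(99999.9999931, 200000.000003),
  Vectors.dense(99999.9999977, 200000.000001)
), 2)

val cov = new RowMatrix(rowsRdd).computeCovariance()
// With this patch the dense rows take the centered path, so the ~1e-11 scale
// covariances are no longer swamped by rounding error from E[X * Y].
println(cov)
```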

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
