Repository: spark
Updated Branches:
  refs/heads/master dd8c179c2 -> d81d95a7e


[SPARK-19368][MLLIB] BlockMatrix.toIndexedRowMatrix() optimization for sparse 
matrices

## What changes were proposed in this pull request?

Optimization [SPARK-12869] was made for dense matrices, but it caused a severe 
performance issue for sparse matrices because manipulating them this way is very 
inefficient. When assembling sparse matrices in Breeze, it is better to build 
the indices and values incrementally (as with a VectorBuilder).

## How was this patch tested?

Checked it against one of our use cases that, after moving to Spark 2, took 
6.5 hours instead of 20 minutes. After this change it is back to 20 minutes.

Closes #16732 from uzadude/SparseVector_optimization.

Authored-by: oraviv <[email protected]>
Signed-off-by: Sean Owen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d81d95a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d81d95a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d81d95a7

Branch: refs/heads/master
Commit: d81d95a7e8a621e42c9c61305c32df72b6e868be
Parents: dd8c179
Author: oraviv <[email protected]>
Authored: Thu Nov 22 15:48:01 2018 -0600
Committer: Sean Owen <[email protected]>
Committed: Thu Nov 22 15:48:01 2018 -0600

----------------------------------------------------------------------
 .../mllib/linalg/distributed/BlockMatrix.scala  | 45 +++++++++++++-------
 1 file changed, 29 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d81d95a7/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 7caacd1..e58860f 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -17,10 +17,9 @@
 
 package org.apache.spark.mllib.linalg.distributed
 
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Matrix => BM}
 import scala.collection.mutable.ArrayBuffer
 
-import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Matrix => BM, 
SparseVector => BSV, Vector => BV}
-
 import org.apache.spark.{Partitioner, SparkException}
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
@@ -28,6 +27,7 @@ import org.apache.spark.mllib.linalg._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
+
 /**
  * A grid partitioner, which uses a regular grid to partition coordinates.
  *
@@ -273,24 +273,37 @@ class BlockMatrix @Since("1.3.0") (
     require(cols < Int.MaxValue, s"The number of columns should be less than 
Int.MaxValue ($cols).")
 
     val rows = blocks.flatMap { case ((blockRowIdx, blockColIdx), mat) =>
-      mat.rowIter.zipWithIndex.map {
+      mat.rowIter.zipWithIndex.filter(_._1.size > 0).map {
         case (vector, rowIdx) =>
-          blockRowIdx * rowsPerBlock + rowIdx -> ((blockColIdx, 
vector.asBreeze))
+          blockRowIdx * rowsPerBlock + rowIdx -> ((blockColIdx, vector))
       }
     }.groupByKey().map { case (rowIdx, vectors) =>
-      val numberNonZeroPerRow = vectors.map(_._2.activeSize).sum.toDouble / 
cols.toDouble
-
-      val wholeVector = if (numberNonZeroPerRow <= 0.1) { // Sparse at 1/10th 
nnz
-        BSV.zeros[Double](cols)
-      } else {
-        BDV.zeros[Double](cols)
-      }
+      val numberNonZero = vectors.map(_._2.numActives).sum
+      val numberNonZeroPerRow = numberNonZero.toDouble / cols.toDouble
+
+      val wholeVector =
+        if (numberNonZeroPerRow <= 0.1) { // Sparse at 1/10th nnz
+          val arrBufferIndices = new ArrayBuffer[Int](numberNonZero)
+          val arrBufferValues = new ArrayBuffer[Double](numberNonZero)
+
+          vectors.foreach { case (blockColIdx: Int, vec: Vector) =>
+              val offset = colsPerBlock * blockColIdx
+              vec.foreachActive { case (colIdx: Int, value: Double) =>
+                arrBufferIndices += offset + colIdx
+                arrBufferValues  += value
+              }
+          }
+          Vectors.sparse(cols, arrBufferIndices.toArray, 
arrBufferValues.toArray)
+        } else {
+          val wholeVectorBuf = BDV.zeros[Double](cols)
+          vectors.foreach { case (blockColIdx: Int, vec: Vector) =>
+            val offset = colsPerBlock * blockColIdx
+            wholeVectorBuf(offset until Math.min(cols, offset + colsPerBlock)) 
:= vec.asBreeze
+          }
+          Vectors.fromBreeze(wholeVectorBuf)
+        }
 
-      vectors.foreach { case (blockColIdx: Int, vec: BV[_]) =>
-        val offset = colsPerBlock * blockColIdx
-        wholeVector(offset until Math.min(cols, offset + colsPerBlock)) := vec
-      }
-      new IndexedRow(rowIdx, Vectors.fromBreeze(wholeVector))
+      IndexedRow(rowIdx, wholeVector)
     }
     new IndexedRowMatrix(rows)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to