Repository: spark Updated Branches: refs/heads/master dd8c179c2 -> d81d95a7e
[SPARK-19368][MLLIB] BlockMatrix.toIndexedRowMatrix() optimization for sparse matrices ## What changes were proposed in this pull request? The optimization from [SPARK-12869] was made for dense matrices, but it caused a severe performance regression for sparse matrices, because updating Breeze sparse vectors slice by slice is very inefficient. For sparse rows it is better to collect the non-zero entries first and build the vector in one step (the approach Breeze's VectorBuilder takes); this patch does so by accumulating indices and values in ArrayBuffers and calling Vectors.sparse once per row. ## How was this patch tested? Checked against one of our use cases, which took 6.5 hours instead of 20 minutes after moving to Spark 2. With this change it is back to 20 minutes. Closes #16732 from uzadude/SparseVector_optimization. Authored-by: oraviv <[email protected]> Signed-off-by: Sean Owen <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d81d95a7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d81d95a7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d81d95a7 Branch: refs/heads/master Commit: d81d95a7e8a621e42c9c61305c32df72b6e868be Parents: dd8c179 Author: oraviv <[email protected]> Authored: Thu Nov 22 15:48:01 2018 -0600 Committer: Sean Owen <[email protected]> Committed: Thu Nov 22 15:48:01 2018 -0600 ---------------------------------------------------------------------- .../mllib/linalg/distributed/BlockMatrix.scala | 45 +++++++++++++------- 1 file changed, 29 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d81d95a7/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala index 7caacd1..e58860f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala +++ 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala @@ -17,10 +17,9 @@ package org.apache.spark.mllib.linalg.distributed +import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Matrix => BM} import scala.collection.mutable.ArrayBuffer -import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Matrix => BM, SparseVector => BSV, Vector => BV} - import org.apache.spark.{Partitioner, SparkException} import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging @@ -28,6 +27,7 @@ import org.apache.spark.mllib.linalg._ import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel + /** * A grid partitioner, which uses a regular grid to partition coordinates. * @@ -273,24 +273,37 @@ class BlockMatrix @Since("1.3.0") ( require(cols < Int.MaxValue, s"The number of columns should be less than Int.MaxValue ($cols).") val rows = blocks.flatMap { case ((blockRowIdx, blockColIdx), mat) => - mat.rowIter.zipWithIndex.map { + mat.rowIter.zipWithIndex.filter(_._1.size > 0).map { case (vector, rowIdx) => - blockRowIdx * rowsPerBlock + rowIdx -> ((blockColIdx, vector.asBreeze)) + blockRowIdx * rowsPerBlock + rowIdx -> ((blockColIdx, vector)) } }.groupByKey().map { case (rowIdx, vectors) => - val numberNonZeroPerRow = vectors.map(_._2.activeSize).sum.toDouble / cols.toDouble - - val wholeVector = if (numberNonZeroPerRow <= 0.1) { // Sparse at 1/10th nnz - BSV.zeros[Double](cols) - } else { - BDV.zeros[Double](cols) - } + val numberNonZero = vectors.map(_._2.numActives).sum + val numberNonZeroPerRow = numberNonZero.toDouble / cols.toDouble + + val wholeVector = + if (numberNonZeroPerRow <= 0.1) { // Sparse at 1/10th nnz + val arrBufferIndices = new ArrayBuffer[Int](numberNonZero) + val arrBufferValues = new ArrayBuffer[Double](numberNonZero) + + vectors.foreach { case (blockColIdx: Int, vec: Vector) => + val offset = colsPerBlock * blockColIdx + vec.foreachActive { case (colIdx: Int, value: Double) => + 
arrBufferIndices += offset + colIdx + arrBufferValues += value + } + } + Vectors.sparse(cols, arrBufferIndices.toArray, arrBufferValues.toArray) + } else { + val wholeVectorBuf = BDV.zeros[Double](cols) + vectors.foreach { case (blockColIdx: Int, vec: Vector) => + val offset = colsPerBlock * blockColIdx + wholeVectorBuf(offset until Math.min(cols, offset + colsPerBlock)) := vec.asBreeze + } + Vectors.fromBreeze(wholeVectorBuf) + } - vectors.foreach { case (blockColIdx: Int, vec: BV[_]) => - val offset = colsPerBlock * blockColIdx - wholeVector(offset until Math.min(cols, offset + colsPerBlock)) := vec - } - new IndexedRow(rowIdx, Vectors.fromBreeze(wholeVector)) + IndexedRow(rowIdx, wholeVector) } new IndexedRowMatrix(rows) } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
