Repository: spark
Updated Branches:
  refs/heads/master 5b9760de8 -> a63be1a18


[SPARK-3977] Conversion methods for BlockMatrix to other Distributed Matrices

The conversion methods for `BlockMatrix`. Conversions go through 
`CoordinateMatrix` in order to cause a shuffle so that intermediate operations 
will be stored on disk and the expensive initial computation will be mitigated.

Author: Burak Yavuz <[email protected]>

Closes #4256 from brkyvz/SPARK-3977PR and squashes the following commits:

4df37fe [Burak Yavuz] moved TODO inside code block
b049c07 [Burak Yavuz] addressed code review feedback v1
66cb755 [Burak Yavuz] added default toBlockMatrix conversion
851f2a2 [Burak Yavuz] added better comments and checks
cdb9895 [Burak Yavuz] [SPARK-3977] Conversion methods for BlockMatrix to other 
Distributed Matrices


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a63be1a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a63be1a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a63be1a1

Branch: refs/heads/master
Commit: a63be1a18f7b7d77f7deef2abc9a5be6ad24ae28
Parents: 5b9760d
Author: Burak Yavuz <[email protected]>
Authored: Wed Jan 28 23:42:07 2015 -0800
Committer: Xiangrui Meng <[email protected]>
Committed: Wed Jan 28 23:42:07 2015 -0800

----------------------------------------------------------------------
 .../mllib/linalg/distributed/BlockMatrix.scala  | 26 +++++++++++-
 .../linalg/distributed/CoordinateMatrix.scala   | 43 +++++++++++++++++++-
 .../linalg/distributed/IndexedRowMatrix.scala   | 18 ++++++++
 .../linalg/distributed/BlockMatrixSuite.scala   | 14 +++++++
 .../distributed/CoordinateMatrixSuite.scala     | 14 +++++++
 .../distributed/IndexedRowMatrixSuite.scala     | 15 +++++++
 6 files changed, 127 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a63be1a1/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 0ab74ba..426dbf4 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark.mllib.linalg.distributed
 
+import scala.collection.mutable.ArrayBuffer
+
 import breeze.linalg.{DenseMatrix => BDM}
 
 import org.apache.spark.{Logging, Partitioner}
-import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
+import org.apache.spark.mllib.linalg.{SparseMatrix, DenseMatrix, Matrix}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -182,6 +184,28 @@ class BlockMatrix(
     this
   }
 
+  /** Converts to CoordinateMatrix. */
+  def toCoordinateMatrix(): CoordinateMatrix = {
+    val entryRDD = blocks.flatMap { case ((blockRowIndex, blockColIndex), mat) =>
+      val rowStart = blockRowIndex.toLong * rowsPerBlock
+      val colStart = blockColIndex.toLong * colsPerBlock
+      val entryValues = new ArrayBuffer[MatrixEntry]()
+      mat.foreachActive { (i, j, v) =>
+        if (v != 0.0) entryValues.append(new MatrixEntry(rowStart + i, colStart + j, v))
+      }
+      entryValues
+    }
+    new CoordinateMatrix(entryRDD, numRows(), numCols())
+  }
+
+  /** Converts to IndexedRowMatrix. The number of columns must be within the integer range. */
+  def toIndexedRowMatrix(): IndexedRowMatrix = {
+    require(numCols() < Int.MaxValue, "The number of columns must be within the integer range. " +
+      s"numCols: ${numCols()}")
+    // TODO: This implementation may be optimized
+    toCoordinateMatrix().toIndexedRowMatrix()
+  }
+
   /** Collect the distributed matrix on the driver as a `DenseMatrix`. */
   def toLocalMatrix(): Matrix = {
     require(numRows() < Int.MaxValue, "The number of rows of this matrix should be less than " +

http://git-wip-us.apache.org/repos/asf/spark/blob/a63be1a1/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala
index b60559c..078d1fa 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala
@@ -21,8 +21,7 @@ import breeze.linalg.{DenseMatrix => BDM}
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.rdd.RDD
-import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.{Matrix, SparseMatrix, Vectors}
 
 /**
  * :: Experimental ::
@@ -98,6 +97,46 @@ class CoordinateMatrix(
     toIndexedRowMatrix().toRowMatrix()
   }
 
+  /** Converts to BlockMatrix. Creates blocks of [[SparseMatrix]] with size 1024 x 1024. */
+  def toBlockMatrix(): BlockMatrix = {
+    toBlockMatrix(1024, 1024)
+  }
+
+  /**
+   * Converts to BlockMatrix. Creates blocks of [[SparseMatrix]].
+   * @param rowsPerBlock The number of rows of each block. The blocks at the bottom edge may have
+   *                     a smaller value. Must be an integer value greater than 0.
+   * @param colsPerBlock The number of columns of each block. The blocks at the right edge may have
+   *                     a smaller value. Must be an integer value greater than 0.
+   * @return a [[BlockMatrix]]
+   */
+  def toBlockMatrix(rowsPerBlock: Int, colsPerBlock: Int): BlockMatrix = {
+    require(rowsPerBlock > 0,
+      s"rowsPerBlock needs to be greater than 0. rowsPerBlock: $rowsPerBlock")
+    require(colsPerBlock > 0,
+      s"colsPerBlock needs to be greater than 0. colsPerBlock: $colsPerBlock")
+    val m = numRows()
+    val n = numCols()
+    val numRowBlocks = math.ceil(m.toDouble / rowsPerBlock).toInt
+    val numColBlocks = math.ceil(n.toDouble / colsPerBlock).toInt
+    val partitioner = GridPartitioner(numRowBlocks, numColBlocks, entries.partitions.length)
+
+    val blocks: RDD[((Int, Int), Matrix)] = entries.map { entry =>
+      val blockRowIndex = (entry.i / rowsPerBlock).toInt
+      val blockColIndex = (entry.j / colsPerBlock).toInt
+
+      val rowId = entry.i % rowsPerBlock
+      val colId = entry.j % colsPerBlock
+
+      ((blockRowIndex, blockColIndex), (rowId.toInt, colId.toInt, entry.value))
+    }.groupByKey(partitioner).map { case ((blockRowIndex, blockColIndex), entry) =>
+      val effRows = math.min(m - blockRowIndex.toLong * rowsPerBlock, rowsPerBlock).toInt
+      val effCols = math.min(n - blockColIndex.toLong * colsPerBlock, colsPerBlock).toInt
+      ((blockRowIndex, blockColIndex), SparseMatrix.fromCOO(effRows, effCols, entry))
+    }
+    new BlockMatrix(blocks, rowsPerBlock, colsPerBlock, m, n)
+  }
+
   /** Determines the size by computing the max row/column index. */
   private def computeSize() {
     // Reduce will throw an exception if `entries` is empty.

http://git-wip-us.apache.org/repos/asf/spark/blob/a63be1a1/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
index c518271..3be530f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
@@ -75,6 +75,24 @@ class IndexedRowMatrix(
     new RowMatrix(rows.map(_.vector), 0L, nCols)
   }
 
+  /** Converts to BlockMatrix. Creates blocks of [[SparseMatrix]] with size 1024 x 1024. */
+  def toBlockMatrix(): BlockMatrix = {
+    toBlockMatrix(1024, 1024)
+  }
+
+  /**
+   * Converts to BlockMatrix. Creates blocks of [[SparseMatrix]].
+   * @param rowsPerBlock The number of rows of each block. The blocks at the bottom edge may have
+   *                     a smaller value. Must be an integer value greater than 0.
+   * @param colsPerBlock The number of columns of each block. The blocks at the right edge may have
+   *                     a smaller value. Must be an integer value greater than 0.
+   * @return a [[BlockMatrix]]
+   */
+  def toBlockMatrix(rowsPerBlock: Int, colsPerBlock: Int): BlockMatrix = {
+    // TODO: This implementation may be optimized
+    toCoordinateMatrix().toBlockMatrix(rowsPerBlock, colsPerBlock)
+  }
+
   /**
    * Converts this matrix to a
    * [[org.apache.spark.mllib.linalg.distributed.CoordinateMatrix]].

http://git-wip-us.apache.org/repos/asf/spark/blob/a63be1a1/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
index 05efbc8..7284d03 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -120,6 +120,20 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
     }
   }
 
+  test("toCoordinateMatrix") {
+    val coordMat = gridBasedMat.toCoordinateMatrix()
+    assert(coordMat.numRows() === m)
+    assert(coordMat.numCols() === n)
+    assert(coordMat.toBreeze() === gridBasedMat.toBreeze())
+  }
+
+  test("toIndexedRowMatrix") {
+    val rowMat = gridBasedMat.toIndexedRowMatrix()
+    assert(rowMat.numRows() === m)
+    assert(rowMat.numCols() === n)
+    assert(rowMat.toBreeze() === gridBasedMat.toBreeze())
+  }
+
   test("toBreeze and toLocalMatrix") {
     val expected = BDM(
       (1.0, 0.0, 0.0, 0.0),

http://git-wip-us.apache.org/repos/asf/spark/blob/a63be1a1/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala
index 80bef81..04b36a9 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala
@@ -100,4 +100,18 @@ class CoordinateMatrixSuite extends FunSuite with MLlibTestSparkContext {
       Vectors.dense(0.0, 9.0, 0.0, 0.0))
     assert(rows === expected)
   }
+
+  test("toBlockMatrix") {
+    val blockMat = mat.toBlockMatrix(2, 2)
+    assert(blockMat.numRows() === m)
+    assert(blockMat.numCols() === n)
+    assert(blockMat.toBreeze() === mat.toBreeze())
+
+    intercept[IllegalArgumentException] {
+      mat.toBlockMatrix(-1, 2)
+    }
+    intercept[IllegalArgumentException] {
+      mat.toBlockMatrix(2, 0)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a63be1a1/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala
index b86c2ca..2ab53cc 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala
@@ -88,6 +88,21 @@ class IndexedRowMatrixSuite extends FunSuite with MLlibTestSparkContext {
     assert(coordMat.toBreeze() === idxRowMat.toBreeze())
   }
 
+  test("toBlockMatrix") {
+    val idxRowMat = new IndexedRowMatrix(indexedRows)
+    val blockMat = idxRowMat.toBlockMatrix(2, 2)
+    assert(blockMat.numRows() === m)
+    assert(blockMat.numCols() === n)
+    assert(blockMat.toBreeze() === idxRowMat.toBreeze())
+
+    intercept[IllegalArgumentException] {
+      idxRowMat.toBlockMatrix(-1, 2)
+    }
+    intercept[IllegalArgumentException] {
+      idxRowMat.toBlockMatrix(2, 0)
+    }
+  }
+
   test("multiply a local matrix") {
     val A = new IndexedRowMatrix(indexedRows)
     val B = Matrices.dense(3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to