Repository: spark Updated Branches: refs/heads/branch-1.5 350006497 -> eedb996dd
[SPARK-6486] [MLLIB] [PYTHON] Add BlockMatrix to PySpark. mengxr This adds the `BlockMatrix` to PySpark. I have the conversions to `IndexedRowMatrix` and `CoordinateMatrix` ready as well, so once PR #7554 is completed (which relies on PR #7746), this PR can be finished. Author: Mike Dusenberry <mwdus...@us.ibm.com> Closes #7761 from dusenberrymw/SPARK-6486_Add_BlockMatrix_to_PySpark and squashes the following commits: 27195c2 [Mike Dusenberry] Adding one more check to _convert_to_matrix_block_tuple, and a few minor documentation changes. ae50883 [Mike Dusenberry] Minor update: BlockMatrix should inherit from DistributedMatrix. b8acc1c [Mike Dusenberry] Moving BlockMatrix to pyspark.mllib.linalg.distributed, updating the logic to match that of the other distributed matrices, adding conversions, and adding documentation. c014002 [Mike Dusenberry] Using properties for better documentation. 3bda6ab [Mike Dusenberry] Adding documentation. 8fb3095 [Mike Dusenberry] Small cleanup. e17af2e [Mike Dusenberry] Adding BlockMatrix to PySpark. (cherry picked from commit 34dcf10104460816382908b2b8eeb6c925e862bf) Signed-off-by: Xiangrui Meng <m...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eedb996d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eedb996d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eedb996d Branch: refs/heads/branch-1.5 Commit: eedb996dde5593a97bcb61b3b1515e6fdea6aa70 Parents: 3500064 Author: Mike Dusenberry <mwdus...@us.ibm.com> Authored: Wed Aug 5 07:40:50 2015 -0700 Committer: Xiangrui Meng <m...@databricks.com> Committed: Wed Aug 5 07:42:25 2015 -0700 ---------------------------------------------------------------------- docs/mllib-data-types.md | 41 +++ .../spark/mllib/api/python/PythonMLLibAPI.scala | 25 ++ python/pyspark/mllib/linalg/distributed.py | 328 ++++++++++++++++++- 3 files changed, 388 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/eedb996d/docs/mllib-data-types.md ---------------------------------------------------------------------- diff --git a/docs/mllib-data-types.md b/docs/mllib-data-types.md index 11033bf..f0e8d54 100644 --- a/docs/mllib-data-types.md +++ b/docs/mllib-data-types.md @@ -494,6 +494,9 @@ rowMat = mat.toRowMatrix() # Convert to a CoordinateMatrix. coordinateMat = mat.toCoordinateMatrix() + +# Convert to a BlockMatrix. +blockMat = mat.toBlockMatrix() {% endhighlight %} </div> @@ -594,6 +597,9 @@ rowMat = mat.toRowMatrix() # Convert to an IndexedRowMatrix. indexedRowMat = mat.toIndexedRowMatrix() + +# Convert to a BlockMatrix. +blockMat = mat.toBlockMatrix() {% endhighlight %} </div> @@ -661,4 +667,39 @@ matA.validate(); BlockMatrix ata = matA.transpose().multiply(matA); {% endhighlight %} </div> + +<div data-lang="python" markdown="1"> + +A [`BlockMatrix`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.BlockMatrix) +can be created from an `RDD` of sub-matrix blocks, where a sub-matrix block is a +`((blockRowIndex, blockColIndex), sub-matrix)` tuple. + +{% highlight python %} +from pyspark.mllib.linalg import Matrices +from pyspark.mllib.linalg.distributed import BlockMatrix + +# Create an RDD of sub-matrix blocks. +blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + +# Create a BlockMatrix from an RDD of sub-matrix blocks. +mat = BlockMatrix(blocks, 3, 2) + +# Get its size. +m = mat.numRows() # 6 +n = mat.numCols() # 2 + +# Get the blocks as an RDD of sub-matrix blocks. +blocksRDD = mat.blocks + +# Convert to a LocalMatrix. +localMat = mat.toLocalMatrix() + +# Convert to an IndexedRowMatrix. +indexedRowMat = mat.toIndexedRowMatrix() + +# Convert to a CoordinateMatrix. +coordinateMat = mat.toCoordinateMatrix() +{% endhighlight %} +</div> </div> http://git-wip-us.apache.org/repos/asf/spark/blob/eedb996d/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index d2b3fae..f585aac 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1129,6 +1129,21 @@ private[python] class PythonMLLibAPI extends Serializable { } /** + * Wrapper around BlockMatrix constructor. + */ + def createBlockMatrix(blocks: DataFrame, rowsPerBlock: Int, colsPerBlock: Int, + numRows: Long, numCols: Long): BlockMatrix = { + // We use DataFrames for serialization of sub-matrix blocks from + // Python, so map each Row in the DataFrame back to a + // ((blockRowIndex, blockColIndex), sub-matrix) tuple. + val blockTuples = blocks.map { + case Row(Row(blockRowIndex: Long, blockColIndex: Long), subMatrix: Matrix) => + ((blockRowIndex.toInt, blockColIndex.toInt), subMatrix) + } + new BlockMatrix(blockTuples, rowsPerBlock, colsPerBlock, numRows, numCols) + } + + /** * Return the rows of an IndexedRowMatrix. */ def getIndexedRows(indexedRowMatrix: IndexedRowMatrix): DataFrame = { @@ -1147,6 +1162,16 @@ private[python] class PythonMLLibAPI extends Serializable { val sqlContext = new SQLContext(coordinateMatrix.entries.sparkContext) sqlContext.createDataFrame(coordinateMatrix.entries) } + + /** + * Return the sub-matrix blocks of a BlockMatrix. + */ + def getMatrixBlocks(blockMatrix: BlockMatrix): DataFrame = { + // We use DataFrames for serialization of sub-matrix blocks to + // Python, so return a DataFrame. + val sqlContext = new SQLContext(blockMatrix.blocks.sparkContext) + sqlContext.createDataFrame(blockMatrix.blocks) + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/eedb996d/python/pyspark/mllib/linalg/distributed.py ---------------------------------------------------------------------- diff --git a/python/pyspark/mllib/linalg/distributed.py b/python/pyspark/mllib/linalg/distributed.py index 666d833..aec407d 100644 --- a/python/pyspark/mllib/linalg/distributed.py +++ b/python/pyspark/mllib/linalg/distributed.py @@ -28,11 +28,12 @@ from py4j.java_gateway import JavaObject from pyspark import RDD from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper -from pyspark.mllib.linalg import _convert_to_vector +from pyspark.mllib.linalg import _convert_to_vector, Matrix __all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow', - 'IndexedRowMatrix', 'MatrixEntry', 'CoordinateMatrix'] + 'IndexedRowMatrix', 'MatrixEntry', 'CoordinateMatrix', + 'BlockMatrix'] class DistributedMatrix(object): @@ -322,6 +323,35 @@ class IndexedRowMatrix(DistributedMatrix): java_coordinate_matrix = self._java_matrix_wrapper.call("toCoordinateMatrix") return CoordinateMatrix(java_coordinate_matrix) + def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024): + """ + Convert this matrix to a BlockMatrix. + + :param rowsPerBlock: Number of rows that make up each block. + The blocks forming the final rows are not + required to have the given number of rows. + :param colsPerBlock: Number of columns that make up each block. + The blocks forming the final columns are not + required to have the given number of columns. + + >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]), + ... IndexedRow(6, [4, 5, 6])]) + >>> mat = IndexedRowMatrix(rows).toBlockMatrix() + + >>> # This IndexedRowMatrix will have 7 effective rows, due to + >>> # the highest row index being 6, and the ensuing + >>> # BlockMatrix will have 7 rows as well. + >>> print(mat.numRows()) + 7 + + >>> print(mat.numCols()) + 3 + """ + java_block_matrix = self._java_matrix_wrapper.call("toBlockMatrix", + rowsPerBlock, + colsPerBlock) + return BlockMatrix(java_block_matrix, rowsPerBlock, colsPerBlock) + class MatrixEntry(object): """ @@ -476,19 +506,18 @@ class CoordinateMatrix(DistributedMatrix): >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), ... MatrixEntry(6, 4, 2.1)]) + >>> mat = CoordinateMatrix(entries).toRowMatrix() >>> # This CoordinateMatrix will have 7 effective rows, due to >>> # the highest row index being 6, but the ensuing RowMatrix >>> # will only have 2 rows since there are only entries on 2 >>> # unique rows. - >>> mat = CoordinateMatrix(entries).toRowMatrix() >>> print(mat.numRows()) 2 >>> # This CoordinateMatrix will have 5 columns, due to the >>> # highest column index being 4, and the ensuing RowMatrix >>> # will have 5 columns as well. - >>> mat = CoordinateMatrix(entries).toRowMatrix() >>> print(mat.numCols()) 5 """ @@ -501,33 +530,320 @@ class CoordinateMatrix(DistributedMatrix): >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), ... MatrixEntry(6, 4, 2.1)]) + >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix() >>> # This CoordinateMatrix will have 7 effective rows, due to >>> # the highest row index being 6, and the ensuing >>> # IndexedRowMatrix will have 7 rows as well. - >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix() >>> print(mat.numRows()) 7 >>> # This CoordinateMatrix will have 5 columns, due to the >>> # highest column index being 4, and the ensuing >>> # IndexedRowMatrix will have 5 columns as well. - >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix() >>> print(mat.numCols()) 5 """ java_indexed_row_matrix = self._java_matrix_wrapper.call("toIndexedRowMatrix") return IndexedRowMatrix(java_indexed_row_matrix) + def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024): + """ + Convert this matrix to a BlockMatrix. + + :param rowsPerBlock: Number of rows that make up each block. + The blocks forming the final rows are not + required to have the given number of rows. + :param colsPerBlock: Number of columns that make up each block. + The blocks forming the final columns are not + required to have the given number of columns. + + >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), + ... MatrixEntry(6, 4, 2.1)]) + >>> mat = CoordinateMatrix(entries).toBlockMatrix() + + >>> # This CoordinateMatrix will have 7 effective rows, due to + >>> # the highest row index being 6, and the ensuing + >>> # BlockMatrix will have 7 rows as well. + >>> print(mat.numRows()) + 7 + + >>> # This CoordinateMatrix will have 5 columns, due to the + >>> # highest column index being 4, and the ensuing + >>> # BlockMatrix will have 5 columns as well. + >>> print(mat.numCols()) + 5 + """ + java_block_matrix = self._java_matrix_wrapper.call("toBlockMatrix", + rowsPerBlock, + colsPerBlock) + return BlockMatrix(java_block_matrix, rowsPerBlock, colsPerBlock) + + +def _convert_to_matrix_block_tuple(block): + if (isinstance(block, tuple) and len(block) == 2 + and isinstance(block[0], tuple) and len(block[0]) == 2 + and isinstance(block[1], Matrix)): + blockRowIndex = int(block[0][0]) + blockColIndex = int(block[0][1]) + subMatrix = block[1] + return ((blockRowIndex, blockColIndex), subMatrix) + else: + raise TypeError("Cannot convert type %s into a sub-matrix block tuple" % type(block)) + + +class BlockMatrix(DistributedMatrix): + """ + .. note:: Experimental + + Represents a distributed matrix in blocks of local matrices. + + :param blocks: An RDD of sub-matrix blocks + ((blockRowIndex, blockColIndex), sub-matrix) that + form this distributed matrix. If multiple blocks + with the same index exist, the results for + operations like add and multiply will be + unpredictable. + :param rowsPerBlock: Number of rows that make up each block. + The blocks forming the final rows are not + required to have the given number of rows. + :param colsPerBlock: Number of columns that make up each block. + The blocks forming the final columns are not + required to have the given number of columns. + :param numRows: Number of rows of this matrix. If the supplied + value is less than or equal to zero, the number + of rows will be calculated when `numRows` is + invoked. + :param numCols: Number of columns of this matrix. If the supplied + value is less than or equal to zero, the number + of columns will be calculated when `numCols` is + invoked. + """ + def __init__(self, blocks, rowsPerBlock, colsPerBlock, numRows=0, numCols=0): + """ + Note: This docstring is not shown publicly. + + Create a wrapper over a Java BlockMatrix. + + Publicly, we require that `blocks` be an RDD. However, for + internal usage, `blocks` can also be a Java BlockMatrix + object, in which case we can wrap it directly. This + assists in clean matrix conversions. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + >>> mat = BlockMatrix(blocks, 3, 2) + + >>> mat_diff = BlockMatrix(blocks, 3, 2) + >>> (mat_diff._java_matrix_wrapper._java_model == + ... mat._java_matrix_wrapper._java_model) + False + + >>> mat_same = BlockMatrix(mat._java_matrix_wrapper._java_model, 3, 2) + >>> (mat_same._java_matrix_wrapper._java_model == + ... mat._java_matrix_wrapper._java_model) + True + """ + if isinstance(blocks, RDD): + blocks = blocks.map(_convert_to_matrix_block_tuple) + # We use DataFrames for serialization of sub-matrix blocks + # from Python, so first convert the RDD to a DataFrame on + # this side. This will convert each sub-matrix block + # tuple to a Row containing the 'blockRowIndex', + # 'blockColIndex', and 'subMatrix' values, which can + # each be easily serialized. We will convert back to + # ((blockRowIndex, blockColIndex), sub-matrix) tuples on + # the Scala side. + java_matrix = callMLlibFunc("createBlockMatrix", blocks.toDF(), + int(rowsPerBlock), int(colsPerBlock), + long(numRows), long(numCols)) + elif (isinstance(blocks, JavaObject) + and blocks.getClass().getSimpleName() == "BlockMatrix"): + java_matrix = blocks + else: + raise TypeError("blocks should be an RDD of sub-matrix blocks as " + "((int, int), matrix) tuples, got %s" % type(blocks)) + + self._java_matrix_wrapper = JavaModelWrapper(java_matrix) + + @property + def blocks(self): + """ + The RDD of sub-matrix blocks + ((blockRowIndex, blockColIndex), sub-matrix) that form this + distributed matrix. + + >>> mat = BlockMatrix( + ... sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]), 3, 2) + >>> blocks = mat.blocks + >>> blocks.first() + ((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0)) + + """ + # We use DataFrames for serialization of sub-matrix blocks + # from Java, so we first convert the RDD of blocks to a + # DataFrame on the Scala/Java side. Then we map each Row in + # the DataFrame back to a sub-matrix block on this side. + blocks_df = callMLlibFunc("getMatrixBlocks", self._java_matrix_wrapper._java_model) + blocks = blocks_df.map(lambda row: ((row[0][0], row[0][1]), row[1])) + return blocks + + @property + def rowsPerBlock(self): + """ + Number of rows that make up each block. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + >>> mat = BlockMatrix(blocks, 3, 2) + >>> mat.rowsPerBlock + 3 + """ + return self._java_matrix_wrapper.call("rowsPerBlock") + + @property + def colsPerBlock(self): + """ + Number of columns that make up each block. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + >>> mat = BlockMatrix(blocks, 3, 2) + >>> mat.colsPerBlock + 2 + """ + return self._java_matrix_wrapper.call("colsPerBlock") + + @property + def numRowBlocks(self): + """ + Number of rows of blocks in the BlockMatrix. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + >>> mat = BlockMatrix(blocks, 3, 2) + >>> mat.numRowBlocks + 2 + """ + return self._java_matrix_wrapper.call("numRowBlocks") + + @property + def numColBlocks(self): + """ + Number of columns of blocks in the BlockMatrix. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + >>> mat = BlockMatrix(blocks, 3, 2) + >>> mat.numColBlocks + 1 + """ + return self._java_matrix_wrapper.call("numColBlocks") + + def numRows(self): + """ + Get or compute the number of rows. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + + >>> mat = BlockMatrix(blocks, 3, 2) + >>> print(mat.numRows()) + 6 + + >>> mat = BlockMatrix(blocks, 3, 2, 7, 6) + >>> print(mat.numRows()) + 7 + """ + return self._java_matrix_wrapper.call("numRows") + + def numCols(self): + """ + Get or compute the number of cols. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + + >>> mat = BlockMatrix(blocks, 3, 2) + >>> print(mat.numCols()) + 2 + + >>> mat = BlockMatrix(blocks, 3, 2, 7, 6) + >>> print(mat.numCols()) + 6 + """ + return self._java_matrix_wrapper.call("numCols") + + def toLocalMatrix(self): + """ + Collect the distributed matrix on the driver as a DenseMatrix. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + >>> mat = BlockMatrix(blocks, 3, 2).toLocalMatrix() + + >>> # This BlockMatrix will have 6 effective rows, due to + >>> # having two sub-matrix blocks stacked, each with 3 rows. + >>> # The ensuing DenseMatrix will also have 6 rows. + >>> print(mat.numRows) + 6 + + >>> # This BlockMatrix will have 2 effective columns, due to + >>> # having two sub-matrix blocks stacked, each with 2 + >>> # columns. The ensuing DenseMatrix will also have 2 columns. + >>> print(mat.numCols) + 2 + """ + return self._java_matrix_wrapper.call("toLocalMatrix") + + def toIndexedRowMatrix(self): + """ + Convert this matrix to an IndexedRowMatrix. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + >>> mat = BlockMatrix(blocks, 3, 2).toIndexedRowMatrix() + + >>> # This BlockMatrix will have 6 effective rows, due to + >>> # having two sub-matrix blocks stacked, each with 3 rows. + >>> # The ensuing IndexedRowMatrix will also have 6 rows. + >>> print(mat.numRows()) + 6 + + >>> # This BlockMatrix will have 2 effective columns, due to + >>> # having two sub-matrix blocks stacked, each with 2 columns. + >>> # The ensuing IndexedRowMatrix will also have 2 columns. + >>> print(mat.numCols()) + 2 + """ + java_indexed_row_matrix = self._java_matrix_wrapper.call("toIndexedRowMatrix") + return IndexedRowMatrix(java_indexed_row_matrix) + + def toCoordinateMatrix(self): + """ + Convert this matrix to a CoordinateMatrix. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(1, 2, [1, 2])), + ... ((1, 0), Matrices.dense(1, 2, [7, 8]))]) + >>> mat = BlockMatrix(blocks, 1, 2).toCoordinateMatrix() + >>> mat.entries.take(3) + [MatrixEntry(0, 0, 1.0), MatrixEntry(0, 1, 2.0), MatrixEntry(1, 0, 7.0)] + """ + java_coordinate_matrix = self._java_matrix_wrapper.call("toCoordinateMatrix") + return CoordinateMatrix(java_coordinate_matrix) + def _test(): import doctest from pyspark import SparkContext from pyspark.sql import SQLContext + from pyspark.mllib.linalg import Matrices import pyspark.mllib.linalg.distributed globs = pyspark.mllib.linalg.distributed.__dict__.copy() globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2) globs['sqlContext'] = SQLContext(globs['sc']) + globs['Matrices'] = Matrices (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org