Repository: spark
Updated Branches:
  refs/heads/branch-1.5 350006497 -> eedb996dd


[SPARK-6486] [MLLIB] [PYTHON] Add BlockMatrix to PySpark.

mengxr This adds the `BlockMatrix` to PySpark.  I have the conversions to 
`IndexedRowMatrix` and `CoordinateMatrix` ready as well, so once PR #7554 is 
completed (which relies on PR #7746), this PR can be finished.

Author: Mike Dusenberry <mwdus...@us.ibm.com>

Closes #7761 from dusenberrymw/SPARK-6486_Add_BlockMatrix_to_PySpark and 
squashes the following commits:

27195c2 [Mike Dusenberry] Adding one more check to 
_convert_to_matrix_block_tuple, and a few minor documentation changes.
ae50883 [Mike Dusenberry] Minor update: BlockMatrix should inherit from 
DistributedMatrix.
b8acc1c [Mike Dusenberry] Moving BlockMatrix to 
pyspark.mllib.linalg.distributed, updating the logic to match that of the other 
distributed matrices, adding conversions, and adding documentation.
c014002 [Mike Dusenberry] Using properties for better documentation.
3bda6ab [Mike Dusenberry] Adding documentation.
8fb3095 [Mike Dusenberry] Small cleanup.
e17af2e [Mike Dusenberry] Adding BlockMatrix to PySpark.

(cherry picked from commit 34dcf10104460816382908b2b8eeb6c925e862bf)
Signed-off-by: Xiangrui Meng <m...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eedb996d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eedb996d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eedb996d

Branch: refs/heads/branch-1.5
Commit: eedb996dde5593a97bcb61b3b1515e6fdea6aa70
Parents: 3500064
Author: Mike Dusenberry <mwdus...@us.ibm.com>
Authored: Wed Aug 5 07:40:50 2015 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Wed Aug 5 07:42:25 2015 -0700

----------------------------------------------------------------------
 docs/mllib-data-types.md                        |  41 +++
 .../spark/mllib/api/python/PythonMLLibAPI.scala |  25 ++
 python/pyspark/mllib/linalg/distributed.py      | 328 ++++++++++++++++++-
 3 files changed, 388 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eedb996d/docs/mllib-data-types.md
----------------------------------------------------------------------
diff --git a/docs/mllib-data-types.md b/docs/mllib-data-types.md
index 11033bf..f0e8d54 100644
--- a/docs/mllib-data-types.md
+++ b/docs/mllib-data-types.md
@@ -494,6 +494,9 @@ rowMat = mat.toRowMatrix()
 
 # Convert to a CoordinateMatrix.
 coordinateMat = mat.toCoordinateMatrix()
+
+# Convert to a BlockMatrix.
+blockMat = mat.toBlockMatrix()
 {% endhighlight %}
 </div>
 
@@ -594,6 +597,9 @@ rowMat = mat.toRowMatrix()
 
 # Convert to an IndexedRowMatrix.
 indexedRowMat = mat.toIndexedRowMatrix()
+
+# Convert to a BlockMatrix.
+blockMat = mat.toBlockMatrix()
 {% endhighlight %}
 </div>
 
@@ -661,4 +667,39 @@ matA.validate();
 BlockMatrix ata = matA.transpose().multiply(matA);
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+
+A 
[`BlockMatrix`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.BlockMatrix)
 
+can be created from an `RDD` of sub-matrix blocks, where a sub-matrix block is 
a 
+`((blockRowIndex, blockColIndex), sub-matrix)` tuple.
+
+{% highlight python %}
+from pyspark.mllib.linalg import Matrices
+from pyspark.mllib.linalg.distributed import BlockMatrix
+
+# Create an RDD of sub-matrix blocks.
+blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), 
+                         ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 
12]))])
+
+# Create a BlockMatrix from an RDD of sub-matrix blocks.
+mat = BlockMatrix(blocks, 3, 2)
+
+# Get its size.
+m = mat.numRows() # 6
+n = mat.numCols() # 2
+
+# Get the blocks as an RDD of sub-matrix blocks.
+blocksRDD = mat.blocks
+
+# Convert to a LocalMatrix.
+localMat = mat.toLocalMatrix()
+
+# Convert to an IndexedRowMatrix.
+indexedRowMat = mat.toIndexedRowMatrix()
+
+# Convert to a CoordinateMatrix.
+coordinateMat = mat.toCoordinateMatrix()
+{% endhighlight %}
+</div>
 </div>

http://git-wip-us.apache.org/repos/asf/spark/blob/eedb996d/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index d2b3fae..f585aac 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -1129,6 +1129,21 @@ private[python] class PythonMLLibAPI extends 
Serializable {
   }
 
   /**
+   * Wrapper around BlockMatrix constructor.
+   */
+  def createBlockMatrix(blocks: DataFrame, rowsPerBlock: Int, colsPerBlock: 
Int,
+                        numRows: Long, numCols: Long): BlockMatrix = {
+    // We use DataFrames for serialization of sub-matrix blocks from
+    // Python, so map each Row in the DataFrame back to a
+    // ((blockRowIndex, blockColIndex), sub-matrix) tuple.
+    val blockTuples = blocks.map {
+      case Row(Row(blockRowIndex: Long, blockColIndex: Long), subMatrix: 
Matrix) =>
+        ((blockRowIndex.toInt, blockColIndex.toInt), subMatrix)
+    }
+    new BlockMatrix(blockTuples, rowsPerBlock, colsPerBlock, numRows, numCols)
+  }
+
+  /**
    * Return the rows of an IndexedRowMatrix.
    */
   def getIndexedRows(indexedRowMatrix: IndexedRowMatrix): DataFrame = {
@@ -1147,6 +1162,16 @@ private[python] class PythonMLLibAPI extends 
Serializable {
     val sqlContext = new SQLContext(coordinateMatrix.entries.sparkContext)
     sqlContext.createDataFrame(coordinateMatrix.entries)
   }
+
+  /**
+   * Return the sub-matrix blocks of a BlockMatrix.
+   */
+  def getMatrixBlocks(blockMatrix: BlockMatrix): DataFrame = {
+    // We use DataFrames for serialization of sub-matrix blocks to
+    // Python, so return a DataFrame.
+    val sqlContext = new SQLContext(blockMatrix.blocks.sparkContext)
+    sqlContext.createDataFrame(blockMatrix.blocks)
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/eedb996d/python/pyspark/mllib/linalg/distributed.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/linalg/distributed.py 
b/python/pyspark/mllib/linalg/distributed.py
index 666d833..aec407d 100644
--- a/python/pyspark/mllib/linalg/distributed.py
+++ b/python/pyspark/mllib/linalg/distributed.py
@@ -28,11 +28,12 @@ from py4j.java_gateway import JavaObject
 
 from pyspark import RDD
 from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
-from pyspark.mllib.linalg import _convert_to_vector
+from pyspark.mllib.linalg import _convert_to_vector, Matrix
 
 
 __all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow',
-           'IndexedRowMatrix', 'MatrixEntry', 'CoordinateMatrix']
+           'IndexedRowMatrix', 'MatrixEntry', 'CoordinateMatrix',
+           'BlockMatrix']
 
 
 class DistributedMatrix(object):
@@ -322,6 +323,35 @@ class IndexedRowMatrix(DistributedMatrix):
         java_coordinate_matrix = 
self._java_matrix_wrapper.call("toCoordinateMatrix")
         return CoordinateMatrix(java_coordinate_matrix)
 
+    def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024):
+        """
+        Convert this matrix to a BlockMatrix.
+
+        :param rowsPerBlock: Number of rows that make up each block.
+                             The blocks forming the final rows are not
+                             required to have the given number of rows.
+        :param colsPerBlock: Number of columns that make up each block.
+                             The blocks forming the final columns are not
+                             required to have the given number of columns.
+
+        >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
+        ...                        IndexedRow(6, [4, 5, 6])])
+        >>> mat = IndexedRowMatrix(rows).toBlockMatrix()
+
+        >>> # This IndexedRowMatrix will have 7 effective rows, due to
+        >>> # the highest row index being 6, and the ensuing
+        >>> # BlockMatrix will have 7 rows as well.
+        >>> print(mat.numRows())
+        7
+
+        >>> print(mat.numCols())
+        3
+        """
+        java_block_matrix = self._java_matrix_wrapper.call("toBlockMatrix",
+                                                           rowsPerBlock,
+                                                           colsPerBlock)
+        return BlockMatrix(java_block_matrix, rowsPerBlock, colsPerBlock)
+
 
 class MatrixEntry(object):
     """
@@ -476,19 +506,18 @@ class CoordinateMatrix(DistributedMatrix):
 
         >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
         ...                           MatrixEntry(6, 4, 2.1)])
+        >>> mat = CoordinateMatrix(entries).toRowMatrix()
 
         >>> # This CoordinateMatrix will have 7 effective rows, due to
         >>> # the highest row index being 6, but the ensuing RowMatrix
         >>> # will only have 2 rows since there are only entries on 2
         >>> # unique rows.
-        >>> mat = CoordinateMatrix(entries).toRowMatrix()
         >>> print(mat.numRows())
         2
 
         >>> # This CoordinateMatrix will have 5 columns, due to the
         >>> # highest column index being 4, and the ensuing RowMatrix
         >>> # will have 5 columns as well.
-        >>> mat = CoordinateMatrix(entries).toRowMatrix()
         >>> print(mat.numCols())
         5
         """
@@ -501,33 +530,320 @@ class CoordinateMatrix(DistributedMatrix):
 
         >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
         ...                           MatrixEntry(6, 4, 2.1)])
+        >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix()
 
         >>> # This CoordinateMatrix will have 7 effective rows, due to
         >>> # the highest row index being 6, and the ensuing
         >>> # IndexedRowMatrix will have 7 rows as well.
-        >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix()
         >>> print(mat.numRows())
         7
 
         >>> # This CoordinateMatrix will have 5 columns, due to the
         >>> # highest column index being 4, and the ensuing
         >>> # IndexedRowMatrix will have 5 columns as well.
-        >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix()
         >>> print(mat.numCols())
         5
         """
         java_indexed_row_matrix = 
self._java_matrix_wrapper.call("toIndexedRowMatrix")
         return IndexedRowMatrix(java_indexed_row_matrix)
 
+    def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024):
+        """
+        Convert this matrix to a BlockMatrix.
+
+        :param rowsPerBlock: Number of rows that make up each block.
+                             The blocks forming the final rows are not
+                             required to have the given number of rows.
+        :param colsPerBlock: Number of columns that make up each block.
+                             The blocks forming the final columns are not
+                             required to have the given number of columns.
+
+        >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
+        ...                           MatrixEntry(6, 4, 2.1)])
+        >>> mat = CoordinateMatrix(entries).toBlockMatrix()
+
+        >>> # This CoordinateMatrix will have 7 effective rows, due to
+        >>> # the highest row index being 6, and the ensuing
+        >>> # BlockMatrix will have 7 rows as well.
+        >>> print(mat.numRows())
+        7
+
+        >>> # This CoordinateMatrix will have 5 columns, due to the
+        >>> # highest column index being 4, and the ensuing
+        >>> # BlockMatrix will have 5 columns as well.
+        >>> print(mat.numCols())
+        5
+        """
+        java_block_matrix = self._java_matrix_wrapper.call("toBlockMatrix",
+                                                           rowsPerBlock,
+                                                           colsPerBlock)
+        return BlockMatrix(java_block_matrix, rowsPerBlock, colsPerBlock)
+
+
+def _convert_to_matrix_block_tuple(block):
+    if (isinstance(block, tuple) and len(block) == 2
+            and isinstance(block[0], tuple) and len(block[0]) == 2
+            and isinstance(block[1], Matrix)):
+        blockRowIndex = int(block[0][0])
+        blockColIndex = int(block[0][1])
+        subMatrix = block[1]
+        return ((blockRowIndex, blockColIndex), subMatrix)
+    else:
+        raise TypeError("Cannot convert type %s into a sub-matrix block tuple" 
% type(block))
+
+
+class BlockMatrix(DistributedMatrix):
+    """
+    .. note:: Experimental
+
+    Represents a distributed matrix in blocks of local matrices.
+
+    :param blocks: An RDD of sub-matrix blocks
+                   ((blockRowIndex, blockColIndex), sub-matrix) that
+                   form this distributed matrix. If multiple blocks
+                   with the same index exist, the results for
+                   operations like add and multiply will be
+                   unpredictable.
+    :param rowsPerBlock: Number of rows that make up each block.
+                         The blocks forming the final rows are not
+                         required to have the given number of rows.
+    :param colsPerBlock: Number of columns that make up each block.
+                         The blocks forming the final columns are not
+                         required to have the given number of columns.
+    :param numRows: Number of rows of this matrix. If the supplied
+                    value is less than or equal to zero, the number
+                    of rows will be calculated when `numRows` is
+                    invoked.
+    :param numCols: Number of columns of this matrix. If the supplied
+                    value is less than or equal to zero, the number
+                    of columns will be calculated when `numCols` is
+                    invoked.
+    """
+    def __init__(self, blocks, rowsPerBlock, colsPerBlock, numRows=0, 
numCols=0):
+        """
+        Note: This docstring is not shown publicly.
+
+        Create a wrapper over a Java BlockMatrix.
+
+        Publicly, we require that `blocks` be an RDD.  However, for
+        internal usage, `blocks` can also be a Java BlockMatrix
+        object, in which case we can wrap it directly.  This
+        assists in clean matrix conversions.
+
+        >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 
4, 5, 6])),
+        ...                          ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 
10, 11, 12]))])
+        >>> mat = BlockMatrix(blocks, 3, 2)
+
+        >>> mat_diff = BlockMatrix(blocks, 3, 2)
+        >>> (mat_diff._java_matrix_wrapper._java_model ==
+        ...  mat._java_matrix_wrapper._java_model)
+        False
+
+        >>> mat_same = BlockMatrix(mat._java_matrix_wrapper._java_model, 3, 2)
+        >>> (mat_same._java_matrix_wrapper._java_model ==
+        ...  mat._java_matrix_wrapper._java_model)
+        True
+        """
+        if isinstance(blocks, RDD):
+            blocks = blocks.map(_convert_to_matrix_block_tuple)
+            # We use DataFrames for serialization of sub-matrix blocks
+            # from Python, so first convert the RDD to a DataFrame on
+            # this side. This will convert each sub-matrix block
+            # tuple to a Row containing the 'blockRowIndex',
+            # 'blockColIndex', and 'subMatrix' values, which can
+            # each be easily serialized.  We will convert back to
+            # ((blockRowIndex, blockColIndex), sub-matrix) tuples on
+            # the Scala side.
+            java_matrix = callMLlibFunc("createBlockMatrix", blocks.toDF(),
+                                        int(rowsPerBlock), int(colsPerBlock),
+                                        long(numRows), long(numCols))
+        elif (isinstance(blocks, JavaObject)
+              and blocks.getClass().getSimpleName() == "BlockMatrix"):
+            java_matrix = blocks
+        else:
+            raise TypeError("blocks should be an RDD of sub-matrix blocks as "
+                            "((int, int), matrix) tuples, got %s" % 
type(blocks))
+
+        self._java_matrix_wrapper = JavaModelWrapper(java_matrix)
+
+    @property
+    def blocks(self):
+        """
+        The RDD of sub-matrix blocks
+        ((blockRowIndex, blockColIndex), sub-matrix) that form this
+        distributed matrix.
+
+        >>> mat = BlockMatrix(
+        ...     sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 
6])),
+        ...                     ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 
11, 12]))]), 3, 2)
+        >>> blocks = mat.blocks
+        >>> blocks.first()
+        ((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0))
+
+        """
+        # We use DataFrames for serialization of sub-matrix blocks
+        # from Java, so we first convert the RDD of blocks to a
+        # DataFrame on the Scala/Java side. Then we map each Row in
+        # the DataFrame back to a sub-matrix block on this side.
+        blocks_df = callMLlibFunc("getMatrixBlocks", 
self._java_matrix_wrapper._java_model)
+        blocks = blocks_df.map(lambda row: ((row[0][0], row[0][1]), row[1]))
+        return blocks
+
+    @property
+    def rowsPerBlock(self):
+        """
+        Number of rows that make up each block.
+
+        >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 
4, 5, 6])),
+        ...                          ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 
10, 11, 12]))])
+        >>> mat = BlockMatrix(blocks, 3, 2)
+        >>> mat.rowsPerBlock
+        3
+        """
+        return self._java_matrix_wrapper.call("rowsPerBlock")
+
+    @property
+    def colsPerBlock(self):
+        """
+        Number of columns that make up each block.
+
+        >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 
4, 5, 6])),
+        ...                          ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 
10, 11, 12]))])
+        >>> mat = BlockMatrix(blocks, 3, 2)
+        >>> mat.colsPerBlock
+        2
+        """
+        return self._java_matrix_wrapper.call("colsPerBlock")
+
+    @property
+    def numRowBlocks(self):
+        """
+        Number of rows of blocks in the BlockMatrix.
+
+        >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 
4, 5, 6])),
+        ...                          ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 
10, 11, 12]))])
+        >>> mat = BlockMatrix(blocks, 3, 2)
+        >>> mat.numRowBlocks
+        2
+        """
+        return self._java_matrix_wrapper.call("numRowBlocks")
+
+    @property
+    def numColBlocks(self):
+        """
+        Number of columns of blocks in the BlockMatrix.
+
+        >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 
4, 5, 6])),
+        ...                          ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 
10, 11, 12]))])
+        >>> mat = BlockMatrix(blocks, 3, 2)
+        >>> mat.numColBlocks
+        1
+        """
+        return self._java_matrix_wrapper.call("numColBlocks")
+
+    def numRows(self):
+        """
+        Get or compute the number of rows.
+
+        >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 
4, 5, 6])),
+        ...                          ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 
10, 11, 12]))])
+
+        >>> mat = BlockMatrix(blocks, 3, 2)
+        >>> print(mat.numRows())
+        6
+
+        >>> mat = BlockMatrix(blocks, 3, 2, 7, 6)
+        >>> print(mat.numRows())
+        7
+        """
+        return self._java_matrix_wrapper.call("numRows")
+
+    def numCols(self):
+        """
+        Get or compute the number of cols.
+
+        >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 
4, 5, 6])),
+        ...                          ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 
10, 11, 12]))])
+
+        >>> mat = BlockMatrix(blocks, 3, 2)
+        >>> print(mat.numCols())
+        2
+
+        >>> mat = BlockMatrix(blocks, 3, 2, 7, 6)
+        >>> print(mat.numCols())
+        6
+        """
+        return self._java_matrix_wrapper.call("numCols")
+
+    def toLocalMatrix(self):
+        """
+        Collect the distributed matrix on the driver as a DenseMatrix.
+
+        >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 
4, 5, 6])),
+        ...                          ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 
10, 11, 12]))])
+        >>> mat = BlockMatrix(blocks, 3, 2).toLocalMatrix()
+
+        >>> # This BlockMatrix will have 6 effective rows, due to
+        >>> # having two sub-matrix blocks stacked, each with 3 rows.
+        >>> # The ensuing DenseMatrix will also have 6 rows.
+        >>> print(mat.numRows)
+        6
+
+        >>> # This BlockMatrix will have 2 effective columns, due to
+        >>> # having two sub-matrix blocks stacked, each with 2
+        >>> # columns. The ensuing DenseMatrix will also have 2 columns.
+        >>> print(mat.numCols)
+        2
+        """
+        return self._java_matrix_wrapper.call("toLocalMatrix")
+
+    def toIndexedRowMatrix(self):
+        """
+        Convert this matrix to an IndexedRowMatrix.
+
+        >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 
4, 5, 6])),
+        ...                          ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 
10, 11, 12]))])
+        >>> mat = BlockMatrix(blocks, 3, 2).toIndexedRowMatrix()
+
+        >>> # This BlockMatrix will have 6 effective rows, due to
+        >>> # having two sub-matrix blocks stacked, each with 3 rows.
+        >>> # The ensuing IndexedRowMatrix will also have 6 rows.
+        >>> print(mat.numRows())
+        6
+
+        >>> # This BlockMatrix will have 2 effective columns, due to
+        >>> # having two sub-matrix blocks stacked, each with 2 columns.
+        >>> # The ensuing IndexedRowMatrix will also have 2 columns.
+        >>> print(mat.numCols())
+        2
+        """
+        java_indexed_row_matrix = 
self._java_matrix_wrapper.call("toIndexedRowMatrix")
+        return IndexedRowMatrix(java_indexed_row_matrix)
+
+    def toCoordinateMatrix(self):
+        """
+        Convert this matrix to a CoordinateMatrix.
+
+        >>> blocks = sc.parallelize([((0, 0), Matrices.dense(1, 2, [1, 2])),
+        ...                          ((1, 0), Matrices.dense(1, 2, [7, 8]))])
+        >>> mat = BlockMatrix(blocks, 1, 2).toCoordinateMatrix()
+        >>> mat.entries.take(3)
+        [MatrixEntry(0, 0, 1.0), MatrixEntry(0, 1, 2.0), MatrixEntry(1, 0, 
7.0)]
+        """
+        java_coordinate_matrix = 
self._java_matrix_wrapper.call("toCoordinateMatrix")
+        return CoordinateMatrix(java_coordinate_matrix)
+
 
 def _test():
     import doctest
     from pyspark import SparkContext
     from pyspark.sql import SQLContext
+    from pyspark.mllib.linalg import Matrices
     import pyspark.mllib.linalg.distributed
     globs = pyspark.mllib.linalg.distributed.__dict__.copy()
     globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
     globs['sqlContext'] = SQLContext(globs['sc'])
+    globs['Matrices'] = Matrices
     (failure_count, test_count) = doctest.testmod(globs=globs, 
optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to