Hi Burak,

Thank you for the explanation! I will try to make a block-diagonal matrix and report the results back to you.
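A minimal sketch of that block-diagonal setup, assuming the same blockSize and parallelism as the test code quoted at the bottom of this thread (variable names such as diagRdd and diag are illustrative, not from the original code):

import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// Sketch only: materialize just the (i, i) blocks of a 5x5 block grid,
// one block per partition, so A^T * A only ever multiplies co-located blocks.
val parallelism = 5
val blockSize = 10000
val n = parallelism.toLong * blockSize
val diagRdd = sc.parallelize(0 until parallelism, parallelism).map { i =>
  ((i, i), Matrices.rand(blockSize, blockSize, util.Random.self))
}
val diag = new BlockMatrix(diagRdd, blockSize, blockSize, n, n).cache()
diag.validate()
// Expected (per Burak's explanation): no large shuffle for the block-diagonal case.
val product = diag.transpose.multiply(diag)

Since the transposed matrix shares the same (i, i) block layout, every product block can be computed from blocks that already sit together, which is why no shuffle is expected here.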
A column- or row-based partitioner makes sense to me, because it is a direct analogy to column- or row-based data storage for matrices, which is used in BLAS.

Best regards, Alexander

From: Burak Yavuz [mailto:brk...@gmail.com]
Sent: Tuesday, July 14, 2015 10:14 AM
To: Ulanov, Alexander
Cc: Rakesh Chalasani; dev@spark.apache.org
Subject: Re: BlockMatrix multiplication

Hi Alexander,

From your example code, using the GridPartitioner, you will have 1 column and 5 rows. When you perform an A^T * A multiplication, you will generate a separate GridPartitioner with 5 columns and 5 rows. Therefore you are observing a huge shuffle. If you instead generate a block-diagonal matrix as an example (5x5 blocks), you should not observe any shuffle. Basically, your example causes the worst kind of shuffle. We could implement RowBasedPartitioning and ColumnBasedPartitioning as an optimization, but we initially didn't see it as necessary to expose the partitioners to users, so we didn't add them (you can find the old implementations here: https://github.com/brkyvz/spark/commit/9ae85aa1ebabdc099d7f655bc1d9021d34d2910f).

Hope that helps!

Best,
Burak

On Tue, Jul 14, 2015 at 9:37 AM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:

Hi Rakesh,

I am not interested in the particular case of A^T * A. This case is just a handy setup, because I don't need to create another matrix and force the blocks to co-locate. Basically, I am trying to understand the effectiveness of BlockMatrix for multiplication of distributed matrices. It seems that I am missing something or using it wrong.

Best regards, Alexander

From: Rakesh Chalasani [mailto:vnit.rak...@gmail.com]
Sent: Tuesday, July 14, 2015 9:05 AM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: BlockMatrix multiplication

Hi Alexander:

Aw, I missed the 'cogroup' on BlockMatrix multiply! I stand corrected. Check https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala#L361

BlockMatrix multiply uses a custom partitioner called GridPartitioner, which might be causing the shuffle; in your special case it need not happen. But, from what I understood from your code, I don't think this is an issue, since your special case can be handled using computeGramianMatrix on RowMatrix. Is there a reason you did not use that?

Rakesh

On Tue, Jul 14, 2015 at 11:03 AM Ulanov, Alexander <alexander.ula...@hp.com> wrote:

Hi Rakesh,

Thanks for the suggestion. Each block of the original matrix is in a separate partition. Each block of the transposed matrix is also in a separate partition. The partition numbers are the same for the blocks that undergo multiplication. Each partition is on a separate worker. Basically, I want to force each worker to multiply only 2 blocks. This should be the optimal configuration for multiplication, as far as I understand. Having several blocks in each partition, as you suggested, is not optimal, is it?

Best regards, Alexander

BlockMatrix stores the data as key -> Matrix pairs, and multiply does a reduceByKey operation, aggregating matrices per key. Since you said each block is residing in a separate partition, reduceByKey might be effectively shuffling all of the data. A better way to go about this is to allow multiple blocks within each partition, so that reduceByKey does a local reduce before aggregating across nodes.

Rakesh
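As a reference for the computeGramianMatrix route mentioned above, a minimal sketch on RowMatrix might look like this (sizes and names are illustrative, not from the original test; the Gramian is a numCols x numCols local matrix collected to the driver, so this only fits when the column count is modest):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Sketch only: random rows, then the Gramian A^T * A as a local matrix on the driver.
val numRows = 50000
val numCols = 1000  // illustrative; the original test used 10000 columns
val rowsRdd = sc.parallelize(0 until numRows, 5).map { _ =>
  Vectors.dense(Array.fill(numCols)(util.Random.nextDouble()))
}
val rowMat = new RowMatrix(rowsRdd, numRows.toLong, numCols)
val gram = rowMat.computeGramianMatrix()  // numCols x numCols local Matrix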
On Mon, Jul 13, 2015 at 9:24 PM Ulanov, Alexander <alexander.ula...@hp.com> wrote:

Dear Spark developers,

I am trying to perform BlockMatrix multiplication in Spark. My test is as follows: 1) create a matrix of N blocks, so that each block row contains only one block and each block resides in a separate partition on a separate node; 2) transpose the block matrix; 3) multiply the transposed matrix by the original, non-transposed one. This should preserve data locality, so there should be no need for a shuffle. However, I observe a huge shuffle with a block matrix of size 50000x10000 and one 10000x10000 block, i.e. 5 blocks per matrix. Could you suggest what is wrong? My setup is Spark 1.4, one master and 5 worker nodes, each a 2.2 GHz Xeon with 16 GB RAM. Below is the test code:

import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

val parallelism = 5
val blockSize = 10000
val rows = parallelism * blockSize
val columns = blockSize
val size = rows * columns
assert(rows % blockSize == 0)
assert(columns % blockSize == 0)
val rowBlocks = rows / blockSize
val columnBlocks = columns / blockSize
val rdd = sc.parallelize(
  { for (i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, j) },
  parallelism).map(coord => (coord, Matrices.rand(blockSize, blockSize, util.Random.self)))
val bm = new BlockMatrix(rdd, blockSize, blockSize).cache()
bm.validate()
val mb = bm.transpose.cache()
mb.validate()
val t = System.nanoTime()
val ata = mb.multiply(bm)
ata.validate()
println(rows + "x" + columns + ", block:" + blockSize + "\t" + (System.nanoTime() - t) / 1e9)

Best regards, Alexander
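As a rough sketch of Rakesh's suggestion to keep several blocks per partition, the RDD in the test above could be built with fewer partitions than blocks, so that the reduce step in multiply() can combine product blocks locally before aggregating across nodes (the partition count of 2 is illustrative; blockSize, rowBlocks and columnBlocks are reused from the test code above):

// Sketch only: fewer partitions than blocks, so several blocks co-reside in one partition.
val fewerPartitions = 2  // illustrative value
val rddGrouped = sc.parallelize(
  { for (i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, j) },
  fewerPartitions).map(coord =>
    (coord, Matrices.rand(blockSize, blockSize, util.Random.self)))
val bmGrouped = new BlockMatrix(rddGrouped, blockSize, blockSize).cache()
bmGrouped.validate()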