Hi Rakesh,

I am not interested in the particular case of A^T*A. That case is just a handy 
setup, since it saves me from creating another matrix and forcing the blocks to 
co-locate. Basically, I am trying to understand how effective BlockMatrix is 
for multiplying distributed matrices. It seems that I am either missing 
something or using it incorrectly.

Best regards, Alexander

From: Rakesh Chalasani [mailto:vnit.rak...@gmail.com]
Sent: Tuesday, July 14, 2015 9:05 AM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: BlockMatrix multiplication

Hi Alexander:

Aw, I missed the 'cogroup' on BlockMatrix multiply! I stand corrected. Check
https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala#L361
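
For anyone following along, here is a simplified sketch of the cogroup-based 
scheme (this is an illustration with assumed types and an assumed element-wise 
add, not the actual MLlib code; MLlib additionally routes blocks with 
GridPartitioner):

import org.apache.spark.mllib.linalg.DenseMatrix
import org.apache.spark.rdd.RDD

// Blocks of A are keyed by their column-block index k, blocks of B by their
// row-block index k, so every pair of blocks that must be multiplied meets
// in the same cogroup.
def blockMultiply(
    a: RDD[((Int, Int), DenseMatrix)],
    b: RDD[((Int, Int), DenseMatrix)]): RDD[((Int, Int), DenseMatrix)] = {
  val aByK = a.map { case ((i, k), m) => (k, (i, m)) }
  val bByK = b.map { case ((k, j), m) => (k, (j, m)) }
  aByK.cogroup(bByK)
    .flatMap { case (_, (as, bs)) =>
      for ((i, ma) <- as; (j, mb) <- bs) yield ((i, j), ma.multiply(mb))
    }
    // partial products that share the same (i, j) key are summed element-wise
    .reduceByKey { (m1, m2) =>
      new DenseMatrix(m1.numRows, m1.numCols,
        m1.values.zip(m2.values).map { case (x, y) => x + y })
    }
}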

BlockMatrix multiply uses a custom partitioner called GridPartitioner, which 
might be causing the shuffle; in your special case it need not happen. But from 
what I understood of your code, I don't think this is an issue, since your 
special case can be handled with computeGramianMatrix on RowMatrix. Is there a 
reason you did not use that?
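
For reference, a minimal sketch of that approach (the rows here are made-up 
toy data standing in for A):

import org.apache.spark.mllib.linalg.{Matrix, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Toy data standing in for the rows of A.
val rowsRdd = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(4.0, 5.0, 6.0)))
val a = new RowMatrix(rowsRdd)

// Computes A^T * A without materializing a transpose; the Gramian is
// accumulated with a tree aggregation and returned as a local Matrix on
// the driver (so the result must fit in driver memory).
val gram: Matrix = a.computeGramianMatrix()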

Rakesh


On Tue, Jul 14, 2015 at 11:03 AM Ulanov, Alexander 
<alexander.ula...@hp.com> wrote:
Hi Rakesh,

Thanks for the suggestion. Each block of the original matrix is in a separate 
partition, and each block of the transposed matrix is also in a separate 
partition. The partition numbers are the same for the blocks that undergo 
multiplication, and each partition is on a separate worker. Basically, I want 
to force each worker to multiply only 2 blocks. As far as I understand, this 
should be the optimal configuration for multiplication. Having several blocks 
in each partition, as you suggested, is not optimal, is it?
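
For what it's worth, one way to double-check that layout is to print which 
block lands in which partition (bm is the BlockMatrix from my test code 
further down):

// Print which block coordinates live in which partition.
bm.blocks.mapPartitionsWithIndex { (pid, iter) =>
  iter.map { case ((i, j), _) => s"partition $pid -> block ($i, $j)" }
}.collect().foreach(println)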

Best regards, Alexander


BlockMatrix stores the data as key->Matrix pairs, and multiply does a 
reduceByKey operation, aggregating matrices per key. Since you said each block 
resides in a separate partition, reduceByKey might be effectively shuffling all 
of the data. A better way to go about this is to allow multiple blocks within 
each partition, so that reduceByKey does a local reduce before aggregating 
across nodes.
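
Concretely, something along these lines would put several blocks in each 
partition so that reduceByKey can combine locally first (the partition count 
here is arbitrary, and rdd/blockSize refer to your test code below):

import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// Repack the block RDD into fewer partitions than blocks, so reduceByKey
// can do a map-side combine before anything crosses the network.
val packed = rdd.coalesce(2)  // 2 is arbitrary; anything < number of blocks
val bmPacked = new BlockMatrix(packed, blockSize, blockSize).cache()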

Rakesh

On Mon, Jul 13, 2015 at 9:24 PM Ulanov, Alexander 
<alexander.ula...@hp.com> wrote:
Dear Spark developers,

I am trying to perform BlockMatrix multiplication in Spark. My test is as 
follows: 1) create a matrix of N blocks, so that each row of the block matrix 
contains only 1 block and each block resides in a separate partition on a 
separate node, 2) transpose the block matrix, and 3) multiply the transposed 
matrix by the original non-transposed one. This should preserve data locality, 
so there should be no need for a shuffle. However, I observe a huge shuffle 
with a block matrix of size 50000x10000 and a block size of 10000x10000, i.e. 
5 blocks per matrix. Could you suggest what is wrong?

My setup is Spark 1.4 with one master and 5 worker nodes, each with a 2.2 GHz 
Xeon and 16 GB RAM.
Below is the test code:

import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

val parallelism = 5
val blockSize = 10000
val rows = parallelism * blockSize
val columns = blockSize
val size = rows * columns
assert(rows % blockSize == 0)
assert(columns % blockSize == 0)
val rowBlocks = rows / blockSize
val columnBlocks = columns / blockSize

// One (rowBlock, columnBlock) coordinate per block; with `parallelism`
// partitions and `parallelism` blocks, each block lands in its own partition.
val rdd = sc.parallelize(
  for (i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, j),
  parallelism
).map(coord => (coord, Matrices.rand(blockSize, blockSize, util.Random.self)))

val bm = new BlockMatrix(rdd, blockSize, blockSize).cache()
bm.validate()
val mb = bm.transpose.cache()
mb.validate()

// Time A^T * A; multiply is lazy, so validate() forces the computation.
val t = System.nanoTime()
val ata = mb.multiply(bm)
ata.validate()
println(rows + "x" + columns + ", block:" + blockSize + "\t" +
  (System.nanoTime() - t) / 1e9)


Best regards, Alexander
