Hi Burak,

Thank you for the explanation! I will try to make a block-diagonal matrix and 
report the results back to you.

A column- or row-based partitioner makes sense to me, because it is a direct 
analogy to column- or row-major data storage for matrices, which is used in 
BLAS.
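
For instance, in column-major storage (the BLAS default, also used by MLlib's 
local DenseMatrix), element (i, j) lives at flat index j * numRows + i. A 
quick sketch with made-up values:

import org.apache.spark.mllib.linalg.DenseMatrix

// 2x3 matrix stored column by column: rows are (1 3 5) and (2 4 6).
val m = new DenseMatrix(2, 3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
assert(m(0, 1) == m.values(1 * 2 + 0)) // element (0, 1) == 3.0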

Best regards, Alexander

From: Burak Yavuz [mailto:brk...@gmail.com]
Sent: Tuesday, July 14, 2015 10:14 AM
To: Ulanov, Alexander
Cc: Rakesh Chalasani; dev@spark.apache.org
Subject: Re: BlockMatrix multiplication

Hi Alexander,

From your example code, using the GridPartitioner, you will have 1 column and 
5 rows of blocks. When you perform the A^T*A multiplication, a separate 
GridPartitioner with 5 columns and 5 rows is generated, therefore you are 
observing a huge shuffle. If you instead generated a block-diagonal matrix 
(5x5), you should not observe any shuffle.
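
For illustration, a minimal sketch of such a block-diagonal matrix (the sizes 
here are placeholders): only the (i, i) blocks are materialized, all 
off-diagonal blocks are implicitly zero, so the multiplication touches no 
cross-partition block pairs.

import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

val numDiagBlocks = 5
val diagBlockSize = 1000 // placeholder block size
// One random block per diagonal position (i, i), one block per partition.
val diagRdd = sc.parallelize(0 until numDiagBlocks, numDiagBlocks)
  .map(i => ((i, i), Matrices.rand(diagBlockSize, diagBlockSize, util.Random.self)))
val diag = new BlockMatrix(diagRdd, diagBlockSize, diagBlockSize)
diag.validate()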

Basically, your example causes the worst kind of shuffle. We could implement 
RowBasedPartitioning and ColumnBasedPartitioning as an optimization, but we 
initially didn't see it as necessary to expose the partitioners to users, and 
didn't add them (you can find the old implementations here: 
https://github.com/brkyvz/spark/commit/9ae85aa1ebabdc099d7f655bc1d9021d34d2910f).
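
For reference, a hypothetical sketch of what a row-based partitioner over the 
(rowIndex, colIndex) block keys might look like; this is illustrative only, 
the actual removed implementations are at the link above.

import org.apache.spark.Partitioner

// All blocks in the same block-row land in the same partition,
// regardless of their block-column.
class RowBasedPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key match {
    case (rowIndex: Int, _: Int) => rowIndex % numPartitions
    case other => throw new IllegalArgumentException(s"Unexpected key: $other")
  }
}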

Hope that helps!

Best,
Burak

On Tue, Jul 14, 2015 at 9:37 AM, Ulanov, Alexander 
<alexander.ula...@hp.com> wrote:
Hi Rakesh,

I am not interested in the particular case of A^T*A as such. It is just a 
handy setup in which I don't need to create another matrix and force the 
blocks to co-locate. Basically, I am trying to understand the effectiveness of 
BlockMatrix for multiplication of distributed matrices. It seems that I am 
missing something or using it wrong.

Best regards, Alexander

From: Rakesh Chalasani [mailto:vnit.rak...@gmail.com]
Sent: Tuesday, July 14, 2015 9:05 AM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: BlockMatrix multiplication

Hi Alexander:

Aw, I missed the 'cogroup' on BlockMatrix multiply! I stand corrected. Check
https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala#L361

BlockMatrix multiply uses a custom partitioner called GridPartitioner, which 
might be causing the shuffle; in your special case it need not happen. But 
from what I understood from your code, I don't think this is an issue, since 
your special case can be handled using computeGramianMatrix on RowMatrix. Is 
there a reason you did not use that?
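
For reference, a minimal sketch of that approach with toy data (the values 
are made up):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Each RDD element is one row of A; computeGramianMatrix returns
// A^T*A as a local Matrix on the driver.
val rowsRdd = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(4.0, 5.0, 6.0)))
val gram = new RowMatrix(rowsRdd).computeGramianMatrix()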

Rakesh


On Tue, Jul 14, 2015 at 11:03 AM Ulanov, Alexander 
<alexander.ula...@hp.com> wrote:
Hi Rakesh,

Thanks for the suggestion. Each block of the original matrix is in a separate 
partition. Each block of the transposed matrix is also in a separate 
partition. The partition numbers are the same for the blocks that undergo 
multiplication, and each partition is on a separate worker. Basically, I want 
to force each worker to multiply only 2 blocks. As far as I understand, this 
should be the optimal configuration for multiplication. Having several blocks 
in each partition, as you suggested, is not optimal, is it?

Best regards, Alexander

BlockMatrix stores the data as key->Matrix pairs, and multiply does a 
reduceByKey operation, aggregating matrices per key. Since you said each block 
resides in a separate partition, reduceByKey might be effectively shuffling 
all of the data. A better way to go about this is to allow multiple blocks 
within each partition, so that reduceByKey does a local reduce before 
aggregating across nodes.
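
A minimal sketch of that idea, reusing the rdd and blockSize names from your 
test code below (the partition count of 2 is an arbitrary example):

import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// Pack the blocks into fewer partitions than there are blocks, so that
// reduceByKey can combine partial products map-side before the shuffle.
val packedRdd = rdd.coalesce(2)
val packed = new BlockMatrix(packedRdd, blockSize, blockSize).cache()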

Rakesh

On Mon, Jul 13, 2015 at 9:24 PM Ulanov, Alexander 
<alexander.ula...@hp.com<mailto:alexander.ula...@hp.com>> wrote:
Dear Spark developers,

I am trying to perform BlockMatrix multiplication in Spark. My test is as 
follows: 1) create a matrix of N blocks, so that each row of the block matrix 
contains only 1 block and each block resides in a separate partition on a 
separate node; 2) transpose the block matrix; 3) multiply the transposed 
matrix by the original non-transposed one. This should preserve data locality, 
so there should be no need for a shuffle. However, I observe a huge shuffle 
with a block matrix of size 50000x10000 and a block size of 10000x10000, i.e. 
5 blocks per matrix. Could you suggest what is wrong?

My setup is Spark 1.4, one master and 5 worker nodes, each with a 2.2 GHz Xeon 
CPU and 16 GB RAM.
Below is the test code:

import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

val parallelism = 5
val blockSize = 10000
val rows = parallelism * blockSize
val columns = blockSize
assert(rows % blockSize == 0)
assert(columns % blockSize == 0)
val rowBlocks = rows / blockSize
val columnBlocks = columns / blockSize

// One (rowIndex, colIndex) key per block, spread across `parallelism`
// partitions, each key paired with a random dense block.
val blockCoords = for (i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, j)
val rdd = sc.parallelize(blockCoords, parallelism)
  .map(coord => (coord, Matrices.rand(blockSize, blockSize, util.Random.self)))

val bm = new BlockMatrix(rdd, blockSize, blockSize).cache()
bm.validate()
val mb = bm.transpose.cache()
mb.validate()

// Time the multiplication A^T*A.
val t = System.nanoTime()
val ata = mb.multiply(bm)
ata.validate()
println(rows + "x" + columns + ", block:" + blockSize + "\t" +
  (System.nanoTime() - t) / 1e9)


Best regards, Alexander
