Hi Burak,
I’ve modified my code as you suggested; however, it still causes shuffling.
Could you suggest what is wrong with my code, or provide example code for
block matrix multiplication that preserves data locality and does not cause a
shuffle?
Modified code:
import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.BlockMatrix
val parallelism = 5
val blockSize = 10000
val rows = parallelism * blockSize
val columns = blockSize
val size = rows * columns
assert(rows % blockSize == 0)
assert(columns % blockSize == 0)
val rowBlocks = rows / blockSize
val columnBlocks = columns / blockSize
// make block-diagonal matrix
val rdd = sc.parallelize(
  for (i <- 0 until rowBlocks) yield (i, i),  // diagonal block coordinates only
  parallelism
).map(coord => (coord, Matrices.rand(blockSize, blockSize, util.Random.self)))
val bm = new BlockMatrix(rdd, blockSize, blockSize).cache()
bm.validate()
val t = System.nanoTime()
// multiply matrix with itself
val aa = bm.multiply(bm)
aa.validate()
println(rows + "x" + columns + ", block:" + blockSize + "\t" +
(System.nanoTime() - t) / 1e9)
Best regards, Alexander
From: Ulanov, Alexander
Sent: Tuesday, July 14, 2015 6:24 PM
To: 'Burak Yavuz'
Cc: Rakesh Chalasani; [email protected]
Subject: RE: BlockMatrix multiplication
Hi Burak,
Thank you for the explanation! I will try to build a block-diagonal matrix and
report the results back to you.
A column- or row-based partitioner makes sense to me, since it is a direct
analogy to the column- or row-major storage of matrices used in BLAS.
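To make the analogy concrete, here is an illustrative sketch of my own (not the
old implementation you linked) of what a row-based partitioner over block
coordinates could look like:
import org.apache.spark.Partitioner

// Illustrative sketch only: key each block by its block-row index, so all blocks
// of one block row end up in the same partition (the analogue of row-major storage).
class RowBasedPartitioner(parts: Int) extends Partitioner {
  require(parts > 0)
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int = key match {
    case (blockRow: Int, _) => math.abs(blockRow) % parts
    case _ => 0
  }
}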
Best regards, Alexander
From: Burak Yavuz [mailto:[email protected]]
Sent: Tuesday, July 14, 2015 10:14 AM
To: Ulanov, Alexander
Cc: Rakesh Chalasani; [email protected]
Subject: Re: BlockMatrix multiplication
Hi Alexander,
From your example code, using the GridPartitioner, you will have 1 column and
5 rows of blocks. When you perform the A^T * A multiplication, you will generate
a separate GridPartitioner with 5 columns and 5 rows. Therefore you are
observing a huge shuffle. If you instead generated a block-diagonal matrix
(5x5 blocks), you should not observe any shuffle.
Basically, your example causes the worst kind of shuffle. We could implement
RowBasedPartitioning and ColumnBasedPartitioning as optimizations, but we
initially didn't see it as necessary to expose the partitioners to users, so we
didn't add them (you can find the old implementations here:
https://github.com/brkyvz/spark/commit/9ae85aa1ebabdc099d7f655bc1d9021d34d2910f).
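For what it's worth, here is a quick diagnostic you can run in the spark-shell
(a sketch, using the bm from your code) to see the partitioner mismatch:
// The input block RDD typically has no partitioner when it comes from
// parallelize(...).map(...), while multiply builds its own GridPartitioner
// for the result, hence the shuffle.
println("input partitioner:  " + bm.blocks.partitioner)
val product = bm.multiply(bm)
println("output partitioner: " + product.blocks.partitioner)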
Hope that helps!
Best,
Burak
On Tue, Jul 14, 2015 at 9:37 AM, Ulanov, Alexander
<[email protected]> wrote:
Hi Rakesh,
I am not interested in the particular case of A^T * A; it is just a handy setup
that lets me avoid creating another matrix and forcing its blocks to co-locate.
Basically, I am trying to understand how effective BlockMatrix is for
multiplying distributed matrices. It seems that I am missing something or
using it incorrectly.
Best regards, Alexander
From: Rakesh Chalasani
[mailto:[email protected]]
Sent: Tuesday, July 14, 2015 9:05 AM
To: Ulanov, Alexander
Cc: [email protected]
Subject: Re: BlockMatrix multiplication
Hi Alexander:
Aw, I missed the 'cogroup' in BlockMatrix multiply! I stand corrected. Check
https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala#L361
BlockMatrix multiply uses a custom partitioner called GridPartitioner, which
might be causing the shuffle; in your special case it need not happen. But from
what I understood of your code, I don't think this is an issue, since your
special case can be handled using computeGramianMatrix on RowMatrix. Is there
a reason you did not use that?
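For reference, a minimal sketch of that approach (made-up sizes; note that
computeGramianMatrix returns A^T * A as a local matrix on the driver, so the
number of columns has to stay modest):
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Made-up sizes for illustration
val numRows = 1000
val numCols = 100
val data = sc.parallelize(0 until numRows, 5)
  .map(_ => Vectors.dense(Array.fill(numCols)(util.Random.nextDouble())))
val rowMat = new RowMatrix(data, numRows, numCols)
// Computes A^T * A and returns it as a local numCols x numCols matrix
val gram = rowMat.computeGramianMatrix()
println(gram.numRows + " x " + gram.numCols)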
Rakesh
On Tue, Jul 14, 2015 at 11:03 AM Ulanov, Alexander
<[email protected]> wrote:
Hi Rakesh,
Thanks for the suggestion. Each block of the original matrix is in a separate
partition. Each block of the transposed matrix is also in a separate partition.
The partition numbers match for the blocks that are multiplied together, and
each partition is on a separate worker. Basically, I want to force each worker
to multiply only two blocks. As far as I understand, this should be the optimal
configuration for the multiplication. Having several blocks in each partition,
as you suggested, is not optimal, is it?
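To double-check that placement, here is a small diagnostic sketch (spark-shell,
with bm as in the test code below) that prints which block coordinates land in
which partition:
// List the block coordinates held by each partition of the block RDD
bm.blocks.mapPartitionsWithIndex { (part, iter) =>
  iter.map { case ((i, j), _) => (part, (i, j)) }
}.collect().sortBy(_._1).foreach { case (part, coord) =>
  println("partition " + part + " -> block " + coord)
}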
Best regards, Alexander
BlockMatrix stores the data as key -> Matrix pairs, and multiply does a
reduceByKey operation, aggregating matrices per key. Since you said each block
resides in a separate partition, reduceByKey might effectively be shuffling
all of the data. A better way to go about this is to allow multiple blocks
within each partition, so that reduceByKey does a local reduce before
aggregating across nodes.
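A rough sketch of what I mean, with made-up smaller sizes; the point is simply
to give parallelize fewer partitions than there are blocks, so each partition
holds several blocks and reduceByKey can combine locally before shuffling:
import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// Made-up sizes: a 4x4 grid of 1000x1000 blocks spread over only 4 partitions,
// so each partition holds about 4 blocks.
val blockSize = 1000
val rowBlocks = 4
val columnBlocks = 4
val numPartitions = 4
val coords = for (i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, j)
val blocks = sc.parallelize(coords, numPartitions)
  .map(coord => (coord, Matrices.rand(blockSize, blockSize, util.Random.self)))
val bm = new BlockMatrix(blocks, blockSize, blockSize).cache()
bm.validate()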
Rakesh
On Mon, Jul 13, 2015 at 9:24 PM Ulanov, Alexander
<[email protected]> wrote:
Dear Spark developers,
I am trying to perform BlockMatrix multiplication in Spark. My test is as
follows: 1) create a matrix of N blocks, so that each row of the block matrix
contains only one block and each block resides in a separate partition on a
separate node, 2) transpose the block matrix, and 3) multiply the transposed
matrix by the original non-transposed one. This should preserve data locality,
so there should be no need for a shuffle. However, I observe a huge shuffle
with a block matrix of size 50000x10000, a single block of 10000x10000, and 5
blocks per matrix. Could you suggest what is wrong?
My setup is Spark 1.4, one master and 5 worker nodes, each a Xeon 2.2 GHz with
16 GB RAM.
Below is the test code:
import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.BlockMatrix
val parallelism = 5
val blockSize = 10000
val rows = parallelism * blockSize
val columns = blockSize
val size = rows * columns
assert(rows % blockSize == 0)
assert(columns % blockSize == 0)
val rowBlocks = rows / blockSize
val columnBlocks = columns / blockSize
val rdd = sc.parallelize(
  for (i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, j),
  parallelism
).map(coord => (coord, Matrices.rand(blockSize, blockSize, util.Random.self)))
val bm = new BlockMatrix(rdd, blockSize, blockSize).cache()
bm.validate()
val mb = bm.transpose.cache()
mb.validate()
val t = System.nanoTime()
val ata = mb.multiply(bm)
ata.validate()
println(rows + "x" + columns + ", block:" + blockSize + "\t" +
(System.nanoTime() - t) / 1e9)
Best regards, Alexander