BlockMatrix stores the data as key -> Matrix pairs, and multiply does a
reduceByKey operation, aggregating matrices per key. Since you said each
block resides in a separate partition, reduceByKey is effectively
shuffling all of the data. A better approach is to allow multiple blocks
within each partition, so that reduceByKey does a local (map-side) reduce
before aggregating across nodes.
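To see why the local reduce matters, here is a minimal sketch in plain Scala (no Spark; `LocalCombineSketch` and `shuffledRecords` are hypothetical names, and the per-partition grouping stands in for Spark's map-side combine). With several blocks sharing a key inside one partition, a map-side combine emits one record per key per partition instead of one per block, so far fewer records cross the network:

```scala
object LocalCombineSketch {
  // (block key, block payload stand-in)
  type Record = (Int, Double)

  // Count the records each partition would emit to the shuffle.
  // Without a map-side combine, every record is shuffled as-is;
  // with one, records sharing a key inside a partition are merged first.
  def shuffledRecords(partitions: Seq[Seq[Record]], mapSideCombine: Boolean): Int =
    if (!mapSideCombine) partitions.map(_.size).sum
    else partitions.map(p => p.groupBy(_._1).size).sum

  def main(args: Array[String]): Unit = {
    // 4 partitions, each holding 3 blocks that all contribute to the same key.
    val parts = Seq.tabulate(4)(p => Seq.fill(3)((p % 2, 1.0)))
    println(shuffledRecords(parts, mapSideCombine = false)) // 12: every block shuffled
    println(shuffledRecords(parts, mapSideCombine = true))  // 4: one per key per partition
  }
}
```

With one block per partition, as in your test, the two counts are equal and the local reduce buys nothing, which is why the whole matrix gets shuffled.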

Rakesh

On Mon, Jul 13, 2015 at 9:24 PM Ulanov, Alexander <alexander.ula...@hp.com>
wrote:

>  Dear Spark developers,
>
>
>
> I am trying to perform BlockMatrix multiplication in Spark. My test is as
> follows: 1) create a matrix of N blocks, so that each row of the block
> matrix contains only one block and each block resides in a separate
> partition on a separate node, 2) transpose the block matrix, and
> 3) multiply the transposed matrix by the original non-transposed one.
> This should preserve data locality, so there should be no need for a
> shuffle. However, I observe a huge shuffle with a block matrix of size
> 50000x10000 and one block of 10000x10000, i.e. 5 blocks per matrix.
> Could you suggest what is wrong?
>
>
>
> My setup is Spark 1.4, one master and 5 worker nodes, each with a
> 2.2 GHz Xeon and 16 GB RAM.
>
> Below is the test code:
>
>
>
> import org.apache.spark.mllib.linalg.Matrices
> import org.apache.spark.mllib.linalg.distributed.BlockMatrix
>
> val parallelism = 5
> val blockSize = 10000
> val rows = parallelism * blockSize
> val columns = blockSize
> val size = rows * columns
> assert(rows % blockSize == 0)
> assert(columns % blockSize == 0)
> val rowBlocks = rows / blockSize
> val columnBlocks = columns / blockSize
>
> val rdd = sc.parallelize( {
>   for (i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, j)
> }, parallelism).map(coord =>
>   (coord, Matrices.rand(blockSize, blockSize, util.Random.self)))
>
> val bm = new BlockMatrix(rdd, blockSize, blockSize).cache()
> bm.validate()
> val mb = bm.transpose.cache()
> mb.validate()
>
> val t = System.nanoTime()
> val ata = mb.multiply(bm)
> ata.validate()
> println(rows + "x" + columns + ", block:" + blockSize + "\t" +
>   (System.nanoTime() - t) / 1e9)
>
>
>
>
>
> Best regards, Alexander
>
