BlockMatrix stores its data as (blockIndex -> Matrix) pairs, and multiply
performs a reduceByKey operation that aggregates the block products per key.
Since you said each block resides in a separate partition, reduceByKey may
effectively be shuffling all of the data. A better approach is to allow
multiple blocks within each partition, so that reduceByKey can do a local
reduce before aggregating across nodes.
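Below is a minimal plain-Scala sketch of that local reduce. It has no Spark dependency. Partitions are modelled as ordinary sequences, and `mapSideCombine` and `shuffledRecords` are hypothetical helpers. Small arrays stand in for the 10000x10000 blocks. Like the explanation above, it assumes the multiply's partial products are keyed by their output block; in this test all five of them target block (0, 0).

```scala
object LocalCombineSketch {
  type Key = (Int, Int)

  // Element-wise sum of two equally sized dense blocks
  def add(a: Array[Double], b: Array[Double]): Array[Double] =
    a.zip(b).map { case (x, y) => x + y }

  // Mimics the map side of reduceByKey: inside each partition, values that
  // share a key are merged before anything leaves the partition.
  def mapSideCombine(partitions: Seq[Seq[(Key, Array[Double])]]): Seq[Seq[(Key, Array[Double])]] =
    partitions.map { part =>
      part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce((x, y) => add(x, y)) }.toSeq
    }

  // Number of records that would still have to cross the network afterwards
  def shuffledRecords(partitions: Seq[Seq[(Key, Array[Double])]]): Int =
    mapSideCombine(partitions).map(_.size).sum

  def main(args: Array[String]): Unit = {
    // Five partial products, all destined for output block (0, 0)
    val partials = (0 until 5).map(j => ((0, 0), Array.fill(4)(j.toDouble)))
    val onePerPartition = partials.map(Seq(_)) // layout described in the question
    val allInOne = Seq(partials)               // several blocks per partition
    println(shuffledRecords(onePerPartition)) // 5
    println(shuffledRecords(allInOne))        // 1
  }
}
```

With one block per partition nothing can be merged locally, so every partial product is shuffled. When the blocks share a partition, they collapse to one record per key first.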


On Mon, Jul 13, 2015 at 9:24 PM Ulanov, Alexander <>

> Dear Spark developers,
> I am trying to perform BlockMatrix multiplication in Spark. My test is as
> follows: 1) create a matrix of N blocks, so that each block row of the
> block matrix contains only one block and each block resides in a separate
> partition on a separate node; 2) transpose the block matrix; and 3) multiply
> the transposed matrix by the original, non-transposed one. This should
> preserve data locality, so there should be no need for a shuffle. However, I
> observe a huge shuffle with a block matrix of size 50000x10000 and blocks of
> 10000x10000, i.e. 5 blocks per matrix. Could you suggest what is wrong?
> My setup is Spark 1.4, one master and 5 worker nodes, each is Xeon 2.2 16
> Below is the test code:
> import org.apache.spark.mllib.linalg.Matrices
> import org.apache.spark.mllib.linalg.distributed.BlockMatrix
>
> val parallelism = 5
> val blockSize = 10000
> val rows = parallelism * blockSize
> val columns = blockSize
> val size = rows * columns
> assert(rows % blockSize == 0)
> assert(columns % blockSize == 0)
> val rowBlocks = rows / blockSize
> val columnBlocks = columns / blockSize
> // One (blockRow, blockCol) coordinate per block, spread over `parallelism` partitions
> val rdd = sc.parallelize(
>   for (i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, j),
>   parallelism
> ).map(coord => (coord, Matrices.rand(blockSize, blockSize, util.Random.self)))
> val bm = new BlockMatrix(rdd, blockSize, blockSize).cache() // 5 x 1 blocks
> bm.validate()
> val mb = bm.transpose.cache() // 1 x 5 blocks
> mb.validate()
> val t = System.nanoTime()
> val ata = mb.multiply(bm)
> ata.validate() // runs a job over the product's blocks, so the multiply happens here
> println(rows + "x" + columns + ", block:" + blockSize + "\t" +
>   (System.nanoTime() - t) / 1e9)
> Best regards, Alexander
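The test code above computes A^T A for a 5 x 1 grid of row blocks. The small plain-Scala sketch below illustrates the blocked product. It has no Spark dependency. `BlockedGramSketch` and its helpers are hypothetical names, and 2x2 blocks stand in for the 10000x10000 ones. Each row block A_j contributes a partial product A_j^T A_j, and all of them land on the single output block, so they must all be summed together. The sketch only checks that arithmetic against a direct multiply; it does not model how BlockMatrix.multiply actually schedules or shuffles the work.

```scala
object BlockedGramSketch {
  type Block = Array[Array[Double]] // row-major dense block

  def transpose(m: Block): Block = m.transpose

  // Naive dense product of an n x k block and a k x p block
  def multiply(a: Block, b: Block): Block = {
    val n = a.length
    val k = b.length
    val p = b(0).length
    Array.tabulate(n, p)((r, c) => (0 until k).map(i => a(r)(i) * b(i)(c)).sum)
  }

  def add(a: Block, b: Block): Block =
    Array.tabulate(a.length, a(0).length)((r, c) => a(r)(c) + b(r)(c))

  // A^T A = sum over j of A_j^T A_j, where A_j are the row blocks of A.
  // Every partial product targets the same (and only) output block.
  def gramByBlocks(rowBlocks: Seq[Block]): Block =
    rowBlocks.map(b => multiply(transpose(b), b)).reduce((x, y) => add(x, y))

  def main(args: Array[String]): Unit = {
    val a: Block = Array(Array(1.0, 2.0), Array(3.0, 4.0), Array(5.0, 6.0), Array(7.0, 8.0))
    val rowBlocks = a.grouped(2).toList // two 2x2 row blocks
    val blocked = gramByBlocks(rowBlocks)
    val direct = multiply(transpose(a), a)
    println(blocked.map(_.mkString(" ")).mkString("\n")) // 84.0 100.0 / 100.0 120.0
    println(blocked.map(_.toSeq).toSeq == direct.map(_.toSeq).toSeq) // true
  }
}
```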
