BlockMatrix stores its data as key -> Matrix pairs, and multiply performs a reduceByKey operation that aggregates the matrices per key. Since you said each block resides in a separate partition, reduceByKey might effectively be shuffling all of the data. A better approach is to allow multiple blocks within each partition, so that reduceByKey can do a local reduce before aggregating across nodes.
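As a rough sketch of what I mean (not a tuned or benchmarked setup), the test could be built with more blocks than partitions. The names blocksPerPartition and the small blockSize of 100 are my own illustrative assumptions, and the snippet assumes it is run in spark-shell, where sc is already defined:

```scala
// Run in spark-shell, where `sc` (the SparkContext) is already defined.
import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

val partitions = 5          // one partition per worker, as in the original test
val blocksPerPartition = 4  // hypothetical value: several blocks share each partition
val blockSize = 100         // small illustrative size, not the original 10000
val rowBlocks = partitions * blocksPerPartition

// Block coordinates (i, 0): the matrix is rowBlocks blocks tall and one block wide.
val coords = for (i <- 0 until rowBlocks) yield (i, 0)

// 20 blocks spread over 5 partitions, so each partition holds several blocks.
val blocks = sc.parallelize(coords, partitions).map { coord =>
  (coord, Matrices.rand(blockSize, blockSize, new java.util.Random()))
}

val bm = new BlockMatrix(blocks, blockSize, blockSize).cache()
bm.validate()

// A^T * A. With several blocks per partition, reduceByKey has a chance to
// combine partial results locally before data moves across nodes.
val ata = bm.transpose.multiply(bm)
ata.validate()

println(s"${ata.numRows()} x ${ata.numCols()}, partitions: ${bm.blocks.partitions.length}")
```

Whether this actually reduces the shuffle is worth checking in the Spark UI (shuffle read/write per stage) for both layouts; I have not measured it.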
Rakesh

On Mon, Jul 13, 2015 at 9:24 PM Ulanov, Alexander <alexander.ula...@hp.com> wrote:

> Dear Spark developers,
>
> I am trying to perform BlockMatrix multiplication in Spark. My test is as
> follows: 1) create a matrix of N blocks, so that each row of block matrix
> contains only 1 block and each block resides in separate partition on
> separate node, 2) transpose the block matrix and 3) multiply the transposed
> matrix by the original non-transposed one. This should preserve the data
> locality, so there should be no need for shuffle. However, I observe huge
> shuffle with the block matrix size of 50000x10000 and one block
> 10000x10000, 5 blocks per matrix. Could you suggest what is wrong?
>
> My setup is Spark 1.4, one master and 5 worker nodes, each is Xeon 2.2 16
> GB RAM.
>
> Below is the test code:
>
> import org.apache.spark.mllib.linalg.Matrices
> import org.apache.spark.mllib.linalg.distributed.BlockMatrix
>
> val parallelism = 5
> val blockSize = 10000
> val rows = parallelism * blockSize
> val columns = blockSize
> val size = rows * columns
> assert(rows % blockSize == 0)
> assert(columns % blockSize == 0)
> val rowBlocks = rows / blockSize
> val columnBlocks = columns / blockSize
> val rdd = sc.parallelize( {
>   for(i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, j)
> }, parallelism).map( coord => (coord, Matrices.rand(blockSize, blockSize, util.Random.self)))
> val bm = new BlockMatrix(rdd, blockSize, blockSize).cache()
> bm.validate()
> val mb = bm.transpose.cache()
> mb.validate()
> val t = System.nanoTime()
> val ata = mb.multiply(bm)
> ata.validate()
> println(rows + "x" + columns + ", block:" + blockSize + "\t" + (System.nanoTime() - t) / 1e9)
>
> Best regards, Alexander