Hi Alexander,

Aw, I missed the 'cogroup' in BlockMatrix multiply! I stand corrected. Check https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala#L361
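For anyone skimming the thread, the shape of that multiply is: output block (i, j) is the sum over k of A(i, k) * B(k, j), with the A and B blocks that share a k index paired up (the cogroup) and the partial products then reduced per output key. A plain-Scala sketch of that shape (no Spark; all names here are illustrative, not the actual BlockMatrix internals):

```scala
// Sketch of block matrix multiplication: output block (i, j) is the sum
// over k of A(i, k) * B(k, j). In BlockMatrix.multiply the cogroup pairs
// up A and B blocks sharing a k index, and the partitioner decides which
// node each (i, j) result lands on -- if that differs from the input
// layout, blocks get shuffled.
object BlockMultiplySketch {
  // A "block" here is a dense row-major b x b matrix.
  type Block = Array[Double]

  def multiplyBlocks(x: Block, y: Block, b: Int): Block = {
    val out = Array.fill(b * b)(0.0)
    for (i <- 0 until b; k <- 0 until b; j <- 0 until b)
      out(i * b + j) += x(i * b + k) * y(k * b + j)
    out
  }

  def addBlocks(x: Block, y: Block): Block =
    x.zip(y).map { case (u, v) => u + v }

  // Block matrices as maps from (blockRow, blockCol) to a block.
  def multiply(a: Map[(Int, Int), Block],
               b: Map[(Int, Int), Block],
               blockDim: Int): Map[(Int, Int), Block] = {
    val products = for {
      ((i, k), aBlk) <- a.toSeq
      ((k2, j), bBlk) <- b.toSeq
      if k == k2 // the "cogroup on k" step
    } yield ((i, j), multiplyBlocks(aBlk, bBlk, blockDim))
    // the per-key reduce that aggregates partial products per output block
    products.groupBy(_._1).map { case (key, ps) =>
      key -> ps.map(_._2).reduce(addBlocks)
    }
  }
}
```

The point of the sketch: the cogroup and the final per-key reduce are where data crosses partitions, so the input layout relative to the output partitioner determines how much actually moves.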
BlockMatrix multiply uses a custom partitioner, GridPartitioner, which may be what is causing the shuffle; in your special case that shuffle need not happen. But from what I understood of your code, I don't think this is the real issue, since your special case (A^T * A) can be handled using computeGramianMatrix on RowMatrix. Is there a reason you did not use that?

Rakesh

On Tue, Jul 14, 2015 at 11:03 AM Ulanov, Alexander <alexander.ula...@hp.com> wrote:

> Hi Rakesh,
>
> Thanks for the suggestion. Each block of the original matrix is in a separate
> partition, and each block of the transposed matrix is also in a separate
> partition. The partition numbers are the same for the blocks that undergo
> multiplication, and each partition is on a separate worker. Basically, I want
> to force each worker to multiply only 2 blocks. As far as I understand, this
> should be the optimal configuration for multiplication. Having several blocks
> in each partition, as you suggested, is not optimal, is it?
>
> Best regards, Alexander
>
> Block matrix stores the data as key -> Matrix pairs, and multiply does a
> reduceByKey operation, aggregating matrices per key. Since you said each
> block resides in a separate partition, reduceByKey might be effectively
> shuffling all of the data. A better way to go about this is to allow
> multiple blocks within each partition so that reduceByKey does a local
> reduce before aggregating across nodes.
>
> Rakesh
>
> On Mon, Jul 13, 2015 at 9:24 PM Ulanov, Alexander <alexander.ula...@hp.com> wrote:
>
>> Dear Spark developers,
>>
>> I am trying to perform BlockMatrix multiplication in Spark. My test is as
>> follows: 1) create a matrix of N blocks, so that each row of the block
>> matrix contains only 1 block and each block resides in a separate partition
>> on a separate node, 2) transpose the block matrix, and 3) multiply the
>> transposed matrix by the original non-transposed one. This should preserve
>> data locality, so there should be no need for a shuffle.
>> However, I observe a huge shuffle with a block matrix of size 50000x10000
>> and one block of 10000x10000, i.e. 5 blocks per matrix. Could you suggest
>> what is wrong?
>>
>> My setup is Spark 1.4, one master and 5 worker nodes; each node is a
>> Xeon 2.2 with 16 GB RAM.
>>
>> Below is the test code:
>>
>> import org.apache.spark.mllib.linalg.Matrices
>> import org.apache.spark.mllib.linalg.distributed.BlockMatrix
>>
>> val parallelism = 5
>> val blockSize = 10000
>> val rows = parallelism * blockSize
>> val columns = blockSize
>> val size = rows * columns
>> assert(rows % blockSize == 0)
>> assert(columns % blockSize == 0)
>> val rowBlocks = rows / blockSize
>> val columnBlocks = columns / blockSize
>> val rdd = sc.parallelize( {
>>   for (i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, j)
>> }, parallelism).map( coord =>
>>   (coord, Matrices.rand(blockSize, blockSize, util.Random.self)))
>> val bm = new BlockMatrix(rdd, blockSize, blockSize).cache()
>> bm.validate()
>> val mb = bm.transpose.cache()
>> mb.validate()
>> val t = System.nanoTime()
>> val ata = mb.multiply(bm)
>> ata.validate()
>> println(rows + "x" + columns + ", block:" + blockSize + "\t" +
>>   (System.nanoTime() - t) / 1e9)
>>
>> Best regards, Alexander
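For completeness, the Gramian suggestion above works because A^T A can be accumulated as a sum of per-row outer products a_i * a_i^T, so the rows of A never need to be shuffled against each other; only fixed-size n x n partial sums are combined across nodes. A minimal Spark-free sketch of that accumulation (plain Scala, illustrative names only, not MLlib's actual implementation):

```scala
// Sketch of the Gramian idea behind RowMatrix.computeGramianMatrix:
// A^T A == sum over rows a_i of the outer product a_i * a_i^T.
// Each partition can fold its own rows locally; only the n x n
// accumulators need to be merged across nodes (no shuffle of A).
object GramianSketch {
  type Vec = Array[Double]

  // Add the outer product a * a^T into a row-major n x n accumulator.
  def addOuter(acc: Array[Double], a: Vec): Array[Double] = {
    val n = a.length
    var i = 0
    while (i < n) {
      var j = 0
      while (j < n) {
        acc(i * n + j) += a(i) * a(j)
        j += 1
      }
      i += 1
    }
    acc
  }

  // Gramian of a row matrix: fold all rows into one n x n accumulator.
  // In Spark this fold runs per partition and the partials are merged
  // (MLlib uses treeAggregate for the merge).
  def gramian(rows: Seq[Vec]): Array[Double] = {
    require(rows.nonEmpty, "need at least one row")
    val n = rows.head.length
    rows.foldLeft(Array.fill(n * n)(0.0))(addOuter)
  }
}
```

Since n = 10000 in Alexander's test, each partial is a single 10000x10000 local matrix, which is exactly the transposed-times-original product he is computing with BlockMatrix.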