Hi Alexander,

Feel free to submit an "improvement" JIRA.
Best,
Burak

On Thu, Jul 16, 2015 at 4:20 PM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:

Hi Burak,

If I change the code as you suggested, then it fails with (given that blockSize is 10000):

"org.apache.spark.SparkException: The MatrixBlock at (3, 3) has dimensions different than rowsPerBlock: 20000, and colsPerBlock: 20000. Blocks on the right and bottom edges can have smaller dimensions. You may use the repartition method to fix this issue."

Should I submit a JIRA issue for the problem of block matrix shuffling given that the blocks are co-located?

Best regards,
Alexander

From: Burak Yavuz [mailto:brk...@gmail.com]
Sent: Wednesday, July 15, 2015 3:29 PM
To: Ulanov, Alexander
Cc: Rakesh Chalasani; dev@spark.apache.org
Subject: Re: BlockMatrix multiplication

Hi Alexander,

I just noticed the error in my logic. There will always be a shuffle due to the `cogroup`. `join` also uses cogroup, therefore a shuffle is inevitable. However, the reduceByKey will not cause a shuffle. I forgot that cogroup will try to match blocks even when they don't exist.

An optimization we wanted to perform was, in a grid-partitioned setting, to send each partition a single copy of a block and match blocks within the partition. Right now we send each partition multiple copies, because we cogroup on (i, j, k).

Unfortunately, in the current setting I don't think there is a way to reduce the shuffle. Could you observe what the shuffle is if you change:

    val bm = new BlockMatrix(rdd, 2 * blockSize, 2 * blockSize).cache()

My hypothesis is that the shuffle should decrease then.

Best,
Burak

On Wed, Jul 15, 2015 at 3:04 PM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:

Hi Burak,

I've modified my code as you suggested; however, it still leads to shuffling.
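[Editor's note] Burak's remark above about multiple copies being cogrouped per (i, j, k) key can be made concrete with a back-of-the-envelope count. This is plain Scala, no Spark; the counting rule is an assumption based on his description of the dense case, not MLlib's actual code:

```scala
// Model: multiplying an (mBlocks x kBlocks) A by a (kBlocks x nBlocks) B,
// cogrouping on (i, k, j). Assuming dense matrices, each A(i, k) is needed
// by nBlocks result columns and each B(k, j) by mBlocks result rows, so
// that many copies of each block get shipped.
def shippedCopies(mBlocks: Int, kBlocks: Int, nBlocks: Int): (Long, Long) = {
  val aCopies = mBlocks.toLong * kBlocks * nBlocks // all A blocks, n times each
  val bCopies = kBlocks.toLong * nBlocks * mBlocks // all B blocks, m times each
  (aCopies, bCopies)
}

// The A^T * A shape from this thread: A^T is 1 x 5 blocks, A is 5 x 1 blocks.
println(shippedCopies(1, 5, 1)) // few copies per side...
// ...versus a dense 5 x 5 by 5 x 5 product:
println(shippedCopies(5, 5, 5))
```

Even a handful of copies is expensive here: a dense 10000 x 10000 block of doubles is roughly 800 MB.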
Could you suggest what's wrong with my code, or provide example code for block matrix multiplication that preserves data locality and does not cause shuffling?

Modified code:

    import org.apache.spark.mllib.linalg.Matrices
    import org.apache.spark.mllib.linalg.distributed.BlockMatrix
    val parallelism = 5
    val blockSize = 10000
    val rows = parallelism * blockSize
    val columns = blockSize
    val size = rows * columns
    assert(rows % blockSize == 0)
    assert(columns % blockSize == 0)
    val rowBlocks = rows / blockSize
    val columnBlocks = columns / blockSize
    // make a block-diagonal matrix
    val rdd = sc.parallelize({
      for (i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, i)
    }, parallelism).map(coord =>
      (coord, Matrices.rand(blockSize, blockSize, util.Random.self)))
    val bm = new BlockMatrix(rdd, blockSize, blockSize).cache()
    bm.validate()
    val t = System.nanoTime()
    // multiply the matrix with itself
    val aa = bm.multiply(bm)
    aa.validate()
    println(rows + "x" + columns + ", block:" + blockSize + "\t" +
      (System.nanoTime() - t) / 1e9)

Best regards,
Alexander

From: Ulanov, Alexander
Sent: Tuesday, July 14, 2015 6:24 PM
To: 'Burak Yavuz'
Cc: Rakesh Chalasani; dev@spark.apache.org
Subject: RE: BlockMatrix multiplication

Hi Burak,

Thank you for the explanation! I will try to make a diagonal block matrix and report the results to you.

Column- or row-based partitioners make sense to me, because they are a direct analogy of the column- or row-based data storage in matrices that is used in BLAS.
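[Editor's note] The row- and column-based layouts mentioned above can be sketched as simple block-coordinate-to-partition maps. These are hypothetical helper functions for illustration only; MLlib does not expose such partitioners:

```scala
// Hypothetical layout maps: assign block (i, j) to one of numParts
// partitions by contiguous bands of block-rows or block-columns,
// by analogy with row-major / column-major storage in BLAS.
def rowBasedPart(numRowBlocks: Int, numParts: Int)(i: Int, j: Int): Int =
  i * numParts / numRowBlocks // band of block-rows

def colBasedPart(numColBlocks: Int, numParts: Int)(i: Int, j: Int): Int =
  j * numParts / numColBlocks // band of block-columns

// 5 block-rows over 5 partitions: one block-row per partition, so a
// row-based layout of A and a column-based layout of A^T would place
// matching blocks on the same partition index.
println((0 until 5).map(i => rowBasedPart(5, 5)(i, 0)))
println((0 until 5).map(j => colBasedPart(5, 5)(0, j)))
```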
Best regards,
Alexander

From: Burak Yavuz [mailto:brk...@gmail.com]
Sent: Tuesday, July 14, 2015 10:14 AM
To: Ulanov, Alexander
Cc: Rakesh Chalasani; dev@spark.apache.org
Subject: Re: BlockMatrix multiplication

Hi Alexander,

From your example code, using the GridPartitioner, you will have 1 column and 5 rows. When you perform an A^T * A multiplication, you will generate a separate GridPartitioner with 5 columns and 5 rows, therefore you are observing a huge shuffle. If you instead generate a diagonal-block matrix as an example (5x5), you should not observe any shuffle.

Basically, your example causes the worst kind of shuffle. We could implement RowBasedPartitioning and ColumnBasedPartitioning as an optimization, but we didn't initially see it as necessary to expose the partitioners to users, so we didn't add them (you can find the old implementations here: <https://github.com/brkyvz/spark/commit/9ae85aa1ebabdc099d7f655bc1d9021d34d2910f>).

Hope that helps!

Best,
Burak

On Tue, Jul 14, 2015 at 9:37 AM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:

Hi Rakesh,

I am not interested in the particular case of A^T * A. This case is just a handy setup, so that I don't need to create another matrix and force the blocks to co-locate. Basically, I am trying to understand the effectiveness of BlockMatrix for multiplication of distributed matrices. It seems that I am missing something or using it wrong.

Best regards,
Alexander

From: Rakesh Chalasani [mailto:vnit.rak...@gmail.com]
Sent: Tuesday, July 14, 2015 9:05 AM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: BlockMatrix multiplication

Hi Alexander:

Aw, I missed the 'cogroup' in BlockMatrix multiply! I stand corrected.
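[Editor's note] Burak's GridPartitioner explanation above can be illustrated with a toy stand-in (illustrative only, not MLlib's actual GridPartitioner): when the product uses a different grid than the operands, the same block coordinates map to different partition ids, so blocks must move.

```scala
// A toy grid partitioner: split the block grid into partition cells of
// rowsPerCell x colsPerCell blocks, numbering the cells column-major.
case class Grid(rowBlocks: Int, colBlocks: Int,
                rowsPerCell: Int, colsPerCell: Int) {
  private val cellRows = (rowBlocks + rowsPerCell - 1) / rowsPerCell
  def partition(i: Int, j: Int): Int =
    i / rowsPerCell + (j / colsPerCell) * cellRows
}

val coarse = Grid(5, 5, 2, 2) // 2 x 2 blocks per partition cell
val fine   = Grid(5, 5, 1, 1) // one block per partition cell
// The same block lands in different partitions under the two layouts,
// so switching partitioners between input and output forces a shuffle:
println(coarse.partition(3, 4)) // 7
println(fine.partition(3, 4))   // 23
```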
Check
https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala#L361

BlockMatrix multiply uses a custom partitioner called GridPartitioner, which might be causing the shuffle; in your special case it need not happen. But from what I understood from your code, I don't think this is an issue, since your special case can be handled using computeGramianMatrix on RowMatrix. Is there a reason you did not use that?

Rakesh

On Tue, Jul 14, 2015 at 11:03 AM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:

Hi Rakesh,

Thanks for the suggestion. Each block of the original matrix is in a separate partition, and each block of the transposed matrix is also in a separate partition. The partition numbers are the same for the blocks that undergo multiplication, and each partition is on a separate worker. Basically, I want to force each worker to multiply only 2 blocks. This should be the optimal configuration for multiplication, as far as I understand. Having several blocks in each partition, as you suggested, is not optimal, is it?

Best regards,
Alexander

BlockMatrix stores the data as key->Matrix pairs, and multiply does a reduceByKey operation, aggregating matrices per key. Since you said each block resides in a separate partition, reduceByKey might be effectively shuffling all of the data. A better way to go about this is to allow multiple blocks within each partition, so that reduceByKey does a local reduce before aggregating across nodes.

Rakesh

On Mon, Jul 13, 2015 at 9:24 PM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:

Dear Spark developers,

I am trying to perform BlockMatrix multiplication in Spark.
My test is as follows: 1) create a matrix of N blocks, such that each row of the block matrix contains only 1 block and each block resides in a separate partition on a separate node; 2) transpose the block matrix; 3) multiply the transposed matrix by the original, non-transposed one. This should preserve data locality, so there should be no need for a shuffle. However, I observe a huge shuffle with a block matrix of size 50000x10000 and one block of 10000x10000, 5 blocks per matrix. Could you suggest what is wrong?

My setup is Spark 1.4, one master and 5 worker nodes, each with a 2.2 GHz Xeon and 16 GB RAM.

Below is the test code:

    import org.apache.spark.mllib.linalg.Matrices
    import org.apache.spark.mllib.linalg.distributed.BlockMatrix
    val parallelism = 5
    val blockSize = 10000
    val rows = parallelism * blockSize
    val columns = blockSize
    val size = rows * columns
    assert(rows % blockSize == 0)
    assert(columns % blockSize == 0)
    val rowBlocks = rows / blockSize
    val columnBlocks = columns / blockSize
    val rdd = sc.parallelize({
      for (i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, j)
    }, parallelism).map(coord =>
      (coord, Matrices.rand(blockSize, blockSize, util.Random.self)))
    val bm = new BlockMatrix(rdd, blockSize, blockSize).cache()
    bm.validate()
    val mb = bm.transpose.cache()
    mb.validate()
    val t = System.nanoTime()
    val ata = mb.multiply(bm)
    ata.validate()
    println(rows + "x" + columns + ", block:" + blockSize + "\t" +
      (System.nanoTime() - t) / 1e9)

Best regards,
Alexander
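[Editor's note] Rakesh's point earlier in the thread, that reduceByKey's local (map-side) reduce only helps when a partition holds several records with the same result key, can be modeled with plain collections (an illustrative model, not Spark code):

```scala
// Each inner Seq is one partition, holding the result-block keys of the
// products it computed locally. With map-side combining, a partition ships
// at most one record per distinct key; without it, one record per product.
def shippedWithCombine(partitions: Seq[Seq[Int]]): Int =
  partitions.map(_.distinct.size).sum

def shippedWithout(partitions: Seq[Seq[Int]]): Int =
  partitions.map(_.size).sum

// One product per partition (the layout in this thread): combining saves nothing.
val onePerPart = Seq(Seq(0), Seq(1), Seq(2), Seq(3), Seq(4))
println((shippedWithCombine(onePerPart), shippedWithout(onePerPart))) // (5,5)

// Several products per partition sharing keys: the shuffle shrinks.
val manyPerPart = Seq(Seq(0, 0, 1), Seq(1, 1, 2))
println((shippedWithCombine(manyPerPart), shippedWithout(manyPerPart))) // (4,6)
```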