Hi Alexander,

Feel free to submit an "improvement" JIRA.

Best,
Burak

On Thu, Jul 16, 2015 at 4:20 PM, Ulanov, Alexander <alexander.ula...@hp.com>
wrote:

>  Hi Burak,
>
>
>
> If I change the code as you suggested, it fails (given that blockSize is
> 10000) with:
>
> “org.apache.spark.SparkException: The MatrixBlock at (3, 3) has dimensions
> different than rowsPerBlock: 20000, and colsPerBlock: 20000. Blocks on the
> right and bottom edges can have smaller dimensions. You may use the
> repartition method to fix this issue.”
>
>
>
> Should I submit a JIRA issue about block matrix multiplication shuffling
> data even when the blocks are co-located?
>
>
>
> Best regards, Alexander
>
>
>
> *From:* Burak Yavuz [mailto:brk...@gmail.com]
> *Sent:* Wednesday, July 15, 2015 3:29 PM
>
> *To:* Ulanov, Alexander
> *Cc:* Rakesh Chalasani; dev@spark.apache.org
> *Subject:* Re: BlockMatrix multiplication
>
>
>
> Hi Alexander,
>
>
>
> I just noticed the error in my logic. There will always be a shuffle due
> to the `cogroup`; `join` also uses cogroup, so a shuffle is inevitable.
> The reduceByKey, however, will not cause a shuffle. I had forgotten that
> cogroup tries to match keys on both sides, even when they don't exist.
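>
> A standalone sketch of the distinction (illustrative only; the RDDs and
> partitioner below are made up for the example, not the BlockMatrix code path):
>
> import org.apache.spark.HashPartitioner
>
> val p = new HashPartitioner(4)
> // Neither side carries a partitioner, so cogroup must hash both of them
> // into p's partitions: a full shuffle of both RDDs.
> val left = sc.parallelize(0 until 8).map(i => (i, i.toDouble))
> val right = sc.parallelize(0 until 8).map(i => (i, 2.0 * i))
> val grouped = left.cogroup(right, p)
> // Once an RDD already has a partitioner, reduceByKey aggregates map-side
> // and reuses that partitioning, so no new shuffle is needed.
> val prePartitioned = left.partitionBy(p).cache()
> val reduced = prePartitioned.reduceByKey(_ + _)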
>
>
>
> An optimization we wanted to perform was, in a grid-partitioned setting, to
> send each partition a single copy of a block and match blocks within the
> partition. Right now we send each partition multiple copies, because we
> cogroup on (i, j, k).
>
>
>
> Unfortunately, in the current setting I don't think there is a way to
> reduce the shuffle. Could you observe how much data is shuffled if you change:
>
> val bm = new BlockMatrix(rdd, 2 * blockSize, 2 * blockSize).cache()
>
>
>
> My hypothesis is that the shuffle should decrease then.
>
>
>
> Best,
>
> Burak
>
>
>
> On Wed, Jul 15, 2015 at 3:04 PM, Ulanov, Alexander <
> alexander.ula...@hp.com> wrote:
>
>  Hi Burak,
>
>
>
> I’ve modified my code as you suggested; however, it still leads to
> shuffling. Could you suggest what’s wrong with my code, or provide example
> code for block matrix multiplication that preserves data locality and does
> not cause shuffling?
>
>
>
>
>
> Modified code:
>
> import org.apache.spark.mllib.linalg.Matrices
> import org.apache.spark.mllib.linalg.distributed.BlockMatrix
>
> val parallelism = 5
> val blockSize = 10000
> val rows = parallelism * blockSize
> val columns = blockSize
> val size = rows * columns
> assert(rows % blockSize == 0)
> assert(columns % blockSize == 0)
> val rowBlocks = rows / blockSize
> val columnBlocks = columns / blockSize
> // make block-diagonal matrix
> val rdd = sc.parallelize({
>   for (i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, i)
> }, parallelism).map(coord => (coord, Matrices.rand(blockSize, blockSize, util.Random.self)))
> val bm = new BlockMatrix(rdd, blockSize, blockSize).cache()
> bm.validate()
> val t = System.nanoTime()
> // multiply matrix with itself
> val aa = bm.multiply(bm)
> aa.validate()
> println(rows + "x" + columns + ", block:" + blockSize + "\t" + (System.nanoTime() - t) / 1e9)
>
>
>
>
>
> Best regards, Alexander
>
>
>
> *From:* Ulanov, Alexander
> *Sent:* Tuesday, July 14, 2015 6:24 PM
> *To:* 'Burak Yavuz'
> *Cc:* Rakesh Chalasani; dev@spark.apache.org
> *Subject:* RE: BlockMatrix multiplication
>
>
>
> Hi Burak,
>
>
>
> Thank you for the explanation! I will try to make a diagonal block matrix
> and report the results back to you.
>
>
>
> A column- or row-based partitioner makes sense to me, because it is a
> direct analogy to the column- or row-major storage layout used for
> matrices in BLAS.
>
>
>
> Best regards, Alexander
>
>
>
> *From:* Burak Yavuz [mailto:brk...@gmail.com]
> *Sent:* Tuesday, July 14, 2015 10:14 AM
> *To:* Ulanov, Alexander
> *Cc:* Rakesh Chalasani; dev@spark.apache.org
> *Subject:* Re: BlockMatrix multiplication
>
>
>
> Hi Alexander,
>
>
>
> From your example code, using the GridPartitioner, you will have 1 column
> and 5 rows of blocks. When you perform an A^T * A multiplication, you will
> generate a separate GridPartitioner with 5 columns and 5 rows; therefore
> you are observing a huge shuffle. If you instead generate a diagonal-block
> matrix (5x5 blocks) as an example, you should not observe any shuffle.
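>
> A minimal sketch of that diagonal-block construction (illustrative; it
> reuses the sc and blockSize from your code, with 5 row and 5 column blocks):
>
> import org.apache.spark.mllib.linalg.Matrices
> import org.apache.spark.mllib.linalg.distributed.BlockMatrix
>
> // Only the (i, i) blocks exist, one per partition, so the block grids of
> // A^T and A line up instead of crossing partitions.
> val diagRdd = sc.parallelize(0 until 5, 5).map(i =>
>   ((i, i), Matrices.rand(blockSize, blockSize, util.Random.self)))
> val diagBm = new BlockMatrix(diagRdd, blockSize, blockSize,
>   5L * blockSize, 5L * blockSize).cache()
> diagBm.validate()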
>
>
>
> Basically, your example causes the worst kind of shuffle. We could
> implement RowBasedPartitioning and ColumnBasedPartitioning as
> optimizations, but we didn't initially see it as necessary to expose the
> partitioners to users, so we didn't add them (you can find the old implementations here
> <https://github.com/brkyvz/spark/commit/9ae85aa1ebabdc099d7f655bc1d9021d34d2910f>
> ).
>
>
>
> Hope that helps!
>
>
>
> Best,
>
> Burak
>
>
>
> On Tue, Jul 14, 2015 at 9:37 AM, Ulanov, Alexander <
> alexander.ula...@hp.com> wrote:
>
>  Hi Rakesh,
>
>
>
>  I am not interested in the particular case of A^T * A; it is just a handy
> setup that saves me from creating another matrix and forcing the blocks to
> co-locate. Basically, I am trying to understand how effective BlockMatrix
> is for multiplying distributed matrices. It seems that I am missing
> something or using it wrong.
>
>
>
> Best regards, Alexander
>
>
>
> *From:* Rakesh Chalasani [mailto:vnit.rak...@gmail.com]
> *Sent:* Tuesday, July 14, 2015 9:05 AM
> *To:* Ulanov, Alexander
> *Cc:* dev@spark.apache.org
> *Subject:* Re: BlockMatrix multiplication
>
>
>
> Hi Alexander:
>
>
>
> Aw, I missed the 'cogroup' on BlockMatrix multiply! I stand corrected.
> Check
>
>
> https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala#L361
>
>
>
> BlockMatrix multiply uses a custom partitioner called GridPartitioner,
> which might be causing the shuffle; in your special case it need not
> happen. But from what I understood of your code, I don't think this is an
> issue, since your special case can be handled with computeGramianMatrix on
> RowMatrix. Is there a reason you did not use that?
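>
> For reference, a minimal sketch of that alternative (sizes are
> illustrative; computeGramianMatrix returns A^T * A as a local matrix, so
> it is only practical when the column count is modest):
>
> import org.apache.spark.mllib.linalg.Vectors
> import org.apache.spark.mllib.linalg.distributed.RowMatrix
>
> val numRows = 50000
> val numCols = 100 // kept small: the result is a local numCols x numCols matrix
> val vecs = sc.parallelize(0 until numRows).map(_ =>
>   Vectors.dense(Array.fill(numCols)(util.Random.nextDouble())))
> val rowMat = new RowMatrix(vecs, numRows, numCols)
> val gram = rowMat.computeGramianMatrix() // A^T * A, without materializing A^T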
>
>
>
> Rakesh
>
>
>
>
>
> On Tue, Jul 14, 2015 at 11:03 AM Ulanov, Alexander <
> alexander.ula...@hp.com> wrote:
>
>  Hi Rakesh,
>
>
>
> Thanks for the suggestion. Each block of the original matrix is in a
> separate partition, and each block of the transposed matrix is also in a
> separate partition. The partition numbers are the same for the blocks that
> undergo multiplication, and each partition is on a separate worker.
> Basically, I want to force each worker to multiply only 2 blocks; as far
> as I understand, this should be the optimal configuration for
> multiplication. Having several blocks in each partition, as you suggested,
> is not optimal, is it?
>
>
>
> Best regards, Alexander
>
>
>
>  Block matrix stores the data as key -> Matrix pairs, and multiply does a
> reduceByKey operation, aggregating matrices per key. Since you said each
> block resides in a separate partition, reduceByKey might effectively be
> shuffling all of the data. A better way to go about this is to allow
> multiple blocks within each partition, so that reduceByKey does a local
> reduce before aggregating across nodes.
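>
> A toy illustration of that effect (scalars stand in for the matrix blocks;
> the names are made up):
>
> // Twenty keyed values squeezed into two partitions: reduceByKey first
> // combines values per key within each partition, so far less data
> // crosses the network than with one value per partition.
> val values = sc.parallelize(
>   for (key <- 0 until 5; rep <- 0 until 4) yield (key, 1.0), 2)
> val summed = values.reduceByKey(_ + _)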
>
>
>
> Rakesh
>
>
>
> On Mon, Jul 13, 2015 at 9:24 PM Ulanov, Alexander <alexander.ula...@hp.com>
> wrote:
>
>  Dear Spark developers,
>
>
>
> I am trying to perform BlockMatrix multiplication in Spark. My test is as
> follows: 1) create a matrix of N blocks, such that each row of the block
> matrix contains only 1 block and each block resides in a separate
> partition on a separate node; 2) transpose the block matrix; 3) multiply
> the transposed matrix by the original non-transposed one. This should
> preserve data locality, so there should be no need for a shuffle. However,
> I observe a huge shuffle with a 50000x10000 block matrix composed of
> 10000x10000 blocks, i.e. 5 blocks per matrix. Could you suggest what is
> wrong?
>
>
>
> My setup is Spark 1.4 with one master and 5 worker nodes, each a 2.2 GHz
> Xeon machine with 16 GB RAM.
>
> Below is the test code:
>
>
>
> import org.apache.spark.mllib.linalg.Matrices
> import org.apache.spark.mllib.linalg.distributed.BlockMatrix
>
> val parallelism = 5
> val blockSize = 10000
> val rows = parallelism * blockSize
> val columns = blockSize
> val size = rows * columns
> assert(rows % blockSize == 0)
> assert(columns % blockSize == 0)
> val rowBlocks = rows / blockSize
> val columnBlocks = columns / blockSize
> val rdd = sc.parallelize({
>   for (i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, j)
> }, parallelism).map(coord => (coord, Matrices.rand(blockSize, blockSize, util.Random.self)))
> val bm = new BlockMatrix(rdd, blockSize, blockSize).cache()
> bm.validate()
> val mb = bm.transpose.cache()
> mb.validate()
> val t = System.nanoTime()
> val ata = mb.multiply(bm)
> ata.validate()
> println(rows + "x" + columns + ", block:" + blockSize + "\t" + (System.nanoTime() - t) / 1e9)
>
>
>
>
>
> Best regards, Alexander
>
>
>
>
>
