Hi Alexander:

Aw, I missed the 'cogroup' on BlockMatrix multiply! I stand corrected. Check
https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala#L361

BlockMatrix multiply uses a custom partitioner called GridPartitioner, which
might be causing the shuffle; in your special case it need not happen. But
from what I understood of your code, I don't think this is an issue, since
your special case can be handled with computeGramianMatrix on RowMatrix. Is
there a reason you did not use that?
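For reference, the RowMatrix route would look roughly like this (a minimal
sketch with random stand-in data at your test's dimensions; note the Gramian
comes back as a local matrix, so the column count has to be small enough to
fit on the driver):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// 50000 rows of dimension 10000 across 5 partitions, as in your test.
val data = sc.parallelize(0 until 50000, 5).map(_ =>
  Vectors.dense(Array.fill(10000)(util.Random.nextDouble())))
val mat = new RowMatrix(data)
// Aggregates A^T * A across partitions without a block-matrix shuffle;
// the 10000 x 10000 result is returned to the driver.
val gram = mat.computeGramianMatrix()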

Rakesh


On Tue, Jul 14, 2015 at 11:03 AM Ulanov, Alexander <alexander.ula...@hp.com>
wrote:

>  Hi Rakesh,
>
>  Thanks for the suggestion. Each block of the original matrix is in a
> separate partition. Each block of the transposed matrix is also in a
> separate partition. The partition numbers are the same for the blocks that
> undergo multiplication. Each partition is on a separate worker. Basically,
> I want to force each worker to multiply only 2 blocks. This should be the
> optimal configuration for the multiplication, as far as I understand.
> Having several blocks in each partition, as you suggested, is not optimal,
> is it?
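>
>  For reference, a quick way to check the co-partitioning (a sketch; bm and
> mb are the two matrices from my test code below):
>
> // If both block RDDs reported the same partitioner, the cogroup could
> // match blocks without moving data; None means Spark has no
> // co-partitioning guarantee and may shuffle.
> println(bm.blocks.partitioner)
> println(mb.blocks.partitioner)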
>
>  Best regards, Alexander
>
>
>
>   BlockMatrix stores the data as key->Matrix pairs, and multiply does a
> reduceByKey operation, aggregating matrices per key. Since you said each
> block resides in a separate partition, reduceByKey might be effectively
> shuffling all of the data. A better way to go about this is to allow
> multiple blocks within each partition, so that reduceByKey does a local
> reduce before aggregating across nodes, as in the sketch below.
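>
>  For illustration, something along these lines (a sketch against your test
> code; blocksPerPartition is just an example knob, not a Spark setting):
>
> // Pack several blocks into each partition instead of one, so that
> // reduceByKey can combine matrices locally before any cross-node step.
> val blocksPerPartition = 4
> val numPartitions =
>   math.max(1, rowBlocks * columnBlocks / blocksPerPartition)
> val rdd = sc.parallelize(
>   for (i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, j),
>   numPartitions
> ).map(coord => (coord, Matrices.rand(blockSize, blockSize, util.Random.self)))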
>
>  Rakesh
>
>  On Mon, Jul 13, 2015 at 9:24 PM Ulanov, Alexander <
> alexander.ula...@hp.com> wrote:
>
>>  Dear Spark developers,
>>
>>
>>
>> I am trying to perform BlockMatrix multiplication in Spark. My test is as
>> follows: 1) create a matrix of N blocks, so that each row of the block
>> matrix contains only 1 block and each block resides in a separate
>> partition on a separate node, 2) transpose the block matrix, and
>> 3) multiply the transposed matrix by the original non-transposed one.
>> This should preserve data locality, so there should be no need for a
>> shuffle. However, I observe a huge shuffle with a block matrix of size
>> 50000x10000 and a block size of 10000x10000, i.e. 5 blocks per matrix.
>> Could you suggest what is wrong?
>>
>>
>>
>> My setup is Spark 1.4, one master and 5 worker nodes, each with a 2.2 GHz
>> Xeon and 16 GB RAM.
>>
>> Below is the test code:
>>
>> import org.apache.spark.mllib.linalg.Matrices
>> import org.apache.spark.mllib.linalg.distributed.BlockMatrix
>>
>> val parallelism = 5
>> val blockSize = 10000
>> val rows = parallelism * blockSize
>> val columns = blockSize
>> val size = rows * columns
>> assert(rows % blockSize == 0)
>> assert(columns % blockSize == 0)
>> val rowBlocks = rows / blockSize
>> val columnBlocks = columns / blockSize
>>
>> // One (i, j) block coordinate per partition, so each block lands in
>> // its own partition on its own worker.
>> val rdd = sc.parallelize( {
>>   for (i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, j)
>> }, parallelism).map(coord =>
>>   (coord, Matrices.rand(blockSize, blockSize, util.Random.self)))
>>
>> val bm = new BlockMatrix(rdd, blockSize, blockSize).cache()
>> bm.validate()
>> val mb = bm.transpose.cache()
>> mb.validate()
>>
>> // Time the multiply of the transposed matrix by the original (A^T * A).
>> val t = System.nanoTime()
>> val ata = mb.multiply(bm)
>> ata.validate()
>> println(rows + "x" + columns + ", block:" + blockSize + "\t" +
>>   (System.nanoTime() - t) / 1e9)
>>
>> Best regards, Alexander
>>
>
