Jeff is right that it really depends on the application.

The area where I have done the most with map-reduce matrix multiplication is
cooccurrence counting on large data sets.

If we talk about documents that contain terms (as opposed to users who view
videos, ornithologists who observe bird species or any other equivalent
dyadic system), then we can represent our observations as a document x term
matrix A.  For lots of analyses, it is interesting to compute a term x term
cooccurrence matrix A' * A.  When we have millions of documents and millions
of terms, this is daunting on a single machine.
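To make that concrete, here is a tiny made-up example in plain Groovy
(nothing Mahout-specific): with a 0/1 document x term matrix A, entry
(i, j) of A' * A counts the documents in which terms i and j cooccur.

    // Toy 3-document x 3-term incidence matrix (hypothetical data).
    def A = [
        [1, 1, 0],   // doc 0 contains terms 0 and 1
        [0, 1, 1],   // doc 1 contains terms 1 and 2
        [1, 1, 0]    // doc 2 contains terms 0 and 1
    ]

    int n = A[0].size()
    // C = A' * A: dot products of pairs of columns of A.
    def C = (0..<n).collect { i ->
        (0..<n).collect { j ->
            (0..<A.size()).sum { d -> A[d][i] * A[d][j] }
        }
    }
    C.each { println it }    // [2, 2, 0], [2, 3, 1], [0, 1, 1]

For example, C[0][1] is 2 because terms 0 and 1 appear together in two
documents.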

In even more general terms, we might have a parallel corpus of documents
where there are two versions of each document in different languages.  If we
say that the document x term matrices for the two languages are A and B,
then the cross-language cooccurrence matrix A' * B gives us rough insight
into which terms in one language commonly translate into terms in the other.  Chris
Dyer at Maryland talked a while ago about doing this using Hadoop and Miles
Osborne at Edinburgh is currently doing this as well.

For these kinds of problems, it is common to store the matrices A and B in
column-compressed sparse representations.  This lets us quickly find all of
the documents a term appears in, but makes it expensive to find the terms in
a single document.  For this particular form of multiplication, where the
left multiplicand is transposed, this uniform representation works out
nicely because iterating down a row of A' is the same as iterating down a
column of A.
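As a sketch of what such a representation might look like (my own
illustration, not actual Mahout code), each column can be stored as a map
from document index to value, with missing keys as implicit zeros:

    // Hypothetical column-compressed sparse matrix: term -> (doc -> value).
    def sparseA = [
        0: [0: 1.0, 2: 1.0],         // term 0 appears in docs 0 and 2
        1: [0: 1.0, 1: 1.0, 2: 1.0], // term 1 appears in docs 0, 1 and 2
        2: [1: 1.0]                  // term 2 appears only in doc 1
    ]

    // Finding every document a term occurs in is a single lookup...
    println sparseA[1].keySet()    // [0, 1, 2]

    // ...but finding the terms in one document means scanning every column.
    println sparseA.findAll { term, col -> col.containsKey(0) }.keySet()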

In general, computing A' * B requires O(n^3) arithmetic operations but, with
good operand re-use, only O(n^2) storage accesses.  These same asymptotic
costs apply for sparse matrices (if sparsity is constant) as well as for
most parallel implementations.  Since arithmetic operations are SOOO much
faster than moving data over the network, it behooves us to arrange our
computation so that we re-use data as much as possible.  If we don't do
this, then we wind up with O(n^3) storage accesses and our program will be
hundreds to thousands of times slower than we might otherwise like.

The simplest distributed algorithm for A' * A simply explodes the product by
sending partial products with a key corresponding to the location in the
result to which they contribute.  These partial products are then summed by
the reducers with one reduce call per element in the result.  This requires
that each column of A be examined and all non-zero products from that column
be sent out.  This works reasonably well and is pretty easy to code.  One
common requirement is that rows of the result be normalized to sum to one
(for probabilistic estimates) or be reduced by a statistical test.  The
rough pseudo-code for the map step in the basic multiplication without the
fancy extra steps is this:

    // Mapper: invoked once per column of A (keyed by column index j).
    // Every pair of nonzero entries in the column contributes one partial
    // product, keyed by its coordinates in the result matrix.
    { j, column, out, reporter ->
        column.foreachNonZero { i1, x1 ->
            column.foreachNonZero { i2, x2 ->
                out.collect([i1, i2], x1 * x2)
            }
        }
    }

This mapper is applied in parallel to every column of A (=row of A').

Reduce and combine are very simple and just add up the results:

    // Reducer (also used as combiner): sum the partial products that
    // share one result coordinate.
    { key, values, out, reporter ->
        out.collect(key, values.sum())
    }
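To see the whole flow in one place, here is a local, non-Hadoop simulation
of the same algorithm over a toy sparse matrix (the names and the map-based
representation are my own, not any real API):

    // Toy sparse matrix as above: term (column) -> (doc -> value).
    def sparseA = [
        0: [0: 1.0, 2: 1.0],
        1: [0: 1.0, 1: 1.0, 2: 1.0],
        2: [1: 1.0]
    ]

    // Accumulator standing in for the shuffle plus the summing reducer.
    def partials = [:].withDefault { 0.0d }

    // Map phase: one call per column of A (= row of A').
    sparseA.each { j, column ->
        column.each { i1, x1 ->
            column.each { i2, x2 ->
                partials[[i1, i2]] += x1 * x2
            }
        }
    }

    // partials[[0, 1]] is 2.0: terms 0 and 1 cooccur in two documents.
    partials.each { key, value -> println "$key -> $value" }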

This algorithm is the ultimate in block decomposition because the blocks are
as small as possible (1x1).  Usually with matrix multiplication, though, it
helps to use larger sub-blocks so that we can re-use some of the operands
while they are still in registers.  This helps us avoid some of the O(n^3)
data traffic that we can incur otherwise.  Also, small records tend to incur
more data transfer overhead than larger ones.  The larger blocks need not be
square, but can just as well be tall and skinny.
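As a rough sketch of what a blocked mapper might look like (hypothetical
names; rows are bucketed into blocks of size b with intdiv), each shuffled
record then carries up to b*b products and the reducer sums blocks
element-wise instead of summing scalars:

    int b = 2    // block edge length; tune to trade record size for reuse
    def mapColumn = { j, Map column, out ->
        // Bucket the column's nonzero entries by row block.
        def byBlock = column.groupBy { i, x -> i.intdiv(b) }
        byBlock.each { b1, part1 ->
            byBlock.each { b2, part2 ->
                def block = [:]    // local (i1, i2) -> partial product
                part1.each { i1, x1 ->
                    part2.each { i2, x2 -> block[[i1, i2]] = x1 * x2 }
                }
                out([b1, b2], block)    // one record per pair of blocks
            }
        }
    }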

One potential algorithm to limit data communication and at the same time
generalize the code above to A' * B is to pass A to all mappers and then map
over columns of B.  The mapper then becomes:

    // Mapper: A is loaded once per mapper; each map call handles one
    // column of B and emits the corresponding column of the product.
    { i, columnOfB, out, reporter ->
        out.collect(i, A' * columnOfB)
    }

The output of each map call is a column of the final product (A' times one
column of B), so the only thing the reducer needs to do is assemble these
columns.
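Spelled out in the same toy sparse-map notation (my sketch, not the Mahout
API), the per-column work is just a dot product of each column of the
broadcast A against the incoming column of B:

    // A is broadcast to every mapper as sparse columns: term -> (doc -> value).
    // Returns the sparse result vector A' * columnOfB.
    def aTransposeTimes = { Map sparseA, Map columnOfB ->
        sparseA.collectEntries { term, colOfA ->
            def dot = columnOfB.inject(0.0d) { s, doc, x ->
                s + x * (colOfA[doc] ?: 0.0d)
            }
            [(term): dot]
        }.findAll { term, v -> v != 0.0d }    // keep the result sparse
    }

    // Inside the mapper: out.collect(i, aTransposeTimes(sparseA, columnOfB))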

There are other block decompositions possible, of course.

(Grant, the Groovy notation here is just for YOU)

On 3/4/08 4:33 PM, "Jeff Eastman" <[EMAIL PROTECTED]> wrote:

> The current implementation is obviously not distributed, and operations
> only apply to the in-memory subsets of what may or may not be larger
> matrix/vector computations. These can still be quite large, of course.
> To go beyond this simple view requires a more global representation and
> something like BigTable or Hbase support.
> 
> This matrix implementation, for example, would be quite sufficient to do
> 4x4 graphics transformations of billions of image points, but not to
> invert a billion element matrix. In the first example, each mapper would
> transform a subset of the points in parallel. In the second example,
> there would be significant inter-mapper communication required. I bet
> Ted knows how to approach that one :)
> 
> Jeff
> 
> -----Original Message-----
> From: Grant Ingersoll [mailto:[EMAIL PROTECTED]
> Sent: Tuesday, March 04, 2008 4:16 PM
> To: mahout-dev@lucene.apache.org
> Subject: Re: [jira] Updated: (MAHOUT-6) Need a matrix implementation
> 
> One question I have is how you see this being distributed?  Does each
> Mapper/Reducer create its own Matrix based on some subset of the data
> it is working on?  Or is there a notion of what I gather BigTable does
> (I haven't used BigTable) and you have the notion of a whole Matrix?
> I realize this is not implemented, just wondering your thoughts on how
> all this works in a distributed sense.
> 
> -Grant
> 
> On Mar 3, 2008, at 4:14 PM, Jeff Eastman (JIRA) wrote:
> 
>> 
>>     [ https://issues.apache.org/jira/browse/MAHOUT-6?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
>> 
>> Jeff Eastman updated MAHOUT-6:
>> ------------------------------
>> 
>>    Attachment: MAHOUT-6j.diff
>> 
>> Sorted out the two patches and added back my Vector unit tests that
>> fell out due to an oversight. Couldn't resist adding a cross()
>> operation on vectors but I'm going to do something else for a while
>> to let people review this and let the dust settle.  Once we get
>> vectors into trunk I will update the clustering code to use them.
>> 
>>> Need a matrix implementation
>>> ----------------------------
>>> 
>>>                Key: MAHOUT-6
>>>                URL: https://issues.apache.org/jira/browse/MAHOUT-6
>>>            Project: Mahout
>>>         Issue Type: New Feature
>>>           Reporter: Ted Dunning
>>>        Attachments: MAHOUT-6a.diff, MAHOUT-6b.diff, MAHOUT-6c.diff,
>>> MAHOUT-6d.diff, MAHOUT-6e.diff, MAHOUT-6f.diff, MAHOUT-6g.diff,
>>> MAHOUT-6h.patch, MAHOUT-6i.diff, MAHOUT-6j.diff
>>> 
>>> 
>>> We need matrices for Mahout.
>>> An initial set of basic requirements includes:
>>> a) sparse and dense support are required
>>> b) row and column labels are important
>>> c) serialization for hadoop use is required
>>> d) reasonable floating point performance is required, but awesome
>>> FP is not
>>> e) the API should be simple enough to understand
>>> f) it should be easy to carve out sub-matrices for sending to
>>> different reducers
>>> g) a reasonable set of matrix operations should be supported, these
>>> should eventually include:
>>>    simple matrix-matrix and matrix-vector and matrix-scalar linear
>>> algebra operations, A B, A + B, A v, A + x, v + x, u + v, dot(u, v)
>>>    row and column sums
>>>    generalized level 2 and 3 BLAS primitives, alpha A B + beta C
>>> and A u + beta v
>>> h) easy and efficient iteration constructs, especially for sparse
>>> matrices
>>> i) easy to extend with new implementations
>> 
>> -- 
>> This message is automatically generated by JIRA.
>> -
>> You can reply to this email to add a comment to the issue online.
>> 
> 
