Hi,
I have an idea that I would like to discuss with the Spark devs. The
idea comes from a very real problem that I have struggled with for
almost a year. My problem is very simple: it's a dense matrix * sparse
matrix product. I have a dense matrix RDD[(Int, FloatMatrix)], divided
into X large blocks (one block per partition), and a sparse matrix
RDD[((Int, Int), Array[Array[(Int, Float)]])], divided into X * Y
blocks. The most efficient way I have found to perform the operation is
to collectAsMap() the dense matrix and broadcast it, then perform the
block-local multiplications, and finally combine the results by column.
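
For concreteness, here is a minimal sketch of that approach. It assumes
FloatMatrix is org.jblas.FloatMatrix and that each sparse block stores
one Array of (rowIndex, value) pairs per output column; my actual layout
may differ slightly:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // pair-RDD implicits for reduceByKey
import org.apache.spark.rdd.RDD
import org.jblas.FloatMatrix

// One block-local product: dense block * sparse block.
// cols(c) is assumed to list the (denseColumnIndex, value) entries of
// output column c.
def multiplyBlock(block: FloatMatrix, cols: Array[Array[(Int, Float)]]): FloatMatrix = {
  val out = new FloatMatrix(block.rows, cols.length)
  for (c <- cols.indices; (r, v) <- cols(c); row <- 0 until block.rows)
    out.put(row, c, out.get(row, c) + v * block.get(row, r))
  out
}

def multiplyBroadcast(
    sc: SparkContext,
    dense: RDD[(Int, FloatMatrix)],
    sparse: RDD[((Int, Int), Array[Array[(Int, Float)]])]): RDD[(Int, FloatMatrix)] = {
  // ship the whole dense matrix to every worker
  val denseBC = sc.broadcast(dense.collectAsMap())
  sparse
    .map { case ((i, j), cols) => (j, multiplyBlock(denseBC.value(i), cols)) }
    .reduceByKey(_.addi(_)) // combine block-local partial products by column block
}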
This works quite well, unless the dense matrix is too big to fit in
memory (especially since the multiplication is performed several times
iteratively, and the broadcasts are not always cleaned from memory as
quickly as I would naively expect).
When the dense matrix is too big, a second solution is to split it into
several RDDs and do several smaller broadcasts, one after the other.
Doing this creates quite a bit of overhead, but it mostly works, even
though I often run into problems such as inaccessible broadcast files.
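
Roughly, the chunked variant looks like this. It is again just a sketch
that reuses the multiplyBlock helper above; the chunk assignment by
i % numChunks is an arbitrary choice for illustration:

def multiplyChunked(
    sc: SparkContext,
    dense: RDD[(Int, FloatMatrix)],
    sparse: RDD[((Int, Int), Array[Array[(Int, Float)]])],
    numChunks: Int): RDD[(Int, FloatMatrix)] = {
  val partials = (0 until numChunks).map { chunk =>
    // collect and broadcast only the dense blocks of this chunk
    val bc = sc.broadcast(
      dense.filter { case (i, _) => i % numChunks == chunk }.collectAsMap())
    val part = sparse
      .filter { case ((i, _), _) => i % numChunks == chunk }
      .map { case ((i, j), cols) => (j, multiplyBlock(bc.value(i), cols)) }
    part.cache().count() // force evaluation so the chunk can be released
    bc.unpersist()       // drop this chunk from the workers before the next one
    part
  }
  sc.union(partials).reduceByKey(_.addi(_))
}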
Then there is the terrible but apparently very effective good old join.
Since Y blocks of the sparse matrix use the same block of the dense
matrix, I suspect that the dense matrix is somehow replicated Y times
(either on disk or over the network), which is the reason why the join
takes so much time.
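
For reference, the join variant is essentially this (same helper and
format assumptions as above):

def multiplyJoin(
    dense: RDD[(Int, FloatMatrix)],
    sparse: RDD[((Int, Int), Array[Array[(Int, Float)]])]): RDD[(Int, FloatMatrix)] = {
  sparse
    .map { case ((i, j), cols) => (i, (j, cols)) } // re-key by dense block id
    .join(dense) // ships a copy of each dense block to every matching sparse block
    .map { case (_, ((j, cols), block)) => (j, multiplyBlock(block, cols)) }
    .reduceByKey(_.addi(_))
}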
After this bit of context, here is my idea: would it be possible to
somehow "broadcast" (or maybe more accurately, share or serve) a
persisted RDD that is distributed across all workers, in a way that
would, a bit like IndexedRDD, allow a task to access a partition or an
element of a partition from inside its closure, backed by a worker-local
memory cache? That is, the information about where each block resides
would be distributed to the workers, allowing them to access parts of
the RDD directly. I think that's already a bit how RDDs are shuffled?
The RDD could stay distributed (no need to collect then broadcast), and
only the necessary transfers would take place.
Is this a bad idea? Is it already implemented somewhere (I would love
that!)? Or is it something that could add efficiency not only for my use
case, but maybe for others? Could someone give me some hints about how I
could add this possibility to Spark? I would probably try to extend RDD
into a specific SharedIndexedRDD with a special lookup that would be
allowed from tasks as a special case, and that would contact the
BlockManager to fetch the corresponding data from the right worker.
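
To make the intent concrete, here is a purely hypothetical sketch of
the API I have in mind. SharedIndexedRDD and its lookup do not exist
anywhere today; the caching and fetching behaviour described in the
comments is what I would hope to implement:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Hypothetical: an RDD whose partitions can be queried by key from
// inside other tasks, with a worker-local cache in front of remote
// fetches through the BlockManager.
abstract class SharedIndexedRDD[K: ClassTag, V: ClassTag](prev: RDD[(K, V)])
    extends RDD[(K, V)](prev) {
  // Callable from a task closure: locate the partition holding `key`
  // (block locations would be shipped to every worker), fetch the block
  // if it is remote, memoize it locally, then look the key up.
  def lookup(key: K): Option[V]
}

// Hypothetical usage: each sparse-side task pulls only the dense blocks
// it needs, and the dense matrix never has to be collected:
//   val sharedDense: SharedIndexedRDD[Int, FloatMatrix] = ... // wraps the persisted dense RDD
//   val result = sparse
//     .map { case ((i, j), cols) => (j, multiplyBlock(sharedDense.lookup(i).get, cols)) }
//     .reduceByKey(_.addi(_))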
Thanks in advance for your advice.
Guillaume
--
eXenSa
*Guillaume PITEL, Président*
+33(0)626 222 431
eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705