Hi,

I have an idea that I would like to discuss with the Spark devs. The idea comes from a very real problem that I have struggled with for almost a year. My problem is very simple: it's a dense matrix * sparse matrix operation. I have a dense matrix RDD[(Int,FloatMatrix)] which is divided into X large blocks (one block per partition), and a sparse matrix RDD[((Int,Int),Array[Array[(Int,Float)]])], divided into X * Y blocks. The most efficient way to perform the operation is to collectAsMap() the dense matrix and broadcast it, then perform the block-local multiplications, and combine the results by column.
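To make that concrete, here is roughly what it looks like (just a sketch with jblas FloatMatrix; the layout of a sparse block as per-column arrays of (rowIndex, value) pairs, and the pairing of sparse block (i,j) with dense block i, are assumptions about my own data, not anything general):

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.jblas.FloatMatrix

    // Block-local product: dense block * one sparse block.
    // A sparse block is assumed to be one Array[(rowIndex, value)] per output column.
    def multiplyBlocks(dense: FloatMatrix,
                       sparse: Array[Array[(Int, Float)]]): FloatMatrix = {
      val result = new FloatMatrix(dense.rows, sparse.length)
      for (j <- sparse.indices) {
        val col = new FloatMatrix(dense.rows)   // accumulator for output column j
        sparse(j).foreach { case (i, v) => col.addi(dense.getColumn(i).mul(v)) }
        result.putColumn(j, col)
      }
      result
    }

    def multiply(sc: SparkContext,
                 dense: RDD[(Int, FloatMatrix)],
                 sparse: RDD[((Int, Int), Array[Array[(Int, Float)]])])
        : RDD[(Int, FloatMatrix)] = {
      // Collect the whole dense matrix on the driver and broadcast it to every worker.
      val denseBlocks = sc.broadcast(dense.collectAsMap())
      sparse
        .map { case ((i, j), block) =>
          // each sparse block (i, j) only needs dense block i
          (j, multiplyBlocks(denseBlocks.value(i), block))
        }
        .reduceByKey(_ add _)   // combine the partial results by column block
    }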

This is quite fine, unless the dense matrix is too big to fit in memory (especially since the multiplication is performed several times iteratively, and the broadcasts are not always cleaned up from memory as I would naively expect).

When the dense matrix is too big, a second solution is to split the big sparse matrix into several RDDs and do several broadcasts. Doing this creates quite a big overhead, but it mostly works, even though I often face problems with inaccessible broadcast files, for instance.
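In case it helps, here is roughly how I do it, reusing the multiplyBlocks helper and the block-layout assumptions from the sketch above; grouping the sparse blocks by the dense block index they need (modulo numChunks) is just one illustrative way to slice the work so that each broadcast only carries part of the dense matrix:

    def multiplyChunked(sc: SparkContext,
                        dense: RDD[(Int, FloatMatrix)],
                        sparse: RDD[((Int, Int), Array[Array[(Int, Float)]])],
                        numChunks: Int): RDD[(Int, FloatMatrix)] = {
      val partials = (0 until numChunks).map { c =>
        // Broadcast only the dense blocks whose index falls in this chunk.
        val slice = sc.broadcast(dense.filter(_._1 % numChunks == c).collectAsMap())
        val part = sparse
          .filter { case ((i, _), _) => i % numChunks == c }
          .map { case ((i, j), block) => (j, multiplyBlocks(slice.value(i), block)) }
          .reduceByKey(_ add _)
          .persist()
        part.count()        // force this chunk now, so the broadcast can be dropped
        slice.unpersist()   // try to free the broadcast before the next round
        part
      }
      // Sum the per-chunk partial results, column block by column block.
      partials.reduce(_ union _).reduceByKey(_ add _)
    }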

Then there is the terrible but apparently very effective good old join. Since X blocks of the sparse matrix use the same block from the dense matrix, I suspect that the dense matrix is somehow replicated X times (either on disk or over the network), which is why the join takes so much time.
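For reference, the join version is simply this (again reusing multiplyBlocks from above; the re-keying by the dense block index is what seems to force Spark to ship a copy of each dense block to every matching sparse block):

    def multiplyWithJoin(dense: RDD[(Int, FloatMatrix)],
                         sparse: RDD[((Int, Int), Array[Array[(Int, Float)]])])
        : RDD[(Int, FloatMatrix)] = {
      sparse
        .map { case ((i, j), block) => (i, (j, block)) }   // re-key by dense block index
        .join(dense)    // shuffles a copy of dense block i to every sparse block (i, j)
        .map { case (_, ((j, block), denseBlock)) =>
          (j, multiplyBlocks(denseBlock, block))
        }
        .reduceByKey(_ add _)
    }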

After this bit of context, here is my idea: would it be possible to somehow "broadcast" (or maybe more accurately, share or serve) a persisted RDD which is distributed on all workers, in a way that would, a bit like IndexedRDD, allow a task to access a partition or an element of a partition from inside its closure, with a worker-local memory cache? I.e. the information about where each block resides would be distributed on the workers, to allow them to access parts of the RDD directly. I think that's already a bit how RDDs are shuffled?

The RDD could stay distributed (no need to collect then broadcast), and only necessary transfers would be required.

Is this a bad idea? Is it already implemented somewhere (I would love it!)? Or is it something that could add efficiency not only for my use case, but maybe for others? Could someone give me some hints about how I could add this possibility to Spark? I would probably try to extend RDD into a specific SharedIndexedRDD with a special lookup that would be allowed from tasks as a special case, and that would try to contact the BlockManager and fetch the corresponding data from the right worker.
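To make the idea a bit more concrete, here is a purely illustrative sketch of the shape such a SharedIndexedRDD could take. Nothing below exists in Spark today: WorkerLocalBlockCache and fetchFromPeer are placeholders for the executor-local cache and for the BlockManager plumbing that would actually locate and pull a remote partition's data:

    import scala.collection.concurrent.TrieMap
    import org.apache.spark.rdd.RDD

    // JVM-wide cache: one per executor process, so a block fetched by one task
    // is reusable by the next task on the same worker.
    object WorkerLocalBlockCache {
      private val cache = TrieMap.empty[(Int, Int), Any]
      def getOrFetch[V](rddId: Int, blockId: Int)(fetch: => V): V =
        cache.getOrElseUpdate((rddId, blockId), fetch).asInstanceOf[V]
    }

    class SharedIndexedRDD[V](@transient val backing: RDD[(Int, V)]) extends Serializable {
      private val rddId = backing.id

      // Hypothetical lookup, callable from inside a task closure.
      def lookup(blockId: Int): V =
        WorkerLocalBlockCache.getOrFetch(rddId, blockId) {
          fetchFromPeer(blockId)
        }

      // Placeholder: the real version would ask the BlockManager which executor
      // holds the persisted partition containing `blockId` and pull only that block.
      private def fetchFromPeer(blockId: Int): V =
        throw new NotImplementedError("BlockManager plumbing would go here")
    }

A task mapping over the sparse RDD would then call lookup(i) on such a shared dense matrix instead of capturing a broadcast, and only the dense blocks actually needed on a given worker would be transferred and cached there.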

Thanks in advance for your advice

Guillaume
--
eXenSa

*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705
