You can use

    def partitionBy(partitioner: Partitioner): RDD[(K, V)]

which returns a copy of the RDD partitioned using the specified partitioner.
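For example, a minimal sketch (the names allDocs / Document and the partition
count are just illustrative, taken from the earlier messages in this thread):

    import org.apache.spark.HashPartitioner
    import org.apache.spark.storage.StorageLevel

    // key the docs by docid, hash-partition them, and keep them cached for reuse
    val byId = allDocs.keyBy(_.id)                 // RDD[(String, Document)]
      .partitionBy(new HashPartitioner(8))
      .persist(StorageLevel.MEMORY_AND_DISK)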

The https://github.com/amplab/spark-indexedrdd project looks pretty cool and 
adds valuable functionality to Spark, e.g. the point lookups, PROVIDED it can 
be executed from within a function running on worker executors.

Can somebody from Databricks shed more light here?

-----Original Message-----
From: Wang, Ningjun (LNG-NPV) [mailto:ningjun.w...@lexisnexis.com] 
Sent: Thursday, April 16, 2015 9:39 PM
To: user@spark.apache.org
Subject: RE: How to join RDD keyValuePairs efficiently

Evo

        > partition the large doc RDD based on a hash function on the key, i.e. the docid

Which API should I use to do this?

By the way, loading the entire dataset into memory causes an OutOfMemory 
problem because it is too large (I only have one machine with 16GB and 4 cores).

I found something called IndexedRDD on the web:
https://github.com/amplab/spark-indexedrdd

Has anybody used it?

Ningjun

-----Original Message-----
From: Evo Eftimov [mailto:evo.efti...@isecc.com]
Sent: Thursday, April 16, 2015 12:18 PM
To: 'Sean Owen'; Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: RE: How to join RDD keyValuePairs efficiently

Ningjun, to speed up your current design you can do the following:

1. Partition the large doc RDD based on a hash function on the key, i.e. the docid.

2. Persist the large dataset in memory so it is available for subsequent queries 
without reloading and repartitioning for every search query.

3. Partition the small doc dataset in the same way - this will result in 
collocated small and large RDD partitions with the same key.

4. Run the join - the matching is not "sequential"; it is based on the hash of 
the key, and RDD elements with the same key will be collocated on the same 
cluster node. A rough sketch of these steps is shown right after this list.
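Rough sketch of steps 1-4 (just an illustration under assumptions: allDocs and 
docids are the RDDs from your message, Document has an id field of type String, 
and MEMORY_AND_DISK is one possible storage level):

    import org.apache.spark.HashPartitioner
    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel

    val partitioner = new HashPartitioner(sc.defaultParallelism)

    // 1 + 2: key the large RDD by docid, hash-partition it, and persist it for reuse
    val bigById: RDD[(String, Document)] = allDocs
      .keyBy(_.id)
      .partitionBy(partitioner)
      .persist(StorageLevel.MEMORY_AND_DISK)

    // 3: partition the small docid RDD with the same partitioner
    val smallById: RDD[(String, String)] = docids
      .map(id => (id, id))
      .partitionBy(partitioner)

    // 4: the join now works on co-located partitions instead of reshuffling the big RDD
    val found: RDD[Document] = smallById.join(bigById).map { case (_, (_, doc)) => doc }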


Or simply go for Sean's suggestion - under the hood it works in a slightly 
different way: the filter is executed in mappers running in parallel on every 
node, and by passing the small set of doc IDs to each filter (mapper) you 
essentially replicate it on every node, so each mapper instance has its own 
copy to use when filtering. A sketch of that variant follows below as well.
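Minimal sketch of the filter-based variant (again just illustrative; it assumes 
the docid set is small enough to collect to the driver):

    // collect the small set of ids and broadcast it so every executor gets one copy
    val idSet: Set[String] = docids.collect().toSet
    val idSetBc = sc.broadcast(idSet)

    val found = allDocs.filter(doc => idSetBc.value.contains(doc.id))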

And finally, you can prototype both options described above and measure and 
compare their performance.

-----Original Message-----
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Thursday, April 16, 2015 5:02 PM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: Re: How to join RDD keyValuePairs efficiently

This would be much, much faster if your set of IDs was simply a Set, and you 
passed that to a filter() call that just filtered in the docs that matched an 
ID in the set.

On Thu, Apr 16, 2015 at 4:51 PM, Wang, Ningjun (LNG-NPV) 
<ningjun.w...@lexisnexis.com> wrote:
> Does anybody have a solution for this?
>
>
>
>
>
> From: Wang, Ningjun (LNG-NPV)
> Sent: Tuesday, April 14, 2015 10:41 AM
> To: user@spark.apache.org
> Subject: How to join RDD keyValuePairs efficiently
>
>
>
> I have an RDD that contains millions of Document objects. Each 
> document has a unique Id that is a string. I need to find documents by 
> their ids quickly. Currently I use an RDD join, as follows.
>
>
>
> First I save the RDD as object file
>
>
>
> // this RDD contains 7 million Document objects
> val allDocs: RDD[Document] = getDocs()
>
> allDocs.saveAsObjectFile("/temp/allDocs.obj")
>
>
>
> Then I wrote a function to find documents by Ids
>
>
>
> def findDocumentsByIds(docids: RDD[String]) = {
>   // docids contains fewer than 100 items
>   val allDocs: RDD[Document] = sc.objectFile[Document]("/temp/allDocs.obj")
>   val idAndDocs = allDocs.keyBy(d => d.id)
>   docids.map(id => (id, id)).join(idAndDocs).map(t => t._2._2)
> }
>
>
>
> I found that this is very slow. I suspect it scans the entire 7 million 
> Document objects in "/temp/allDocs.obj" sequentially to find the 
> desired documents.
>
>
>
> Is there any efficient way to do this?
>
>
>
> One option I am considering is that, instead of storing the RDD[Document] 
> as an object file, I store each document in a separate file with a filename 
> equal to the docid. This way I can find a document quickly by docid.
> However, this means I need to save the RDD as 7 million small files, 
> which will take a very long time and may cause IO problems with so 
> many small files.
>
>
>
> Is there any other way?
>
>
>
>
>
>
>
> Ningjun

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


