You can use:

    def partitionBy(partitioner: Partitioner): RDD[(K, V)]

which returns a copy of the RDD partitioned using the specified partitioner.
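A minimal sketch of what that looks like for this use case (illustrative only - the partition count, the `Document` class, and its `id` field are assumptions based on the thread below, not tested code):

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Key the large dataset by docid and hash-partition it once.
val byId: RDD[(String, Document)] =
  allDocs.keyBy(_.id).partitionBy(new HashPartitioner(16))

// Persist so subsequent queries skip the reload/repartition cost.
// MEMORY_AND_DISK spills partitions that do not fit in 16GB of RAM.
byId.persist(StorageLevel.MEMORY_AND_DISK)

// Partition the small keyed RDD with the same partitioner so the
// join finds matching keys collocated on the same node (no shuffle
// of the large side).
val queries: RDD[(String, String)] =
  docids.map(id => (id, id)).partitionBy(new HashPartitioner(16))

val found: RDD[Document] = queries.join(byId).map(_._2._2)
```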
The https://github.com/amplab/spark-indexedrdd project looks pretty cool and is something which adds valuable functionality to Spark, e.g. the point lookups, PROVIDED it can be executed from within a function running on worker executors. Can somebody from Databricks shed more light here?

-----Original Message-----
From: Wang, Ningjun (LNG-NPV) [mailto:ningjun.w...@lexisnexis.com]
Sent: Thursday, April 16, 2015 9:39 PM
To: user@spark.apache.org
Subject: RE: How to join RDD keyValuePairs efficiently

Evo

> partition the large doc RDD based on the hash function on the key ie the docid

What API do I use to do this? By the way, loading the entire dataset into memory causes an OutOfMemory problem because it is too large (I only have one machine with 16GB and 4 cores). I found something called IndexedRDD on the web: https://github.com/amplab/spark-indexedrdd
Has anybody used it?

Ningjun

-----Original Message-----
From: Evo Eftimov [mailto:evo.efti...@isecc.com]
Sent: Thursday, April 16, 2015 12:18 PM
To: 'Sean Owen'; Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: RE: How to join RDD keyValuePairs efficiently

Ningjun, to speed up your current design you can do the following:

1. Partition the large doc RDD based on a hash function on the key, i.e. the docid.
2. Persist the large dataset in memory so it is available for subsequent queries without reloading and repartitioning for every search query.
3. Partition the small doc dataset in the same way - this will result in collocated small and large RDD partitions with the same keys.
4.
Run the join - the match is not going to be "sequential"; it is based on the hash of the key, and moreover RDD elements with the same key will be collocated on the same cluster node.

Or simply go for Sean's suggestion - under the hood it works in a slightly different way: the filter is executed in mappers running in parallel on every node, and by passing the small set of doc IDs to each filter (mapper) you essentially replicate them on every node, so each mapper instance has its own copy and runs with it when filtering.

And finally, you can prototype both options described above and measure and compare their performance.

-----Original Message-----
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Thursday, April 16, 2015 5:02 PM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: Re: How to join RDD keyValuePairs efficiently

This would be much, much faster if your set of IDs were simply a Set, and you passed that to a filter() call that filtered in only the docs whose ID is in the set.

On Thu, Apr 16, 2015 at 4:51 PM, Wang, Ningjun (LNG-NPV) <ningjun.w...@lexisnexis.com> wrote:
> Does anybody have a solution for this?
>
> From: Wang, Ningjun (LNG-NPV)
> Sent: Tuesday, April 14, 2015 10:41 AM
> To: user@spark.apache.org
> Subject: How to join RDD keyValuePairs efficiently
>
> I have an RDD that contains millions of Document objects. Each
> document has a unique Id that is a string. I need to find documents by
> ids quickly.
> Currently I use an RDD join as follows.
>
> First I save the RDD as an object file:
>
>   allDocs: RDD[Document] = getDocs() // this RDD contains 7 million Document objects
>   allDocs.saveAsObjectFile("/temp/allDocs.obj")
>
> Then I wrote a function to find documents by ids:
>
>   def findDocumentsByIds(docids: RDD[String]) = {
>     // docids contains fewer than 100 items
>     val allDocs: RDD[Document] = sc.objectFile[Document]("/temp/allDocs.obj")
>     val idAndDocs = allDocs.keyBy(d => d.id)
>     docids.map(id => (id, id)).join(idAndDocs).map(t => t._2._2)
>   }
>
> I found that this is very slow. I suspect it scans the entire 7 million
> Document objects in "/temp/allDocs.obj" sequentially to find the
> desired documents.
>
> Is there any efficient way to do this?
>
> One option I am considering is that instead of storing the RDD[Document]
> as an object file, I store each document in a separate file with filename
> equal to the docid. This way I can find a document quickly by docid.
> However, this means I need to save the RDD to 7 million small files,
> which will take a very long time to save and may cause IO problems with
> so many small files.
>
> Is there any other way?
>
> Ningjun

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
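For reference, Sean's filter-based suggestion from the thread above can be sketched as follows (illustrative only - collecting the small RDD of ids into a driver-side Set and broadcasting it is one plausible way to build the Set, not something the thread specifies):

```scala
import org.apache.spark.rdd.RDD

// Collect the ~100 ids into a plain Set on the driver
// (safe here because the id set is tiny).
val idSet: Set[String] = docids.collect().toSet

// Broadcast it so every executor receives one read-only copy
// instead of shipping it with every task.
val bIds = sc.broadcast(idSet)

// Filter the large RDD in place: one parallel pass, no shuffle.
val found: RDD[Document] = allDocs.filter(d => bIds.value.contains(d.id))
```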