This is perfect. So I guess my best course of action will be to create a
custom partitioner to ensure that the smallest amount of data is shuffled
when I join the partitions, and then I really only need to do a map (rather
than a mapPartitions), since the inverted index object will be referenced
(rather than copied for each value, as I had assumed).
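That plan can be sketched roughly as follows. This is an untested outline, not code from the thread: `numParts`, the RDD names, and the `Vector`/`InvertedIndex` types are assumptions pieced together from the snippets quoted below, and it presumes a running SparkContext.

```scala
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD

// Hypothetical setup matching the thread's discussion.
val numParts = 64
val part: Partitioner = new HashPartitioner(numParts)

// Partition both RDDs with the SAME partitioner. Spark then treats the
// join as a narrow dependency: each output partition is built from the
// co-located input partitions, so the large InvertedIndex objects are
// shuffled at most once (during partitionBy) and not again for the join.
val vecsByKey: RDD[(Int, Vector)]       = vectors.partitionBy(part)
val idxByKey: RDD[(Int, InvertedIndex)] = invIndexes.partitionBy(part)

// One index object per key; within a task it is referenced, not copied.
val joined: RDD[(Int, (Vector, InvertedIndex))] = vecsByKey.join(idxByKey, part)
val maxSim: Double =
  joined.map { case (_, (v, idx)) => idx.calculateSimilarity(v) }
        .reduce(math.max)
```

Passing the same `Partitioner` instance to both `partitionBy` and `join` is what lets Spark skip the second shuffle.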
Thank you Ted and Koert!

On Sat, Jan 16, 2016 at 1:37 PM Ted Yu <yuzhih...@gmail.com> wrote:

> Both groupByKey and join() accept a Partitioner as a parameter.
>
> Maybe you can specify a custom Partitioner so that the amount of shuffle
> is reduced.
>
> On Sat, Jan 16, 2016 at 9:39 AM, Daniel Imberman
> <daniel.imber...@gmail.com> wrote:
>
>> Hi Ted,
>>
>> I think I might have figured something out! (Though I haven't tested it
>> at scale yet.)
>>
>> My current thought is that I can do a groupByKey on the RDD of vectors
>> and then do a join with the inverted index.
>> It would look something like this:
>>
>> val invIndexes: RDD[(Int, InvertedIndex)]
>> val partitionedVectors: RDD[(Int, Vector)]
>>
>> val partitionedTasks: RDD[(Int, (Iterable[Vector], InvertedIndex))] =
>>   partitionedVectors.groupByKey().join(invIndexes)
>>
>> val similarities = partitionedTasks.map(/* calculate similarities */)
>> val maxSim = similarities.reduce(math.max)
>>
>> So while I realize that a groupByKey is usually frowned upon, it seems
>> to me that since I need all associated vectors to be local anyway, this
>> repartitioning would not be too expensive.
>>
>> Does this seem like a reasonable approach to this problem, or are there
>> any faults I should consider if I approach it this way?
>>
>> Thank you for your help,
>>
>> Daniel
>>
>> On Fri, Jan 15, 2016 at 5:30 PM Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> My knowledge of XSEDE is limited - I visited the website.
>>>
>>> If there is no easy way to deploy HBase, an alternative approach
>>> (using HDFS?) needs to be considered.
>>>
>>> I need to do more homework on this :-)
>>>
>>> On Thu, Jan 14, 2016 at 3:51 PM, Daniel Imberman
>>> <daniel.imber...@gmail.com> wrote:
>>>
>>>> Hi Ted,
>>>>
>>>> So unfortunately, after looking into the cluster manager that I will
>>>> be using for my testing (I'm using a supercomputer called XSEDE
>>>> rather than AWS), it looks like the cluster does not actually come
>>>> with HBase installed (this cluster is becoming somewhat problematic,
>>>> as it is essentially AWS but you have to do your own virtualization
>>>> scripts). Do you have any other thoughts on how I could go about
>>>> dealing with this purely using Spark and HDFS?
>>>>
>>>> Thank you
>>>>
>>>> On Wed, Jan 13, 2016 at 11:49 AM Daniel Imberman
>>>> <daniel.imber...@gmail.com> wrote:
>>>>
>>>>> Thank you Ted! That sounds like it would probably be the most
>>>>> efficient (with the least overhead) way of handling this situation.
>>>>>
>>>>> On Wed, Jan 13, 2016 at 11:36 AM Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> Another approach is to store the objects in a NoSQL store such as
>>>>>> HBase.
>>>>>>
>>>>>> Looking up an object should be very fast.
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Wed, Jan 13, 2016 at 11:29 AM, Daniel Imberman
>>>>>> <daniel.imber...@gmail.com> wrote:
>>>>>>
>>>>>>> I'm looking for a way to send structures to pre-determined
>>>>>>> partitions so that they can be used by another RDD in a
>>>>>>> mapPartitions.
>>>>>>>
>>>>>>> Essentially I'm given an RDD of SparseVectors and an RDD of
>>>>>>> inverted indexes. The inverted index objects are quite large.
>>>>>>>
>>>>>>> My hope is to do a mapPartitions within the RDD of vectors where
>>>>>>> I can compare each vector to the inverted index. The issue is
>>>>>>> that I only NEED one inverted index object per partition (which
>>>>>>> would have the same key as the values within that partition).
>>>>>>>
>>>>>>> val vectors: RDD[(Int, SparseVector)]
>>>>>>>
>>>>>>> val invertedIndexes: RDD[(Int, InvIndex)] =
>>>>>>>   a.reduceByKey(generateInvertedIndex)
>>>>>>>
>>>>>>> vectors.mapPartitions { iter =>
>>>>>>>   val invIndex = invertedIndexes(samePartitionKey)
>>>>>>>   iter.map(invIndex.calculateSimilarity(_))
>>>>>>> }
>>>>>>>
>>>>>>> How could I go about setting up the partitioning such that the
>>>>>>> specific data structure I need will be present for the
>>>>>>> mapPartitions, but I won't have the extra overhead of sending
>>>>>>> over all values (which would happen if I were to make a broadcast
>>>>>>> variable)?
>>>>>>>
>>>>>>> One thought I have been having is to store the objects in HDFS,
>>>>>>> but I'm not sure if that would be a suboptimal solution (it seems
>>>>>>> like it could slow down the process a lot).
>>>>>>>
>>>>>>> Another thought I am currently exploring is whether there is some
>>>>>>> way I can create a custom Partition or Partitioner that could
>>>>>>> hold the data structure (although that might get too complicated
>>>>>>> and become problematic).
>>>>>>>
>>>>>>> Any thoughts on how I could attack this issue would be highly
>>>>>>> appreciated.
>>>>>>>
>>>>>>> Thank you for your help!
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context:
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Sending-large-objects-to-specific-RDDs-tp25967.html
>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>> Nabble.com.
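The custom-Partitioner idea the thread converges on can be sketched as follows. This is a rough, untested outline, not code from the thread: the class name, `numParts`, and the `calculateSimilarity` usage are hypothetical, and it assumes both RDDs are keyed by the same `Int` keys.

```scala
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Hypothetical sketch: route each key to a fixed partition so that an
// inverted index always lands alongside the vectors that share its key.
class KeyPartitioner(val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[Int]
    ((k % numPartitions) + numPartitions) % numPartitions // non-negative mod
  }
  // equals/hashCode matter: Spark only skips a shuffle when it can tell
  // two RDDs were partitioned by the SAME (equal) partitioner.
  override def equals(other: Any): Boolean = other match {
    case p: KeyPartitioner => p.numPartitions == numPartitions
    case _                 => false
  }
  override def hashCode: Int = numPartitions
}

val part = new KeyPartitioner(64)
val vecParts = vectors.partitionBy(part)
val idxParts = invertedIndexes.partitionBy(part)

// zipPartitions pairs co-numbered partitions, so each task sees its
// vectors together with its (few, often one) inverted indexes, without
// broadcasting every index to every executor.
val similarities: RDD[Double] =
  vecParts.zipPartitions(idxParts) { (vecIter, idxIter) =>
    val byKey = idxIter.toMap
    vecIter.map { case (k, v) => byKey(k).calculateSimilarity(v) }
  }
```

A plain `join` with `part` passed explicitly would work equally well here; `zipPartitions` just makes the "one index object per partition, referenced not copied" intent explicit.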