This is perfect. So I guess my best course of action will be to create a
custom partitioner to ensure that as little data as possible is shuffled
when I join the partitions. Then I really only need to do a map (rather
than a mapPartitions), since each value will hold a reference to the
inverted index object (rather than a copy, as I had assumed).
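For reference, a rough sketch of the routing rule such a custom partitioner would apply. This is a hypothetical standalone version in plain Scala (no Spark dependency, so it can run anywhere): in real code this logic would be the body of getPartition(key: Any) in a class extending org.apache.spark.Partitioner. The Int keys mirror the (Int, Vector) / (Int, InvertedIndex) pairing discussed in the thread.

```scala
// Sketch only: the core routing rule a custom Partitioner would use.
// In Spark this would live inside a class extending
// org.apache.spark.Partitioner; it is written as a plain function here
// so the rule itself is easy to see and test.
def getPartition(key: Int, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  // % on the JVM can return a negative remainder; partition ids must be >= 0.
  if (raw < 0) raw + numPartitions else raw
}
```

Passing a single instance of such a partitioner to both RDDs (e.g. something like `vectors.partitionBy(p).join(invIndexes.partitionBy(p))`) should let Spark treat them as co-partitioned, so the join becomes a narrow dependency rather than a full shuffle.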

Thank you Ted and Koert!

On Sat, Jan 16, 2016 at 1:37 PM Ted Yu <yuzhih...@gmail.com> wrote:

> Both groupByKey and join() accept Partitioner as parameter.
>
> Maybe you can specify a custom Partitioner so that the amount of shuffle
> is reduced.
>
> On Sat, Jan 16, 2016 at 9:39 AM, Daniel Imberman <
> daniel.imber...@gmail.com> wrote:
>
>> Hi Ted,
>>
>> I think I might have figured something out! (Though I haven't tested it
>> at scale yet.)
>>
>> My current thought is that I can do a groupByKey on the RDD of vectors
>> and then do a join with the invertedIndex.
>> It would look something like this:
>>
>> val invIndexes: RDD[(Int, InvertedIndex)]
>> val partitionedVectors: RDD[(Int, Vector)]
>>
>> val partitionedTasks: RDD[(Int, (Iterable[Vector], InvertedIndex))] =
>>   partitionedVectors.groupByKey().join(invIndexes)
>>
>> val similarities = partitionedTasks.map( /* calculate similarities */ )
>> val maxSim = similarities.reduce(math.max(_, _))
>>
>>
>> So while I realize that a groupByKey is usually frowned upon, it seems
>> to me that since I need all associated vectors to be local anyway, this
>> repartitioning would not be too expensive.
>>
>> Does this seem like a reasonable approach to this problem, or are there
>> any pitfalls I should consider if I approach it this way?
>>
>> Thank you for your help,
>>
>> Daniel
>>
>> On Fri, Jan 15, 2016 at 5:30 PM Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> My knowledge of XSEDE is limited - I visited the website.
>>>
>>> If there is no easy way to deploy HBase, an alternative approach (using
>>> HDFS?) needs to be considered.
>>>
>>> I need to do more homework on this :-)
>>>
>>> On Thu, Jan 14, 2016 at 3:51 PM, Daniel Imberman <
>>> daniel.imber...@gmail.com> wrote:
>>>
>>>> Hi Ted,
>>>>
>>>> So unfortunately after looking into the cluster manager that I will be
>>>> using for my testing (I'm using a super-computer called XSEDE rather than
>>>> AWS), it looks like the cluster does not actually come with HBase installed
>>>> (this cluster is becoming somewhat problematic, as it is essentially AWS
>>>> but you have to do your own virtualization scripts). Do you have any other
>>>> thoughts on how I could go about dealing with this purely using Spark and
>>>> HDFS?
>>>>
>>>> Thank you
>>>>
>>>> On Wed, Jan 13, 2016 at 11:49 AM Daniel Imberman <
>>>> daniel.imber...@gmail.com> wrote:
>>>>
>>>>> Thank you Ted! That sounds like it would probably be the most
>>>>> efficient (with the least overhead) way of handling this situation.
>>>>>
>>>>> On Wed, Jan 13, 2016 at 11:36 AM Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> Another approach is to store the objects in NoSQL store such as HBase.
>>>>>>
>>>>>> Looking up objects should be very fast.
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Wed, Jan 13, 2016 at 11:29 AM, Daniel Imberman <
>>>>>> daniel.imber...@gmail.com> wrote:
>>>>>>
>>>>>>> I'm looking for a way to send structures to pre-determined
>>>>>>> partitions so that they can be used by another RDD in a
>>>>>>> mapPartitions.
>>>>>>>
>>>>>>> Essentially I'm given an RDD of SparseVectors and an RDD of inverted
>>>>>>> indexes. The inverted index objects are quite large.
>>>>>>>
>>>>>>> My hope is to do a mapPartitions within the RDD of vectors where I
>>>>>>> can compare each vector to the inverted index. The issue is that I
>>>>>>> only NEED one inverted index object per partition (which would have
>>>>>>> the same key as the values within that partition).
>>>>>>>
>>>>>>>
>>>>>>> val vectors: RDD[(Int, SparseVector)]
>>>>>>>
>>>>>>> val invertedIndexes: RDD[(Int, InvIndex)] =
>>>>>>>   a.reduceByKey(generateInvertedIndex)
>>>>>>>
>>>>>>> vectors.mapPartitions { iter =>
>>>>>>>   val invIndex = invertedIndexes(samePartitionKey)
>>>>>>>   iter.map(invIndex.calculateSimilarity(_))
>>>>>>> }
>>>>>>>
>>>>>>> How could I go about setting up the partitioning such that the
>>>>>>> specific data structure I need will be present for the mapPartitions,
>>>>>>> but I won't have the extra overhead of sending over all values (which
>>>>>>> would happen if I were to make a broadcast variable)?
>>>>>>>
>>>>>>> One thought I have been having is to store the objects in HDFS, but
>>>>>>> I'm not sure whether that would be a suboptimal solution (it seems
>>>>>>> like it could slow down the process a lot).
>>>>>>>
>>>>>>> Another thought I am currently exploring is whether there is some
>>>>>>> way I can create a custom Partition or Partitioner that could hold
>>>>>>> the data structure (although that might get too complicated and
>>>>>>> become problematic).
>>>>>>>
>>>>>>> Any thoughts on how I could attack this issue would be highly
>>>>>>> appreciated.
>>>>>>>
>>>>>>> Thank you for your help!
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context:
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Sending-large-objects-to-specific-RDDs-tp25967.html
>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>> Nabble.com.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>
>
