David, a PPRDD (mobile abbrev) is a new RDD that contains a subset of
partitions of the original RDD. Subsequent transformations/operations will
only see this subset. So yes, it may do what you need, or not, depending on
whether you still need to do something with the other partitions, as
implied by Mark. You can of course still refer to the original RDD, or
create yet another PPRDD containing that other subset, etc., just as you
can call sc.runJob() on different partitions each time.

I'm sure you can decide which pattern best fits your use case. Beware of
over optimizing leading to unnecessary complexity, though I've also learned
not to underestimate others' real needs based on their toy examples.

Sent while mobile. Pls excuse typos etc.
On Jan 28, 2014 3:41 PM, "David Thomas" <dt5434...@gmail.com> wrote:

> Thanks for those tips.
>
> I was looking into the docs for PartitionPruningRDD. It says, "A RDD used
> to prune RDD partitions/partitions so we can avoid launching tasks on all
> partitions". I did not understand this exactly and I couldn't find any
> sample code. Can we use this to apply a function only on certain partitions?
>
>
> On Tue, Jan 28, 2014 at 4:36 PM, Christopher Nguyen <c...@adatao.com>wrote:
>
>> Hence the qualification to determine whether it is necessary *and*
>> sufficient, depending on what David is trying to do overall :)
>>
>> Sent while mobile. Pls excuse typos etc.
>> On Jan 28, 2014 2:10 PM, "Mark Hamstra" <m...@clearstorydata.com> wrote:
>>
>>> SparkContext#runJob is the basis of an RDD action, so the result of
>>> using runJob to call toUpperCase on the A-to-M partitions will be the
>>> uppercased strings materialized in the driver process, not a transformation
>>> of the original RDD.
>>>
>>>
>>> On Tue, Jan 28, 2014 at 1:37 PM, Christopher Nguyen <c...@adatao.com>wrote:
>>>
>>>> David,
>>>>
>>>> map() would iterate row by row, forcing an if on each row.
>>>>
>>>> mapPartitions*() allows you to have a conditional on the whole
>>>> partition first, as Mark suggests. That should usually be sufficient.
>>>>
>>>> SparkContext.runJob() allows you to specify which partitions to run on,
>>>> if you're sure it's necessary and sufficient, and not over optimization.
>>>>
>>>> Sent while mobile. Pls excuse typos etc.
>>>> On Jan 28, 2014 1:30 PM, "Mark Hamstra" <m...@clearstorydata.com>
>>>> wrote:
>>>>
>>>>> If I'm understanding you correctly, there's lots of ways you could do
>>>>> that.  Here's one, continuing from the previous example:
>>>>>
>>>>> // rdd26: RDD[String] split by first letter into 26 partitions
>>>>>
>>>>> val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)
>>>>>
>>>>> rdd26..mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx))
>>>>> itr.map(_.toUpperCase) else itr }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Jan 28, 2014 at 12:52 PM, David Thomas <dt5434...@gmail.com>wrote:
>>>>>
>>>>>> Thank you! That helps.
>>>>>>
>>>>>> A follow up question on this. How can I apply a function only on a
>>>>>> subset of this RDD. Lets say, I need all strings starting in the range 
>>>>>> 'A'
>>>>>> - 'M' be applied toUpperCase and not touch the remaining. Is that 
>>>>>> possible
>>>>>> without running an 'if' condition on all the partitions in the cluster?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <
>>>>>> m...@clearstorydata.com> wrote:
>>>>>>
>>>>>>> scala> import org.apache.spark.RangePartitioner
>>>>>>>
>>>>>>> scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog",
>>>>>>> "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long",
>>>>>>> "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk",
>>>>>>> "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
>>>>>>>
>>>>>>> scala> rdd.keyBy(s => s(0).toUpper)
>>>>>>> res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at
>>>>>>> keyBy at <console>:15
>>>>>>>
>>>>>>> scala> res0.partitionBy(new RangePartitioner[Char, String](26,
>>>>>>> res0)).values
>>>>>>> res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at
>>>>>>> <console>:18
>>>>>>>
>>>>>>> scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx,
>>>>>>> s))).collect.foreach(println)
>>>>>>>
>>>>>>> (0,apple)
>>>>>>> (1,Ball)
>>>>>>> (2,cat)
>>>>>>> (3,dog)
>>>>>>> (4,Elephant)
>>>>>>> (5,fox)
>>>>>>> (6,gas)
>>>>>>> (7,horse)
>>>>>>> (8,index)
>>>>>>> (9,jet)
>>>>>>> (10,kitsch)
>>>>>>> (11,long)
>>>>>>> (12,moon)
>>>>>>> (13,Neptune)
>>>>>>> (14,ooze)
>>>>>>> (15,Pen)
>>>>>>> (16,quiet)
>>>>>>> (17,rose)
>>>>>>> (18,sun)
>>>>>>> (19,talk)
>>>>>>> (20,umbrella)
>>>>>>> (21,voice)
>>>>>>> (22,Walrus)
>>>>>>> (23,xeon)
>>>>>>> (24,Yam)
>>>>>>> (25,zebra)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <
>>>>>>> nick.pentre...@gmail.com> wrote:
>>>>>>>
>>>>>>>> If you do something like:
>>>>>>>>
>>>>>>>> rdd.map{ str => (str.take(1), str) }
>>>>>>>>
>>>>>>>> you will have an RDD[(String, String)] where the key is the first
>>>>>>>> character of the string. Now when you perform an operation that uses
>>>>>>>> partitioning (e.g. reduceByKey) you will end up with the 1st reduce 
>>>>>>>> task
>>>>>>>> receiving all the strings with A, the 2nd all the strings with B etc. 
>>>>>>>> Note
>>>>>>>> that you may not be able to enforce that each *machine* gets a
>>>>>>>> different letter, but in most cases that doesn't particularly matter as
>>>>>>>> long as you get "all values for a given key go to the same reducer"
>>>>>>>> behaviour.
>>>>>>>>
>>>>>>>> Perhaps if you expand on your use case we can provide more detailed
>>>>>>>> assistance.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jan 28, 2014 at 9:35 PM, David Thomas 
>>>>>>>> <dt5434...@gmail.com>wrote:
>>>>>>>>
>>>>>>>>> Lets say I have an RDD of Strings and there are 26 machines in the
>>>>>>>>> cluster. How can I repartition the RDD in such a way that all strings
>>>>>>>>> starting with A gets collected on machine1, B on machine2 and so on.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>

Reply via email to