Thanks for those tips.

I was looking into the docs for PartitionPruningRDD. It says, "A RDD used
to prune RDD partitions/partitions so we can avoid launching tasks on all
partitions". I didn't quite understand that, and I couldn't find any
sample code. Can we use this to apply a function only on certain partitions?
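From the scaladoc, my reading is that it keeps only the partitions whose index passes a filter, so tasks are launched on just those. A rough sketch of the idea in plain Scala (no Spark here; `parts` is a made-up stand-in for an RDD's partitions):

```scala
// Hypothetical stand-in for a 4-partition RDD: each inner Vector is one partition.
val parts = Vector(Vector("apple", "ant"), Vector("ball"), Vector("cat"), Vector("dog"))

// PartitionPruningRDD-style filter: keep only partitions whose index passes the
// test, so downstream work would run on those partitions alone.
def prunePartitions[T](partitions: Vector[Vector[T]], keep: Int => Boolean): Vector[Vector[T]] =
  partitions.zipWithIndex.collect { case (p, i) if keep(i) => p }

val pruned = prunePartitions(parts, i => i < 2)
println(pruned)  // Vector(Vector(apple, ant), Vector(ball))
```

Is that roughly how it behaves with a real RDD?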


On Tue, Jan 28, 2014 at 4:36 PM, Christopher Nguyen <c...@adatao.com> wrote:

> Hence the qualification to determine whether it is necessary *and*
> sufficient, depending on what David is trying to do overall :)
>
> Sent while mobile. Pls excuse typos etc.
> On Jan 28, 2014 2:10 PM, "Mark Hamstra" <m...@clearstorydata.com> wrote:
>
>> SparkContext#runJob is the basis of an RDD action, so the result of using
>> runJob to call toUpperCase on the A-to-M partitions will be the uppercased
>> strings materialized in the driver process, not a transformation of the
>> original RDD.
>>
>>
>> On Tue, Jan 28, 2014 at 1:37 PM, Christopher Nguyen <c...@adatao.com> wrote:
>>
>>> David,
>>>
>>> map() would iterate row by row, forcing an if on each row.
>>>
>>> mapPartitions*() allows you to have a conditional on the whole partition
>>> first, as Mark suggests. That should usually be sufficient.
>>>
>>> SparkContext.runJob() allows you to specify which partitions to run on,
>>> if you're sure that's necessary and sufficient, and not over-optimization.
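To illustrate the difference in plain Scala (no Spark here; `parts` and the chosen indices are made up): a runJob-style call applies a function to only the listed partitions and materializes the results locally, rather than producing a new RDD:

```scala
// Hypothetical stand-in for a 3-partition RDD of strings.
val parts = Vector(Vector("apple", "ant"), Vector("nut"), Vector("zebra"))

// runJob-style call: apply a function to a chosen subset of partitions only,
// returning the results to the caller (the "driver") instead of a new RDD.
def runJob[T, U](partitions: Vector[Vector[T]], f: Iterator[T] => U, which: Seq[Int]): Seq[U] =
  which.map(i => f(partitions(i).iterator))

val upper = runJob(parts, (it: Iterator[String]) => it.map(_.toUpperCase).toVector, Seq(0, 2))
println(upper)  // List(Vector(APPLE, ANT), Vector(ZEBRA))
```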
>>>
>>> Sent while mobile. Pls excuse typos etc.
>>> On Jan 28, 2014 1:30 PM, "Mark Hamstra" <m...@clearstorydata.com> wrote:
>>>
>>>> If I'm understanding you correctly, there's lots of ways you could do
>>>> that.  Here's one, continuing from the previous example:
>>>>
>>>> // rdd26: RDD[String] split by first letter into 26 partitions
>>>>
>>>> val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)
>>>>
>>>> rdd26.mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx))
>>>> itr.map(_.toUpperCase) else itr }
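The same conditional can be checked in plain Scala (the two `parts` entries below are a made-up stand-in for rdd26's partitions, keyed by partition index):

```scala
// Partition indices 0 ('A') through 12 ('M'):
val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)

// Stand-in for rdd26: index 0 would hold A-words, index 20 U-words, etc.
val parts = Vector(0 -> Vector("apple"), 20 -> Vector("umbrella"))

// One range check per partition, not per row; only in-range partitions are mapped.
val result = parts.map { case (idx, itr) =>
  if (range.contains(idx)) itr.map(_.toUpperCase) else itr
}
println(result)  // Vector(Vector(APPLE), Vector(umbrella))
```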
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Jan 28, 2014 at 12:52 PM, David Thomas <dt5434...@gmail.com> wrote:
>>>>
>>>>> Thank you! That helps.
>>>>>
>>>>> A follow-up question on this: how can I apply a function to only a
>>>>> subset of this RDD? Let's say I need toUpperCase applied to all strings
>>>>> starting with letters in the range 'A'-'M', leaving the rest untouched.
>>>>> Is that possible without running an 'if' condition on all the
>>>>> partitions in the cluster?
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <m...@clearstorydata.com
>>>>> > wrote:
>>>>>
>>>>>> scala> import org.apache.spark.RangePartitioner
>>>>>>
>>>>>> scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog",
>>>>>> "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long",
>>>>>> "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk",
>>>>>> "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
>>>>>>
>>>>>> scala> rdd.keyBy(s => s(0).toUpper)
>>>>>> res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at
>>>>>> keyBy at <console>:15
>>>>>>
>>>>>> scala> res0.partitionBy(new RangePartitioner[Char, String](26,
>>>>>> res0)).values
>>>>>> res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at
>>>>>> <console>:18
>>>>>>
>>>>>> scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx,
>>>>>> s))).collect.foreach(println)
>>>>>>
>>>>>> (0,apple)
>>>>>> (1,Ball)
>>>>>> (2,cat)
>>>>>> (3,dog)
>>>>>> (4,Elephant)
>>>>>> (5,fox)
>>>>>> (6,gas)
>>>>>> (7,horse)
>>>>>> (8,index)
>>>>>> (9,jet)
>>>>>> (10,kitsch)
>>>>>> (11,long)
>>>>>> (12,moon)
>>>>>> (13,Neptune)
>>>>>> (14,ooze)
>>>>>> (15,Pen)
>>>>>> (16,quiet)
>>>>>> (17,rose)
>>>>>> (18,sun)
>>>>>> (19,talk)
>>>>>> (20,umbrella)
>>>>>> (21,voice)
>>>>>> (22,Walrus)
>>>>>> (23,xeon)
>>>>>> (24,Yam)
>>>>>> (25,zebra)
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <
>>>>>> nick.pentre...@gmail.com> wrote:
>>>>>>
>>>>>>> If you do something like:
>>>>>>>
>>>>>>> rdd.map{ str => (str.take(1), str) }
>>>>>>>
>>>>>>> you will have an RDD[(String, String)] where the key is the first
>>>>>>> character of the string. Now when you perform an operation that uses
>>>>>>> partitioning (e.g. reduceByKey) you will end up with the 1st reduce task
>>>>>>> receiving all the strings starting with A, the 2nd all the strings
>>>>>>> starting with B, etc. Note that you may not be able to enforce that each
>>>>>>> *machine* gets a different letter, but in most cases that doesn't
>>>>>>> particularly matter, as long as you get the "all values for a given key
>>>>>>> go to the same reducer" behaviour.
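The keying idea can be tried in plain Scala, with `groupBy` standing in for the shuffle that reduceByKey would trigger (the sample strings are made up):

```scala
val strings = List("apple", "Ball", "ant", "cat", "Cap")

// Key each string by its first character, as in rdd.map{ str => (str.take(1), str) };
// uppercasing the key makes "apple" and "ant" share a bucket with any "Apple"s.
val keyed = strings.map(s => (s.take(1).toUpperCase, s))

// groupBy plays the role of the shuffle: all values for a given key land together.
val buckets = keyed.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2)) }
println(buckets("A"))  // List(apple, ant)
```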
>>>>>>>
>>>>>>> Perhaps if you expand on your use case we can provide more detailed
>>>>>>> assistance.
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <dt5434...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Let's say I have an RDD of Strings and there are 26 machines in the
>>>>>>>> cluster. How can I repartition the RDD in such a way that all strings
>>>>>>>> starting with A get collected on machine1, B on machine2, and so on?
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>
