Thanks for those tips. I was looking into the docs for PartitionPruningRDD. It says, "A RDD used to prune RDD partitions/partitions so we can avoid launching tasks on all partitions". I did not understand this exactly and I couldn't find any sample code. Can we use this to apply a function only on certain partitions?
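For what it's worth, here is my rough guess at how it might be used, going only by the scaladoc and the `PartitionPruningRDD.create(rdd, partitionFilterFunc)` factory method. Please treat it as a sketch rather than a confirmed answer: `FirstLetterPartitioner`, `PruneSketch`, `splitAndUpper` and the five sample words are names I made up for illustration, the partitioner assumes every string starts with an ASCII letter, and I am assuming a Spark version recent enough that the pair-RDD implicits are in scope without extra imports. `PartitionPruningRDD` is also marked as a developer API, so it may change between releases.

```scala
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}

// Hypothetical helper: sends keys 'A'..'Z' to partitions 0..25.
// Assumes every key is an uppercase ASCII letter.
class FirstLetterPartitioner extends Partitioner {
  override def numPartitions: Int = 26
  override def getPartition(key: Any): Int = key.asInstanceOf[Char] - 'A'
}

object PruneSketch {
  // Uppercase only the strings stored in the partitions whose index is in `keep`.
  // Returns (uppercased subset, untouched remainder) as two lazy RDDs.
  def splitAndUpper(rdd26: RDD[String], keep: Range): (RDD[String], RDD[String]) = {
    val kept = PartitionPruningRDD.create(rdd26, (idx: Int) => keep.contains(idx))
    val rest = PartitionPruningRDD.create(rdd26, (idx: Int) => !keep.contains(idx))
    (kept.map(_.toUpperCase), rest)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("prune-sketch")
    val sc = new SparkContext(conf)
    try {
      val words = sc.parallelize(Seq("apple", "Ball", "moon", "Neptune", "zebra"))
      // Stand-in for rdd26: one partition per first letter.
      val rdd26 = words.keyBy(_.head.toUpper).partitionBy(new FirstLetterPartitioner).values
      val (upper, rest) = splitAndUpper(rdd26, 0 to 12) // partitions for 'A'..'M'
      // Evaluating `upper` on its own only involves the 13 kept partitions.
      println(upper.collect().sorted.mkString(", "))
      // The union brings back the full data set; the N-Z strings are unchanged.
      println(upper.union(rest).collect().sorted.mkString(", "))
    } finally sc.stop()
  }
}
```

If this reading is right, the pruned RDD simply drops the other partitions, so it only saves tasks when the A-to-M subset is all I need. Getting the whole RDD back means a union with the complementary pruned RDD, which touches every partition again.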
On Tue, Jan 28, 2014 at 4:36 PM, Christopher Nguyen <c...@adatao.com> wrote:
> Hence the qualification to determine whether it is necessary *and*
> sufficient, depending on what David is trying to do overall :)
>
> Sent while mobile. Pls excuse typos etc.
> On Jan 28, 2014 2:10 PM, "Mark Hamstra" <m...@clearstorydata.com> wrote:
>
>> SparkContext#runJob is the basis of an RDD action, so the result of using
>> runJob to call toUpperCase on the A-to-M partitions will be the uppercased
>> strings materialized in the driver process, not a transformation of the
>> original RDD.
>>
>> On Tue, Jan 28, 2014 at 1:37 PM, Christopher Nguyen <c...@adatao.com> wrote:
>>
>>> David,
>>>
>>> map() would iterate row by row, forcing an if on each row.
>>>
>>> mapPartitions*() allows you to have a conditional on the whole partition
>>> first, as Mark suggests. That should usually be sufficient.
>>>
>>> SparkContext.runJob() allows you to specify which partitions to run on,
>>> if you're sure it's necessary and sufficient, and not over-optimization.
>>>
>>> Sent while mobile. Pls excuse typos etc.
>>> On Jan 28, 2014 1:30 PM, "Mark Hamstra" <m...@clearstorydata.com> wrote:
>>>
>>>> If I'm understanding you correctly, there are lots of ways you could do
>>>> that. Here's one, continuing from the previous example:
>>>>
>>>> // rdd26: RDD[String] split by first letter into 26 partitions
>>>>
>>>> val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)
>>>>
>>>> rdd26.mapPartitionsWithIndex { (idx, itr) =>
>>>>   if (range.contains(idx)) itr.map(_.toUpperCase) else itr
>>>> }
>>>>
>>>> On Tue, Jan 28, 2014 at 12:52 PM, David Thomas <dt5434...@gmail.com> wrote:
>>>>
>>>>> Thank you! That helps.
>>>>>
>>>>> A follow-up question on this: how can I apply a function to only a
>>>>> subset of this RDD? Let's say I need toUpperCase applied to all strings
>>>>> starting in the range 'A' - 'M', without touching the rest. Is that
>>>>> possible without running an 'if' condition on all the partitions in
>>>>> the cluster?
>>>>>
>>>>> On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <m...@clearstorydata.com> wrote:
>>>>>
>>>>>> scala> import org.apache.spark.RangePartitioner
>>>>>>
>>>>>> scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog",
>>>>>>   "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long",
>>>>>>   "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk",
>>>>>>   "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
>>>>>>
>>>>>> scala> rdd.keyBy(s => s(0).toUpper)
>>>>>> res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:15
>>>>>>
>>>>>> scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
>>>>>> res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18
>>>>>>
>>>>>> scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)
>>>>>>
>>>>>> (0,apple)
>>>>>> (1,Ball)
>>>>>> (2,cat)
>>>>>> (3,dog)
>>>>>> (4,Elephant)
>>>>>> (5,fox)
>>>>>> (6,gas)
>>>>>> (7,horse)
>>>>>> (8,index)
>>>>>> (9,jet)
>>>>>> (10,kitsch)
>>>>>> (11,long)
>>>>>> (12,moon)
>>>>>> (13,Neptune)
>>>>>> (14,ooze)
>>>>>> (15,Pen)
>>>>>> (16,quiet)
>>>>>> (17,rose)
>>>>>> (18,sun)
>>>>>> (19,talk)
>>>>>> (20,umbrella)
>>>>>> (21,voice)
>>>>>> (22,Walrus)
>>>>>> (23,xeon)
>>>>>> (24,Yam)
>>>>>> (25,zebra)
>>>>>>
>>>>>> On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>>>>>
>>>>>>> If you do something like:
>>>>>>>
>>>>>>> rdd.map{ str => (str.take(1), str) }
>>>>>>>
>>>>>>> you will have an RDD[(String, String)] where the key is the first
>>>>>>> character of the string. Now when you perform an operation that uses
>>>>>>> partitioning (e.g. reduceByKey) you will end up with the 1st reduce
>>>>>>> task receiving all the strings with A, the 2nd all the strings with B,
>>>>>>> etc. Note that you may not be able to enforce that each *machine* gets
>>>>>>> a different letter, but in most cases that doesn't particularly matter
>>>>>>> as long as you get "all values for a given key go to the same reducer"
>>>>>>> behaviour.
>>>>>>>
>>>>>>> Perhaps if you expand on your use case we can provide more detailed
>>>>>>> assistance.
>>>>>>>
>>>>>>> On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <dt5434...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Let's say I have an RDD of Strings and there are 26 machines in the
>>>>>>>> cluster. How can I repartition the RDD in such a way that all strings
>>>>>>>> starting with A get collected on machine1, B on machine2 and so on?