Doesn't avoid an 'if' on every partition, but does avoid it on every element of every partition.
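The point above can be sketched without a cluster. Below, partitions are simulated as plain Scala collections and `mapPartitionsWithIndex` as `zipWithIndex.map`; this is an illustrative local stand-in, not a Spark job. The key observation survives the simulation: the `range.contains(idx)` branch runs once per partition, not once per element.

```scala
object PartitionRangeSketch {
  // Simulated mapPartitionsWithIndex over pre-split partitions: the 'if'
  // is evaluated once per partition, and only matching partitions pay for
  // the per-element map.
  def applyToRange(partitions: IndexedSeq[List[String]], range: Range): IndexedSeq[List[String]] =
    partitions.zipWithIndex.map { case (itr, idx) =>
      if (range.contains(idx)) itr.map(_.toUpperCase) else itr
    }

  def main(args: Array[String]): Unit = {
    val words = List("apple", "Ball", "moon", "Neptune", "zebra")
    // One simulated partition per starting letter 'A'..'Z' (most empty here),
    // mimicking what the 26-way RangePartitioner in the thread produces.
    val partitions = ('A' to 'Z').map(c => words.filter(_.head.toUpper == c))
    val range = 0 to ('M' - 'A') // partition indices for letters A..M
    println(applyToRange(partitions, range).flatten.mkString(", "))
    // prints: APPLE, BALL, MOON, Neptune, zebra
  }
}
```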
On Tue, Jan 28, 2014 at 1:29 PM, Mark Hamstra <m...@clearstorydata.com> wrote:

> If I'm understanding you correctly, there are lots of ways you could do
> that. Here's one, continuing from the previous example:
>
> // rdd26: RDD[String] split by first letter into 26 partitions
>
> val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)
>
> rdd26.mapPartitionsWithIndex { (idx, itr) =>
>   if (range.contains(idx)) itr.map(_.toUpperCase) else itr
> }
>
>
> On Tue, Jan 28, 2014 at 12:52 PM, David Thomas <dt5434...@gmail.com> wrote:
>
>> Thank you! That helps.
>>
>> A follow-up question on this: how can I apply a function to only a subset
>> of this RDD? Let's say I need all strings starting in the range 'A' - 'M'
>> to have toUpperCase applied, and the rest left untouched. Is that possible
>> without running an 'if' condition on all the partitions in the cluster?
>>
>>
>> On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <m...@clearstorydata.com> wrote:
>>
>>> scala> import org.apache.spark.RangePartitioner
>>>
>>> scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog",
>>> "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long",
>>> "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk",
>>> "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
>>>
>>> scala> rdd.keyBy(s => s(0).toUpper)
>>> res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy
>>> at <console>:15
>>>
>>> scala> res0.partitionBy(new RangePartitioner[Char, String](26,
>>> res0)).values
>>> res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at
>>> <console>:18
>>>
>>> scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx,
>>> s))).collect.foreach(println)
>>>
>>> (0,apple)
>>> (1,Ball)
>>> (2,cat)
>>> (3,dog)
>>> (4,Elephant)
>>> (5,fox)
>>> (6,gas)
>>> (7,horse)
>>> (8,index)
>>> (9,jet)
>>> (10,kitsch)
>>> (11,long)
>>> (12,moon)
>>> (13,Neptune)
>>> (14,ooze)
>>> (15,Pen)
>>> (16,quiet)
>>> (17,rose)
>>> (18,sun)
>>> (19,talk)
>>> (20,umbrella)
>>> (21,voice)
>>> (22,Walrus)
>>> (23,xeon)
>>> (24,Yam)
>>> (25,zebra)
>>>
>>>
>>> On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>>
>>>> If you do something like:
>>>>
>>>> rdd.map{ str => (str.take(1), str) }
>>>>
>>>> you will have an RDD[(String, String)] where the key is the first
>>>> character of the string. Now when you perform an operation that uses
>>>> partitioning (e.g. reduceByKey) you will end up with the 1st reduce task
>>>> receiving all the strings starting with A, the 2nd all the strings starting
>>>> with B, etc. Note that you may not be able to enforce that each *machine*
>>>> gets a different letter, but in most cases that doesn't particularly matter,
>>>> as long as you get "all values for a given key go to the same reducer"
>>>> behaviour.
>>>>
>>>> Perhaps if you expand on your use case we can provide more detailed
>>>> assistance.
>>>>
>>>>
>>>> On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <dt5434...@gmail.com> wrote:
>>>>
>>>>> Let's say I have an RDD of Strings and there are 26 machines in the
>>>>> cluster. How can I repartition the RDD in such a way that all strings
>>>>> starting with A get collected on machine1, B on machine2, and so on?
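The routing the thread converges on, key each string by its first letter and let a partitioner send each key to a fixed partition, can be mimicked locally with `groupBy`. This is an illustrative sketch only: the `partitionOf` function below is a hypothetical letter-to-index rule standing in for Spark's `RangePartitioner`, which determines its range bounds by sampling rather than by a fixed formula.

```scala
object LetterPartitionSketch {
  val numPartitions = 26

  // Assign a string to a partition by its first letter (case-insensitive),
  // as the thread's 26-way RangePartitioner example effectively does for
  // data keyed on 'A'..'Z'.
  def partitionOf(s: String): Int = s.head.toUpper - 'A'

  // Simulated shuffle: every string with the same first letter lands in
  // the same bucket, mirroring "all values for a key go to the same reducer".
  def partitionAll(data: List[String]): Map[Int, List[String]] =
    data.groupBy(partitionOf)

  def main(args: Array[String]): Unit = {
    val data = List("apple", "Ball", "cat", "zebra", "Zen")
    partitionAll(data).toSeq.sortBy(_._1).foreach(println)
    // "zebra" and "Zen" share partition 25; the others get their own.
  }
}
```

As Nick notes in the thread, this guarantees co-location of keys within a partition, not a particular partition-to-machine assignment.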