Hi Shuai,

yup, that is exactly what I meant -- implement your own class MyGroupingRDD. This is more detail than most users will ever need to get into, but it's also not all that scary. In this case, you want something that is *extremely* close to the existing CoalescedRDD, so start by looking at that code:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala

The only complicated thing in CoalescedRDD is the PartitionCoalescer, but that is completely irrelevant for you, so you can ignore it. I started writing up a description of what to do, but then I realized just writing the code would be easier :) Totally untested, but here you go:

https://gist.github.com/squito/c2d1dd5413a60830d6f3

The only really interesting part is getPartitions:

https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L31

That's where you create the partitions in your new RDD, each of which depends on multiple partitions of the parent RDD. Also note that compute() is very simple: you just concatenate together the iterators from each of the parent partitions:

https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L37

Let me know how it goes!
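[Editor's note: for reference, here is a rough, untested sketch of what such a MyGroupingRDD could look like, following the same pattern the gist and CoalescedRDD use. The numGroups parameter, the GroupedPartition helper, and the even-divisibility assumption (parent partition count divisible by numGroups) are illustrative choices, not taken from the gist itself.]

import scala.reflect.ClassTag

import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// One output partition, remembering the contiguous run of parent partitions it covers.
case class GroupedPartition(index: Int, parents: Array[Partition]) extends Partition

class MyGroupingRDD[T: ClassTag](prev: RDD[T], numGroups: Int)
  extends RDD[T](prev.context, Nil) {

  // Assumes the parent's partition count divides evenly by numGroups.
  private val groupSize = prev.partitions.length / numGroups

  // The interesting part: output partition i covers parent partitions
  // i*groupSize until (i+1)*groupSize, in their original order.
  override def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numGroups) { i =>
      GroupedPartition(i, prev.partitions.slice(i * groupSize, (i + 1) * groupSize))
    }

  // A narrow dependency, so no shuffle: partition i of this RDD reads
  // directly from its contiguous slice of parent partitions.
  override def getDependencies: Seq[Dependency[_]] = Seq(
    new NarrowDependency(prev) {
      override def getParents(partitionId: Int): Seq[Int] =
        partitionId * groupSize until (partitionId + 1) * groupSize
    }
  )

  // compute() just concatenates the iterators of the underlying parent partitions.
  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    split.asInstanceOf[GroupedPartition].parents.iterator.flatMap { p =>
      prev.iterator(p, context)
    }
}

[With this two-argument constructor, the usage shown in the quoted thread below would be written as new MyGroupingRDD(rawData1000Partitions, 20); sort order is preserved because each group is a contiguous, in-order slice of the parent.]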
On Mon, Mar 16, 2015 at 5:15 PM, Shuai Zheng <szheng.c...@gmail.com> wrote:

> Hi Imran,
>
> I am a bit confused here. Assume I have RDD a with 1000 partitions, which has also been sorted. How can I control, when creating RDD b (with 20 partitions), to make sure partitions 1-50 of RDD a map to the 1st partition of RDD b? I don't see any control code/logic here.
>
> Your code below:
>
> val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)
>
> Does it mean I need to define/develop my own MyGroupingRDD class? I am not very clear on how to do that; is there any place I can find an example? I have never created my own RDD class before (only RDD instances :)). But this is a very valuable approach for me, so I am eager to learn.
>
> Regards,
>
> Shuai
>
> *From:* Imran Rashid [mailto:iras...@cloudera.com]
> *Sent:* Monday, March 16, 2015 11:22 AM
> *To:* Shawn Zheng; user@spark.apache.org
> *Subject:* Re: Process time series RDD after sortByKey
>
> Hi Shuai,
>
> On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng <szheng.c...@gmail.com> wrote:
>
> Sorry I am responding late.
>
> Zhan Zhang's solution is very interesting and I looked into it, but it is not what I want. Basically I want to run the job sequentially and also gain parallelism. So, if possible, if I have 1000 partitions, the best case is that I can run it as 20 subtasks, each one taking partitions 1-50, 51-100, 101-150, etc.
>
> If we have the ability to do this, we will gain huge flexibility when we try to process time-series-like data, and a lot of algorithms will benefit from it.
>
> yes, this is what I was suggesting you do. You would first create one RDD (a) that has 1000 partitions. Don't worry about the creation of this RDD -- it won't create any tasks, it's just a logical holder of your raw data. Then you create another RDD (b) that depends on your RDD (a), but that only has 20 partitions. Each partition in (b) would depend on a number of partitions from (a). As you've suggested, partition 1 in (b) would depend on partitions 1-50 in (a), partition 2 in (b) would depend on 51-100 in (a), etc. Note that RDD (b) still doesn't *do* anything. It's just another logical holder for your data, but this time grouped in the way you want. Then after RDD (b), you would do whatever other transformations you wanted, but now you'd be working w/ 20 partitions:
>
> val rawData1000Partitions = sc.textFile(...) // or whatever
> val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)
> groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} // etc.
>
> Note that this is almost exactly the same as what CoalescedRDD does. However, it might combine the partitions in whatever way it feels like -- you want them combined in a very particular order. So you'll need to create your own subclass.
>
> Back to Zhan Zhang's:
>
> while (iterPartition < rdd.partitions.length) {
>   val res = sc.runJob(rdd, (it: Iterator[T]) => someFunc(it), Seq(iterPartition), allowLocal = true)
>   // some other function after processing one partition
>   iterPartition += 1
> }
>
> I am curious how Spark processes this without parallelism. Will the individual partition be passed back to the driver to process, or will it just run one task on the node where the partition exists, followed by another partition on another node?
>
> Not exactly. The partition is not shipped back to the driver. You create a task which will be processed by a worker. The task scheduling will take data locality into account, so ideally the task will get scheduled in the same location where the data already resides. The worker will execute someFunc, and after it's done it will ship the *result* back to the driver. Then the process will get repeated for all the other partitions.
>
> If you wanted all the data sent back to the driver, you could use RDD.toLocalIterator. That will send one partition back to the driver, let you process it on the driver, then fetch the next partition, etc.
>
> Imran
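[Editor's note: to make the two options at the end of the thread concrete, here is a small, untested sketch. The input path and someFunc are placeholders, and allowLocal matches the Spark 1.x runJob signature used in the thread; later Spark versions drop that parameter.]

// assuming a SparkContext `sc` is in scope, as in the snippets above
val rdd = sc.textFile("hdfs:///path/to/data")

// Zhan Zhang's pattern: someFunc runs out on the executors, one
// single-partition job at a time; only its *result* comes back to the driver.
val someFunc = (it: Iterator[String]) => it.size  // placeholder per-partition function
for (p <- rdd.partitions.indices) {
  val res = sc.runJob(rdd, someFunc, Seq(p), allowLocal = true).head  // 1.x signature
  // driver-side work between partitions goes here
  println(s"partition $p -> $res")
}

// RDD.toLocalIterator instead streams the raw records back to the driver,
// fetching one partition at a time, in order:
rdd.toLocalIterator.foreach { line =>
  println(line)  // runs on the driver; partitions are fetched lazily
}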