Hi Imran,

This is extremely helpful. It's not only an approach; it also helps me understand how to shape and customize my own DAG effectively.
Thanks a lot!

Shuai

On Monday, March 16, 2015, Imran Rashid <iras...@cloudera.com> wrote:

> Hi Shuai,
>
> Yup, that is exactly what I meant -- implement your own class,
> MyGroupingRDD. This is definitely more detail than most users will ever
> need to go into, but it's also not all that scary. In this case, you
> want something that is *extremely* close to the existing CoalescedRDD,
> so start by looking at that code:
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
>
> The only complicated thing in CoalescedRDD is the PartitionCoalescer,
> but that is completely irrelevant for you, so you can ignore it. I
> started writing up a description of what to do, but then I realized
> just writing the code would be easier :) Totally untested, but here you
> go:
>
> https://gist.github.com/squito/c2d1dd5413a60830d6f3
>
> The only really interesting part is getPartitions:
>
> https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L31
>
> That's where you create the partitions of your new RDD, each of which
> depends on multiple partitions of the parent RDD. Also note that
> compute() is very simple: you just concatenate the iterators from each
> of the parent partitions:
>
> https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L37
>
> Let me know how it goes!
>
> On Mon, Mar 16, 2015 at 5:15 PM, Shuai Zheng <szheng.c...@gmail.com> wrote:
>
>> Hi Imran,
>>
>> I am a bit confused here. Assume I have RDD a with 1000 partitions,
>> and it has already been sorted. When creating RDD b (with 20
>> partitions), how can I control it so that partitions 1-50 of RDD a map
>> to the 1st partition of RDD b? I don't see any control code/logic
>> here?
>> Your code below:
>>
>> val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)
>>
>> Does it mean I need to define/develop my own MyGroupingRDD class? I am
>> not very clear on how to do that -- is there any place I can find an
>> example? I have never created my own RDD class before (only RDD
>> instances :)). But this is a very valuable approach to me, so I am
>> eager to learn.
>>
>> Regards,
>>
>> Shuai
>>
>> *From:* Imran Rashid [mailto:iras...@cloudera.com]
>> *Sent:* Monday, March 16, 2015 11:22 AM
>> *To:* Shawn Zheng; user@spark.apache.org
>> *Subject:* Re: Process time series RDD after sortByKey
>>
>> Hi Shuai,
>>
>> On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng <szheng.c...@gmail.com> wrote:
>>
>> Sorry for the late response.
>>
>> Zhan Zhang's solution is very interesting and I looked into it, but it
>> is not what I want. Basically, I want to run the job sequentially
>> while also gaining parallelism. So if I have 1000 partitions, the best
>> case is that I can run it as 20 subtasks, each one taking partitions
>> 1-50, 51-100, 101-150, etc.
>>
>> If we have the ability to do this, we will gain huge flexibility when
>> processing time-series-like data, and a lot of algorithms will benefit
>> from it.
>>
>> Yes, this is what I was suggesting you do. You would first create one
>> RDD (a) that has 1000 partitions. Don't worry about the creation of
>> this RDD -- it won't create any tasks; it's just a logical holder for
>> your raw data. Then you create another RDD (b) that depends on your
>> RDD (a), but that only has 20 partitions. Each partition in (b) would
>> depend on a number of partitions from (a).
>> As you've suggested, partition 1 in (b) would depend on partitions
>> 1-50 in (a), partition 2 in (b) would depend on partitions 51-100 in
>> (a), etc. Note that RDD (b) still doesn't *do* anything. It's just
>> another logical holder for your data, but this time grouped the way
>> you want. Then, after RDD (b), you would do whatever other
>> transformations you wanted, but now you'd be working with 20
>> partitions:
>>
>> val rawData1000Partitions = sc.textFile(...) // or whatever
>> val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)
>> groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} // etc.
>>
>> Note that this is almost exactly what CoalescedRDD does. However, it
>> might combine the partitions in whatever way it feels like -- you want
>> them combined in a very particular order. So you'll need to create
>> your own subclass.
>>
>> Back to Zhan Zhang's:
>>
>> while (iterPartition < RDD.partitions.length) {
>>   val res = sc.runJob(this, (it: Iterator[T]) => someFunc, iterPartition, allowLocal = true)
>>   // Some other function after processing one partition.
>>   iterPartition += 1
>> }
>>
>> I am curious how Spark processes this without parallelism. Will the
>> individual partition be passed back to the driver to process, or will
>> it just run one task on the node where that partition exists, then
>> follow with another partition on another node?
>>
>> Not exactly. The partition is not shipped back to the driver. You
>> create a task which will be processed by a worker. The task scheduling
>> will take data locality into account, so ideally the task will get
>> scheduled in the same location where the data already resides. The
>> worker will execute someFunc, and after it's done it will ship the
>> *result* back to the driver. Then the process gets repeated for all
>> the other partitions.
>>
>> If you wanted all the data sent back to the driver, you could use
>> RDD.toLocalIterator.
>> That will send one partition back to the driver, let you process it
>> on the driver, then fetch the next partition, etc.
>>
>> Imran
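[Editor's note: to make the grouping concrete, here is a small pure-Scala sketch of the index arithmetic a getPartitions like the one in Imran's gist would perform -- splitting 1000 parent partitions into 20 contiguous, order-preserving groups. No Spark dependency; the helper name groupRanges is made up for illustration and is not part of any Spark API.]

```scala
// Sketch only: the contiguous, order-preserving grouping that a custom
// getPartitions needs. groupRanges is a hypothetical helper.
object GroupingSketch {
  // Split parent partition indices 0 .. numParent-1 into numGroups
  // contiguous ranges, preserving order.
  def groupRanges(numParent: Int, numGroups: Int): Seq[Range] = {
    val groupSize = math.ceil(numParent.toDouble / numGroups).toInt
    (0 until numParent by groupSize)
      .map(start => start until math.min(start + groupSize, numParent))
  }

  def main(args: Array[String]): Unit = {
    val groups = groupRanges(1000, 20)
    println(groups.length)       // 20 groups
    println(groups.head.start)   // first group starts at parent partition 0
    println(groups.head.last)    // ...and ends at parent partition 49
    println(groups.last.start)   // last group covers 950..999
    println(groups.last.last)
  }
}
```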
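[Editor's note: Zhan Zhang's loop can also be modeled without Spark. The sketch below uses no Spark API at all -- the names are illustrative stand-ins -- but it mimics what the thread describes: someFunc runs once per partition, sequentially, and only its *result* comes back to the driver.]

```scala
// Pure-Scala model of the sequential sc.runJob loop: each "partition" is
// processed where it lives, and only the result returns to the "driver".
object SequentialRunJobSketch {
  def main(args: Array[String]): Unit = {
    val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2, 3), Seq(4, 5), Seq(6))
    val someFunc: Iterator[Int] => Int = _.sum // what the worker would run

    var results = Vector.empty[Int]
    var iterPartition = 0
    while (iterPartition < partitions.length) {
      // stand-in for sc.runJob(this, someFunc, iterPartition, ...)
      val res = someFunc(partitions(iterPartition).iterator)
      results :+= res // only the result is "shipped back to the driver"
      iterPartition += 1
    }
    println(results) // Vector(6, 9, 6)
  }
}
```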
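[Editor's note: and for RDD.toLocalIterator, here is a pure-Scala model (again no Spark; the names are illustrative) of the one-partition-at-a-time behavior Imran describes. Because the iterator is lazy, consuming only the first few elements fetches only the first partition.]

```scala
// Pure-Scala model of toLocalIterator: partitions stream to the caller
// one at a time, fetched lazily.
object LocalIteratorSketch {
  def main(args: Array[String]): Unit = {
    val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6))
    var fetched = 0 // counts "jobs" run: one per partition fetched
    val localIter: Iterator[Int] =
      partitions.iterator.flatMap { p => fetched += 1; p.iterator }

    println(localIter.take(2).toList) // List(1, 2)
    println(fetched)                  // only the first partition was fetched: 1
  }
}
```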