Hi Shuai,

Yup, that is exactly what I meant -- implement your own class
MyGroupingRDD.  This is definitely more detail than a lot of users will
need to go into, but it's also not all that scary.  In this case, you want
something that is *extremely* close to the existing CoalescedRDD, so start
by looking at that code.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala

The only thing which is complicated in CoalescedRDD is the
PartitionCoalescer, but that is completely irrelevant for you, so you can
ignore it.  I started writing up a description of what to do but then I
realized just writing the code would be easier :)  Totally untested, but
here you go:

https://gist.github.com/squito/c2d1dd5413a60830d6f3

The only really interesting part here is getPartitions:

https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L31

That's where you create the partitions of your new RDD, each of which
depends on multiple partitions of the parent RDD.  Also note that compute()
is very simple: you just concatenate the iterators from each of the parent
partitions:

https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L37
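
In case the gist is not reachable, here is a minimal sketch of the same idea.
It is not a copy of the gist: the names GroupedRDD, GroupedPartition, and
groupSize are made up for illustration, and it assumes the RDD developer
hooks that CoalescedRDD overrides (getPartitions, compute, getDependencies).
Treat it as a starting point rather than a finished implementation.

```scala
import scala.reflect.ClassTag

import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type: one partition of the grouped RDD holds a
// contiguous run of partitions from the parent RDD.
class GroupedPartition(val index: Int, val parents: Array[Partition])
  extends Partition

// Hypothetical RDD that groups every `groupSize` consecutive parent
// partitions into one partition, preserving the parent's partition order.
class GroupedRDD[T: ClassTag](@transient private val prev: RDD[T], groupSize: Int)
  extends RDD[T](prev.context, Nil) {

  require(groupSize > 0, "groupSize must be positive")

  // Parent partitions 0 until groupSize go to partition 0, the next
  // groupSize go to partition 1, and so on.
  override def getPartitions: Array[Partition] =
    prev.partitions.grouped(groupSize).zipWithIndex.map { case (ps, i) =>
      new GroupedPartition(i, ps)
    }.toArray[Partition]

  // Concatenate the parent iterators in order; nothing is shuffled.
  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    split.asInstanceOf[GroupedPartition].parents.iterator.flatMap { p =>
      firstParent[T].iterator(p, context)
    }

  // A narrow dependency: each grouped partition depends on a fixed set of
  // parent partitions.
  override def getDependencies: Seq[Dependency[_]] = Seq(
    new NarrowDependency(prev) {
      def getParents(id: Int): Seq[Int] =
        partitions(id).asInstanceOf[GroupedPartition].parents.map(_.index).toSeq
    })
}
```

With a 1000-partition parent, new GroupedRDD(rawData1000Partitions, 50)
would give 20 partitions, where the first one reads parent partitions 0-49
(partition indices are zero-based), the second reads 50-99, and so on.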

Let me know how it goes!


On Mon, Mar 16, 2015 at 5:15 PM, Shuai Zheng <szheng.c...@gmail.com> wrote:

> Hi Imran,
>
>
>
> I am a bit confused here. Assume I have RDD a with 1000 partitions, and it
> has also been sorted. How can I control the creation of RDD b (with 20
> partitions) to make sure partitions 1-50 of RDD a map to the 1st partition
> of RDD b? I don’t see any control code/logic here.
>
>
>
> Your code below:
>
>
>
> val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)
>
>
>
>
>
> Does it mean I need to define/develop my own MyGroupingRDD class? I am
> not very clear on how to do that; is there any place I can find an example?
> I have never created my own RDD class before (not an RDD instance :)). But
> this is a very valuable approach to me, so I am eager to learn.
>
>
>
> Regards,
>
>
>
> Shuai
>
>
>
> *From:* Imran Rashid [mailto:iras...@cloudera.com]
> *Sent:* Monday, March 16, 2015 11:22 AM
> *To:* Shawn Zheng; user@spark.apache.org
> *Subject:* Re: Process time series RDD after sortByKey
>
>
>
> Hi Shuai,
>
>
>
> On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng <szheng.c...@gmail.com>
> wrote:
>
> Sorry for the late response.
>
> Zhan Zhang's solution is very interesting and I looked into it, but it is
> not what I want. Basically I want to run the job sequentially and also gain
> parallelism. So if possible, if I have 1000 partitions, the best case is
> that I can run it as 20 subtasks, each one taking partitions 1-50, 51-100,
> 101-150, etc.
>
> If we have the ability to do this, we will gain huge flexibility when we try
> to process time-series-like data, and a lot of algorithms will benefit from
> it.
>
>
>
> yes, this is what I was suggesting you do.  You would first create one RDD
> (a) that has 1000 partitions.  Don't worry about the creation of this RDD
> -- it won't create any tasks, it's just a logical holder of your raw data.
> Then you create another RDD (b) that depends on your RDD (a), but that only
> has 20 partitions.  Each partition in (b) would depend on a number of
> partitions from (a).  As you've suggested, partition 1 in (b) would depend
> on partitions 1-50 in (a), partition 2 in (b) would depend on 51-100 in
> (a), etc.   Note that RDD (b) still doesn't *do* anything.  It's just
> another logical holder for your data, but this time grouped in the way you
> want.  Then after RDD (b), you would do whatever other transformations you
> wanted, but now you'd be working w/ 20 partitions:
>
>
>
> val rawData1000Partitions = sc.textFile(...) // or whatever
>
> val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)
>
> groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} //etc.
>
>
>
> Note that this is almost exactly the same as what CoalescedRDD does.
> However, it might combine the partitions in whatever way it feels like --
> you want them combined in a very particular order.  So you'll need to
> create your own RDD subclass.
>
>
>
>
>
> Back to Zhan Zhang's code:
>
> while (iterPartition < this.partitions.length) {
>
>       val res = sc.runJob(this, (it: Iterator[T]) =>
> someFunc, Seq(iterPartition), allowLocal = true)
>
>       // Some other function after processing one partition.
>
>       iterPartition += 1
>
> }
>
> I am curious how Spark processes this without parallelism: is each individual
> partition passed back to the driver for processing, or does it just run one
> task on the node where that partition exists, followed by another partition
> on another node?
>
>
>
>
>
> Not exactly.  The partition is not shipped back to the driver.  You create
> a task which will be processed by a worker.  The task scheduling will take
> data locality into account, so ideally the task will get scheduled in the
> same location where the data already resides.  The worker will execute
> someFunc, and after it's done it will ship the *result* back to the driver.
> Then the process will get repeated for all the other partitions.
>
>
>
> If you wanted all the data sent back to the driver, you could use
> RDD.toLocalIterator.  That will send one partition back to the driver, let
> you process it on the driver, then fetch the next partition, etc.
>
>
>
>
>
> Imran
>
>
>
>
>
