Hi Imran,
This is extremely helpful. It not only gives me an approach, it also helps
me understand how to shape and customize my own DAG effectively.

Thanks a lot!

Shuai

On Monday, March 16, 2015, Imran Rashid <iras...@cloudera.com> wrote:

> Hi Shuai,
>
> yup, that is exactly what I meant -- implement your own class
> MyGroupingRDD.  This is definitely more detail than a lot of users will
> ever need, but it's also not all that scary.  In this case, you want
> something that is *extremely* close to the existing CoalescedRDD, so start
> by looking at that code.
>
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
>
> The only thing which is complicated in CoalescedRDD is the
> PartitionCoalescer, but that is completely irrelevant for you, so you can
> ignore it.  I started writing up a description of what to do but then I
> realized just writing the code would be easier :)  Totally untested, but
> here you go:
>
> https://gist.github.com/squito/c2d1dd5413a60830d6f3
>
> The only really interesting part here is getPartitions:
>
>
> https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L31
>
> That's where you create the partitions of your new RDD, each of which
> depends on multiple partitions of the parent.  Also note that compute() is
> very simple: you just concatenate together the iterators from each of the
> parent partitions:
>
>
> https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L37
>
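> In case the gist link ever goes stale, here is a minimal sketch of the
> same idea (untested, written against the 1.x RDD API -- GroupedPartition
> and the groupSize parameter are my illustrative names here, not
> necessarily the exact contents of the gist):
>
> import scala.reflect.ClassTag
> import org.apache.spark.{Dependency, NarrowDependency, Partition,
>   TaskContext}
> import org.apache.spark.rdd.RDD
>
> // one output partition = one ordered run of parent partition indices
> private class GroupedPartition(val index: Int, val parentIndices: Seq[Int])
>   extends Partition
>
> class MyGroupingRDD[T: ClassTag](parent: RDD[T], groupSize: Int)
>   extends RDD[T](parent.context, Nil) {
>
>   // chop the parent's partition indices into consecutive groups of
>   // groupSize, e.g. 1000 partitions with groupSize = 50 -> 20 groups
>   override def getPartitions: Array[Partition] =
>     parent.partitions.indices.grouped(groupSize).zipWithIndex.map {
>       case (parentIndices, i) => new GroupedPartition(i, parentIndices)
>     }.toArray
>
>   // concatenate the iterators of the parent partitions, in order
>   override def compute(split: Partition, context: TaskContext): Iterator[T] =
>     split.asInstanceOf[GroupedPartition].parentIndices.iterator.flatMap { i =>
>       parent.iterator(parent.partitions(i), context)
>     }
>
>   // narrow dependency: each output partition reads exactly its own
>   // parent partitions, so no shuffle is involved
>   override def getDependencies: Seq[Dependency[_]] = Seq(
>     new NarrowDependency(parent) {
>       override def getParents(partitionId: Int): Seq[Int] =
>         partitions(partitionId).asInstanceOf[GroupedPartition].parentIndices
>     })
> }
>
> With groupSize = 50, a sorted 1000-partition parent gives you exactly the
> 20 ordered groups from your example.
>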
> let me know how it goes!
>
>
> On Mon, Mar 16, 2015 at 5:15 PM, Shuai Zheng <szheng.c...@gmail.com> wrote:
>
>> Hi Imran,
>>
>>
>>
>> I am a bit confused here. Assume I have an RDD a with 1000 partitions
>> that has also been sorted. When creating RDD b (with 20 partitions), how
>> can I control it so that partitions 1-50 of RDD a map to the 1st
>> partition of RDD b? I don't see any control code/logic here.
>>
>>
>>
>> Your code below:
>>
>>
>>
>> val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)
>>
>>
>>
>>
>>
>> Does it mean I need to define/develop my own MyGroupingRDD class? I am
>> not very clear on how to do that -- is there any place I can find an
>> example? I have never created my own RDD class before (only RDD
>> instances :)). But this is a very valuable approach for me, so I am eager
>> to learn.
>>
>>
>>
>> Regards,
>>
>>
>>
>> Shuai
>>
>>
>>
>> *From:* Imran Rashid [mailto:iras...@cloudera.com]
>> *Sent:* Monday, March 16, 2015 11:22 AM
>> *To:* Shawn Zheng; user@spark.apache.org
>> *Subject:* Re: Process time series RDD after sortByKey
>>
>>
>>
>> Hi Shuai,
>>
>>
>>
>> On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng <szheng.c...@gmail.com> wrote:
>>
>> Sorry for the late response.
>>
>> Zhan Zhang's solution is very interesting and I looked into it, but it
>> is not what I want. Basically I want to run the job sequentially while
>> also gaining parallelism. So, if possible, if I have 1000 partitions, the
>> best case is that I can run them as 20 subtasks, each taking partitions
>> 1-50, 51-100, 101-150, etc.
>>
>> If we have the ability to do this, we will gain huge flexibility when we
>> try to process time-series-like data, and a lot of algorithms will
>> benefit from it.
>>
>>
>>
>> yes, this is what I was suggesting you do.  You would first create one
>> RDD (a) that has 1000 partitions.  Don't worry about the creation of this
>> RDD -- it won't create any tasks, it's just a logical holder of your raw
>> data.  Then you create another RDD (b) that depends on your RDD (a), but
>> that only has 20 partitions.  Each partition in (b) would depend on a
>> number of partitions from (a).  As you've suggested, partition 1 in (b)
>> would depend on partitions 1-50 in (a), partition 2 in (b) would depend on
>> 51-100 in (a), etc.  Note that RDD (b) still doesn't *do* anything.  It's
>> just another logical holder for your data, but this time grouped in the
>> way you want.  Then after RDD (b), you would do whatever other
>> transformations you wanted, but now you'd be working with 20 partitions:
>>
>>
>>
>> val rawData1000Partitions = sc.textFile(...) // or whatever
>>
>> val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)
>>
>> groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} //etc.
>>
>>
>>
>> Note that this is almost exactly what CoalescedRDD does.  However, it
>> might combine the partitions in whatever way it feels like -- you want
>> them combined in a very particular order.  So you'll need to create your
>> own subclass.
>>
>>
>>
>>
>>
>> Back to Zhan Zhang's approach:
>>
>> var iterPartition = 0
>> while (iterPartition < rdd.partitions.length) {
>>   val res = sc.runJob(rdd, (it: Iterator[T]) => someFunc(it),
>>     Seq(iterPartition), allowLocal = true)
>>   // some other work after processing one partition
>>   iterPartition += 1
>> }
>>
>> I am curious how Spark processes this without parallelism. Will the
>> individual partition be passed back to the driver for processing, or will
>> it just run one task on the node where the partition exists, followed by
>> another partition on another node?
>>
>>
>>
>>
>>
>> Not exactly.  The partition is not shipped back to the driver.  You
>> create a task which will be processed by a worker.  The task scheduling
>> will take data locality into account, so ideally the task will get
>> scheduled in the same location where the data already resides.  The worker
>> will execute someFunc, and after it's done it will ship the *result* back
>> to the driver.  Then the process will get repeated for all the other
>> partitions.
>>
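>> To make that concrete, here is a minimal sketch (my own illustration,
>> assuming an RDD[String] named rdd, with a record count standing in for
>> someFunc, so only one Long per partition travels back):
>>
>> val countsInOrder = rdd.partitions.indices.map { i =>
>>   // one job per partition, scheduled for locality; the worker runs the
>>   // function and only its Long result is shipped to the driver
>>   sc.runJob(rdd, (it: Iterator[String]) => it.size.toLong,
>>     Seq(i), allowLocal = true).head
>> }
>>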
>>
>>
>> If you wanted all the data sent back to the driver, you could use
>> RDD.toLocalIterator.  That will send one partition back to the driver, let
>> you process it on the driver, then fetch the next partition, etc.
>>
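>> For example, a minimal sketch reusing rawData1000Partitions from above:
>>
>> // toLocalIterator fetches one partition per job, in partition order, so
>> // the driver only holds a single partition's data at a time
>> rawData1000Partitions.toLocalIterator.foreach { record =>
>>   // strictly sequential, driver-side processing goes here
>> }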
>>
>>
>>
>>
>> Imran
>>
>>
>>
>>
>>
>
>
