Hi Imran,

 

I am a bit confused here. Assume I have RDD a with 1000 partition and also has 
been sorted. How can I control when creating RDD b (with 20 partitions) to make 
sure 1-50 partition of RDD a map to 1st partition of RDD b? I don’t see any 
control code/logic here?

 

You code below:

 

val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)

 

 

Does it means I need to define/develop my own MyGroupingRDD class? I am not 
very clear how to do that, any place I can find an example? I never create my 
own RDD class before (not RDD instance J). But this is very valuable approach 
to me so I am desired to learn.

 

Regards,

 

Shuai

 

From: Imran Rashid [mailto:iras...@cloudera.com] 
Sent: Monday, March 16, 2015 11:22 AM
To: Shawn Zheng; user@spark.apache.org
Subject: Re: Process time series RDD after sortByKey

 

Hi Shuai,

 

On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng <szheng.c...@gmail.com> wrote:

Sorry I response late.

Zhan Zhang's solution is very interesting and I look at into it, but it is not 
what I want. Basically I want to run the job sequentially and also gain 
parallelism. So if possible, if I have 1000 partition, the best case is I can 
run it as 20 subtask, each one take partition: 1-50, 51-100, 101-150, etc. 

If we have ability to do this, we will gain huge flexibility when we try to 
process some time series like data and a lot of algo will benefit from it.

 

yes, this is what I was suggesting you do.  You would first create one RDD (a) 
that has 1000 partitions.  Don't worry about the creation of this RDD -- it 
wont' create any tasks, its just a logical holder of your raw data.  Then you 
create another RDD (b) that depends on your RDD (a), but that only has 20 
partitions.  Each partition in (b) would depend on a number of partitions from 
(a).  As you've suggested, partition 1 in (b) would depend on partitions 1-50 
in (a), partition 2 in (b) would depend on 51-100 in (a), etc.   Note that RDD 
(b) still doesn't *do* anything.  Its just another logical holder for your 
data, but this time grouped in the way you want.  Then after RDD (b), you would 
do whatever other transformations you wanted, but now you'd be working w/ 20 
partitions:

 

val rawData1000Partitions = sc.textFile(...) // or whatever

val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)

groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} //etc.

 

note that this is almost exactly the same as what CoalescedRdd does.  However, 
it might combine the partitions in whatever ways it feels like -- you want them 
combined in a very particular order.  So you'll need to create your own 
subclass.

 

 

Back to Zhan Zhang's 

while( iterPartition < RDD.partitions.length) {

      val res = sc.runJob(this, (it: Iterator[T]) => somFunc, iterPartition, 
allowLocal = true)

      Some other function after processing one partition.

      iterPartition += 1

}

I am curious how spark process this without parallelism, the indidivual 
partition will pass back to driver to process or just run one task on that node 
which partition exist? then follow by another partition on another node?

 

 

Not exactly.  The partition is not shipped back to the driver.  You create a 
task which will be processed by a worker.  The task scheduling will take data 
locality into account, so ideally the task will get scheduled in the same 
location where the data already resides.  The worker will execute someFunc, and 
after its done it will ship the *result* back to the driver.  Then the process 
will get repeated for all the other partitions.

 

If you wanted all the data sent back to the driver, you could use 
RDD.toLocalIterator.  That will send one partition back to the driver, let you 
process it on the driver, then fetch the next partition, etc.

 

 

Imran

 

 

Reply via email to