Eugen,

Thanks for the tip. I do want to merge the result of one partition with 
another, but I am still not quite clear on how to do it.

Say the original data RDD has 32 partitions. Since mapPartitions won’t 
change the number of partitions, the result still has 32 partitions, each 
containing the partial skyline of the points in that partition. Now I want to 
merge those 32 partial skylines into a new skyline. It would be better if I 
could use reduce to merge them two at a time (rather than just collecting 
them into one), but I think simply calling reduce on the RDD won’t work, 
because it reduces the data at the granularity of points rather than 
partition results (each of which is a collection of points). So is there a 
way to reduce the data at the granularity of partitions?
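
To make the question concrete, here is a rough, untested sketch of what I am 
imagining, building on your second snippet. It assumes that a Point is just an 
Array[Double] and that smaller values are better in every dimension. The 
dominates helper, the naive skyline body, and the PartitionSkyline and 
globalSkyline names are placeholders of mine for illustration, not my real 
code.

```scala
import org.apache.spark.rdd.RDD

object PartitionSkyline {
  // Assumption: a point is an array of coordinates, as produced by
  // line.split(",").map(_.toDouble) in the original job.
  type Point = Array[Double]

  // Assumption: smaller is better in every dimension. `a` dominates `b`
  // if it is no worse everywhere and strictly better somewhere.
  def dominates(a: Point, b: Point): Boolean =
    a.indices.forall(i => a(i) <= b(i)) && a.indices.exists(i => a(i) < b(i))

  // Naive O(n^2) stand-in for the real local skyline algorithm.
  def skyline(points: Array[Point]): Array[Point] =
    points.filter(p => !points.exists(q => dominates(q, p)))

  def globalSkyline(data: RDD[Point]): Array[Point] =
    data
      // Emit exactly one element per partition: its partial skyline.
      .mapPartitions(points => Iterator(skyline(points.toArray)))
      // The RDD is now an RDD[Array[Point]], so reduce merges two partial
      // skylines at a time instead of two points.
      .reduce((left, right) => skyline(left ++ right))
}
```

If this is right, reduce sees Array[Point] values rather than single points. 
As far as I understand, though, Spark's reduce merges the per-task results on 
the driver, so the pairwise merging itself would not be distributed.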

Thanks,

Yanzhe  

On Wednesday, April 16, 2014 at 2:24 AM, Eugen Cepoi wrote:

> It depends on your algorithm but I guess that you probably should use reduce 
> (the code probably doesn't compile but it shows you the idea).
>  
> val result = data.reduce { case (left, right) =>
>   skyline(left ++ right)
> }
>  
> Or in the case you want to merge the result of a partition with another one 
> you could do:
>  
> val result = data.mapPartitions { points =>
>   // Transforms the whole partition into a single element, but this may
>   // incur some other problems, especially if you use Kryo serialization...
>   Iterator(skyline(points.toArray))
> }.reduce { case (left, right) =>
>   skyline(left ++ right)
> }
>  
> 2014-04-15 19:37 GMT+02:00 Cheng Lian <lian.cs....@gmail.com 
> (mailto:lian.cs....@gmail.com)>:
> >  
> > Your Spark solution first reduces partial results into a single partition, 
> > computes the final result, and then collects to the driver side. This 
> > involves a shuffle and two waves of network traffic. Instead, you can 
> > collect the partial results directly to the driver and then compute the 
> > final result on the driver side:
> >  
> > val data = sc.textFile(...).map(line => line.split(",").map(_.toDouble))
> > val partialResults = data.mapPartitions(points =>
> >   skyline(points.toArray).iterator).collect()
> > val results = skyline(partialResults)
> >  
> > On Wed, Apr 16, 2014 at 1:03 AM, Yanzhe Chen <yanzhe...@gmail.com 
> > (mailto:yanzhe...@gmail.com)> wrote:
> >  
> > > Hi all,  
> > >  
> > > Following up on a previous thread, I am asking how to implement a 
> > > divide-and-conquer algorithm (skyline) in Spark.
> > > Here is my current solution:
> > >  
> > > val data = sc.textFile(…).map(line => line.split(",").map(_.toDouble))
> > >
> > > val result = data.mapPartitions(points => skyline(points.toArray).iterator)
> > >                  .coalesce(1, true)
> > >                  .mapPartitions(points => skyline(points.toArray).iterator)
> > >                  .collect()
> > >  
> > > where skyline is a local algorithm to compute the results:  
> > >  
> > > def skyline(points: Array[Point]) : Array[Point]
> > >  
> > > Basically, I find that this implementation runs slower than the 
> > > corresponding Hadoop version (an identity map phase plus the local 
> > > skyline for both the combine and reduce phases).
> > >  
> > > Below are my questions:
> > >  
> > > 1. Why is this implementation much slower than the Hadoop one?
> > >
> > > I can think of two possible reasons: one is the shuffle overhead in 
> > > coalesce, the other is calling toArray and iterator repeatedly when 
> > > invoking the local skyline algorithm. But I am not sure which one it is.
> >  
> > I haven’t seen your Hadoop version. But if this assumption is right, the 
> > above version should help.
> >  
> > >  
> > > 2. One observation is that while the Hadoop version used up almost all 
> > > the CPU resources during execution, the CPU does not seem that busy on 
> > > Spark. Is that a clue that the shuffling might be the real bottleneck?
> >  
> > How many parallel tasks are there when running your Spark code? I suspect 
> > the tasks are queued and run sequentially.
> >  
> > >  
> > > 3. Is there any difference between coalesce(1, true) and repartition? It 
> > > seems that both operations need to shuffle data. What are the proper 
> > > situations for using the coalesce method?
> >  
> > repartition(n) is just an alias of coalesce(n, true), so yes, they both 
> > involve data shuffling. coalesce can be used to reduce the number of 
> > partitions when the dataset size shrinks dramatically after operations 
> > like filter. Say you have an RDD containing 1TB of data in 100 partitions; 
> > if only 20GB of data is left after a .filter(...) call, you may want to 
> > coalesce to 2 partitions rather than keep 100.
> >  
> > >  
> > > 4. More generally, I am trying to implement some core geometry 
> > > computation operators on Spark (like skyline, convex hull, etc.). In my 
> > > understanding, since Spark is more capable of handling iterative 
> > > computations on a dataset, the above solution apparently doesn’t exploit 
> > > what Spark is good at. Any comments on how to do geometry computations on 
> > > Spark (if it is possible)?
> >  
> > Although Spark is good at iterative algorithms, it also performs better in 
> > batch computing due to much lower scheduling overhead and thread level 
> > parallelism. Theoretically, you can always accelerate your MapReduce job by 
> > rewriting it in Spark.
> >  
> > >  
> > > Thanks for any insight.  
> > >  
> > > Yanzhe
> > >  