Yes, the second example does that. It transforms all the points of a partition into a single element, the skyline, so reduce runs on the skylines of two partitions rather than on single points.

On Apr 16, 2014, at 06:47, "Yanzhe Chen" <yanzhe...@gmail.com> wrote:
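(A minimal, self-contained sketch of that per-partition-then-reduce approach, not taken from the thread; the Point alias, the naive O(n^2) localSkyline, and the minimizing dominance convention are all illustrative assumptions:)

    import org.apache.spark.{SparkConf, SparkContext}

    object SkylineSketch {
      type Point = Array[Double] // e.g. (x, y) coordinates

      // p dominates q if p is <= q in every dimension and < in at least one
      // (skyline of minima; flip the comparisons for maxima).
      def dominates(p: Point, q: Point): Boolean =
        p.zip(q).forall { case (a, b) => a <= b } &&
        p.zip(q).exists { case (a, b) => a < b }

      // Naive O(n^2) local skyline: keep the points no other point dominates.
      def localSkyline(points: Array[Point]): Array[Point] =
        points.filter(q => !points.exists(p => (p ne q) && dominates(p, q)))

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("skyline-sketch"))
        val data = sc.textFile("points.csv").map(_.split(",").map(_.toDouble))

        // Each partition collapses into a single element: its partial skyline.
        // reduce then merges partial skylines, never individual points.
        val result = data
          .mapPartitions(points => Iterator(localSkyline(points.toArray)))
          .reduce((left, right) => localSkyline(left ++ right))

        println(result.map(_.mkString("(", ", ", ")")).mkString("\n"))
        sc.stop()
      }
    }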
> Eugen,
>
> Thanks for your tip. I do want to merge the result of one partition with
> another, but I am still not quite clear how to do it.
>
> Say the original data RDD has 32 partitions. Since mapPartitions won't
> change the number of partitions, it will still have 32 partitions, each
> containing the partial skyline of the points in that partition. Now I want
> to merge those 32 partitions to generate a new skyline. It would be better
> if I could use reduce to merge them pairwise (rather than just collecting
> them into one place), but I think simply calling reduce on the RDD won't
> work, because it reduces the data at the granularity of individual points
> rather than of partition results (which are collections of points). So is
> there a way to reduce the data at the granularity of partitions?
>
> Thanks,
>
> Yanzhe
>
> On Wednesday, April 16, 2014 at 2:24 AM, Eugen Cepoi wrote:
>
> It depends on your algorithm, but I guess you should probably use reduce
> (the code may not compile, but it shows the idea):
>
>     val result = data.reduce { case (left, right) =>
>       skyline(left ++ right)
>     }
>
> Or, in case you want to merge the result of one partition with another,
> you could do:
>
>     val result = data.mapPartitions { points =>
>       // Transform the whole partition into a single element (its skyline).
>       // This may incur some other problems, especially if you use Kryo serialization...
>       Seq(skyline(points.toArray)).iterator
>     }.reduce { case (left, right) =>
>       skyline(left ++ right)
>     }
>
> 2014-04-15 19:37 GMT+02:00 Cheng Lian <lian.cs....@gmail.com>:
>
> Your Spark solution first reduces partial results into a single partition,
> computes the final result, and then collects to the driver side. This
> involves a shuffle and two waves of network traffic. Instead, you can
> directly collect the partial results to the driver and then compute the
> final result on the driver side:
>
>     val data = sc.textFile(...).map(line => line.split(",").map(_.toDouble))
>     val partialResults = data.mapPartitions(points => skyline(points.toArray).iterator).collect()
>     val results = skyline(partialResults)
>
> On Wed, Apr 16, 2014 at 1:03 AM, Yanzhe Chen <yanzhe...@gmail.com> wrote:
>
> Hi all,
>
> As in a previous thread, I am asking how to implement a divide-and-conquer
> algorithm (skyline) in Spark. Here is my current solution:
>
>     val data = sc.textFile(…).map(line => line.split(",").map(_.toDouble))
>
>     val result = data.mapPartitions(points => skyline(points.toArray).iterator)
>       .coalesce(1, true)
>       .mapPartitions(points => skyline(points.toArray).iterator)
>       .collect()
>
> where skyline is a local algorithm that computes the result:
>
>     def skyline(points: Array[Point]): Array[Point]
>
> Basically, I find this implementation runs slower than the corresponding
> Hadoop version (an identity map phase plus the local skyline for both the
> combine and reduce phases).
>
> Below are my questions:
>
> 1. Why is this implementation much slower than the Hadoop one?
>
> I can see two possible reasons: one is the shuffle overhead in coalesce,
> the other is calling toArray and iterator repeatedly when invoking the
> local skyline algorithm. But I am not sure which one is true.
>
> I haven't seen your Hadoop version, but if this assumption is right, the
> version above should help.
>
> 2. One observation is that while the Hadoop version used up almost all the
> CPU resources during execution, the CPU doesn't seem that busy on Spark.
> Is that a clue that the shuffling might be the real bottleneck?
> How many parallel tasks are there when running your Spark code? I suspect
> the tasks are being queued and run sequentially.
>
> 3. Is there any difference between coalesce(1, true) and repartition? It
> seems that both operations need to shuffle data. What are the proper
> situations for using the coalesce method?
>
> repartition(n) is just an alias of coalesce(n, true), so yes, they both
> involve data shuffling. coalesce can be used to shrink the partition count
> when the dataset size shrinks dramatically after operations like filter.
> Say you have an RDD containing 1 TB of data in 100 partitions; after a
> .filter(...) call only 20 GB of data is left, so you may want to coalesce
> to 2 partitions rather than keep 100.
>
> 4. More generally, I am trying to implement some core computational
> geometry operators on Spark (like skyline, convex hull, etc.). In my
> understanding, since Spark is more capable of handling iterative
> computations on a dataset, the above solution apparently doesn't exploit
> what Spark is good at. Any comments on how to do geometry computations on
> Spark (if it is possible)?
>
> Although Spark is good at iterative algorithms, it also performs better in
> batch computing thanks to much lower scheduling overhead and thread-level
> parallelism. Theoretically, you can always accelerate your MapReduce job
> by rewriting it in Spark.
>
> Thanks for any insight.
>
> Yanzhe
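(As an aside, not part of the original thread: a small sketch of the coalesce-after-filter pattern described above; the path, the filter condition, and the partition counts are made up:)

    // Shrink the partition count after a selective filter.
    val raw = sc.textFile("hdfs:///logs/*")       // suppose this yields ~100 partitions
    val errors = raw.filter(_.contains("ERROR"))  // far less data, but still ~100 partitions
    val compacted = errors.coalesce(2)            // shuffle = false by default: narrow dependency, no shuffle
    // errors.repartition(2) is equivalent to errors.coalesce(2, shuffle = true),
    // which does redistribute the remaining data across the cluster.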