Thanks Brian. This works. I used Accumulable to do the "collect" in step B. While doing that, I found that Accumulable.value is not a Spark "action"; I had to call cache() on the underlying RDD for value to work. Not sure if that is intentional or a bug. The "collect" of step B can also be done as a new RDD.
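As a note on the "new RDD" alternative for step B: the per-partition maxima can be produced without an accumulator. Below is a minimal plain-Scala sketch of that logic — partitions are simulated as a List of Lists, and the names and sample data are mine, not from the thread:

```scala
// Partitions of a sorted RDD, simulated as a List of Lists (hypothetical data).
val partitions: List[List[Int]] = List(List(1, 2, 3), List(5, 8), List(11))

// Step B without an accumulator: pair each partition id with its max element,
// then "collect" the pairs (here that is just materializing the local list).
val partMaxes: List[(Int, Int)] =
  partitions.zipWithIndex.map { case (part, id) => (id, part.max) }
// partMaxes == List((0, 3), (1, 8), (2, 11))
```

In actual Spark this would be roughly rdd.mapPartitionsWithIndex((id, it) => Iterator((id, it.max))).collect(); collect() is an action, which avoids the cache() surprise with Accumulable.value.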
On Thu, May 15, 2014 at 5:47 PM, Brian Gawalt <[email protected]> wrote:

> I don't think there's a direct way of bleeding elements across partitions.
> But you could write it yourself relatively succinctly:
>
> A) Sort the RDD
> B) Look at the sorted RDD's partitions with the .mapPartitionsWithIndex()
> method. Map each partition to its partition ID and its maximum element.
> Collect the (partID, maxElement) pairs in the driver.
> C) Broadcast the collection of (partID, partition's max element) tuples
> D) Look again at the sorted RDD's partitions with mapPartitionsWithIndex().
> For each partition K:
> D1) Find the immediately preceding partition K-1 and its associated
> maximum value. Use that to decide how many values are missing between the
> last element of part K-1 and the first element of part K.
> D2) Step through part K's elements and find the rest of the missing
> elements in that part
>
> This approach sidesteps worries you might have over the hack of using
> .filter to remove the first element, as well as the zipping.
>
> --Brian
>
>
> On Tue, May 13, 2014 at 9:31 PM, Mohit Jaggi <[email protected]> wrote:
>
>> Hi,
>> I am trying to find a way to fill in missing values in an RDD. The RDD
>> is a sorted sequence.
>> For example, (1, 2, 3, 5, 8, 11, ...)
>> I need to fill in the missing numbers and get (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
>>
>> One way to do this is to "slide and zip":
>> rdd1 = sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
>> x = rdd1.first
>> rdd2 = rdd1 filter (_ != x)
>> rdd3 = rdd2 zip rdd1
>> rdd4 = rdd3 flatMap { (x, y) => generate missing elements between x and y }
>>
>> Another method, which I think is more efficient, is to use mapPartitions()
>> on rdd1 to be able to iterate over the elements of rdd1 in each partition.
>> However, that leaves the boundaries of the partitions "unfilled".
>> Is there a way, within the function passed to mapPartitions, to read the
>> first element in the next partition?
>>
>> The latter approach also appears to work for a general "sliding window"
>> calculation on the RDD. The former technique requires a lot of "sliding
>> and zipping" and I believe it is not efficient. If only I could read the
>> next partition... I have tried passing a pointer to rdd1 to the function
>> passed to mapPartitions, but the rdd1 pointer turns out to be NULL, I
>> guess because Spark cannot deal with a mapper calling another mapper
>> (since it happens on a worker, not the driver).
>>
>> Mohit.
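For what it's worth, Brian's steps B-D can be sketched end to end in plain Scala collections. Partitions are simulated here as a List of Lists; the sample data, the partMax map standing in for the broadcast, and the gap-filling logic are my own illustration, not code from the thread:

```scala
// Partitions of a sorted RDD, simulated as a List of Lists (hypothetical data).
val partitions: List[List[Int]] = List(List(1, 2, 3), List(5, 8), List(11))

// Steps B/C: per-partition maxima, as would be collected and then broadcast.
val partMax: Map[Int, Int] =
  partitions.zipWithIndex.map { case (p, id) => id -> p.max }.toMap

// Step D: for each partition K, fill the gap back to partition K-1's max,
// then fill gaps between consecutive elements inside the partition.
val filled: List[Int] = partitions.zipWithIndex.flatMap { case (part, k) =>
  // D1: elements missing between part K-1's max and part K's first element
  val boundary: Seq[Int] =
    if (k == 0) Nil
    else (partMax(k - 1) + 1) until part.head
  // D2: walk the partition; each adjacent pair (x, y) contributes x+1..y
  val inner = part.head :: part.sliding(2).collect {
    case List(x, y) => ((x + 1) to y).toList
  }.toList.flatten
  boundary ++ inner
}
// filled == List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
```

In real Spark, steps D1/D2 would run inside mapPartitionsWithIndex with the (partID, max) map available via a broadcast variable, so no partition ever has to read its neighbor directly.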

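For comparison, the quoted "slide and zip" pseudocode can also be sketched on plain Scala collections. Note two liberties taken here: the input is a finite list standing in for the thread's elided "...", and each element is paired with its successor, the reverse of the rdd2 zip rdd1 order in the quote (in real Spark, zip additionally requires the two RDDs to have identical partitioning):

```scala
// The sorted input from the thread's example (finite stand-in for "...").
val xs = List(1, 2, 3, 5, 8, 11)

// "Slide": drop the first element, then zip each element with its successor.
val pairs = xs zip xs.tail          // (1,2), (2,3), (3,5), (5,8), (8,11)

// For each pair (x, y), emit the elements x+1..y; prepend the first element.
val filled = xs.head :: pairs.flatMap { case (x, y) => ((x + 1) to y).toList }
// filled == List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
```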