I don't think there's a direct way of bleeding elements across partitions.
But you could write it yourself relatively succinctly:

A) Sort the RDD.
B) Look at the sorted RDD's partitions with the .mapPartitionsWithIndex()
method. Map each partition to its partition ID and its maximum element.
Collect the (partID, maxElement) pairs in the driver.
C) Broadcast the collection of (partID, partition's max element) tuples.
D) Look again at the sorted RDD's partitions with mapPartitionsWithIndex().
For each partition K:
D1) Find the immediately preceding partition K-1 and its associated
maximum value. Use that to decide how many values are missing between the
last element of part K-1 and the first element of part K.
D2) Step through part K's elements and find the rest of the missing
elements in that part. (Sketched below.)
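Roughly, in spark-shell style (an untested sketch, and it assumes a
strictly increasing sequence of Ints with no duplicates; all the val names
below are mine):

val rdd1 = sc.parallelize(List(1, 2, 3, 5, 8, 11))

// A) sort; cache because we traverse it twice
val sorted = rdd1.sortBy(identity).cache()

// B) (partition ID, partition max), collected at the driver
val maxes = sorted.mapPartitionsWithIndex { (pid, it) =>
  if (it.isEmpty) Iterator.empty else Iterator((pid, it.max))
}.collect()

// C) broadcast so every task can look up its predecessor's max
val maxByPart = sc.broadcast(maxes.toMap)

// D) walk each partition, filling the gap back to the previous
// non-empty partition and the gaps between local elements
val filled = sorted.mapPartitionsWithIndex { (pid, it) =>
  // D1) max of the nearest preceding non-empty partition, if any
  val prevMax = ((pid - 1) to 0 by -1)
    .find(maxByPart.value.contains).map(maxByPart.value)
  // D2) emit each element, preceded by whatever is missing before it
  var last = prevMax
  it.flatMap { x =>
    val out = last.map(l => (l + 1) to x).getOrElse(x to x)
    last = Some(x)
    out.iterator
  }
}

filled.collect()   // Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)

The collected/broadcast state is one pair per partition, so it stays tiny
no matter how large the RDD gets.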

This approach sidesteps the worries you might have over the hack of using
.filter to remove the first element, as well as the zipping (zip requires
both RDDs to have the same number of elements in each partition, which the
filter breaks).
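
For what it's worth, if you did want the slide-and-zip flavor without the
filter trick, you can form the consecutive pairs by joining shifted indices
instead. Another untested sketch (it reuses `sorted` from above, is
shuffle-heavy, and assumes your Spark version has RDD.zipWithIndex):

// pair each element with its position, then join position i+1 against i
val indexed = sorted.zipWithIndex().map(_.swap)          // (i, x_i)
val shifted = indexed.map { case (i, x) => (i + 1, x) }  // (i+1, x_i)
val pairs   = shifted.join(indexed).values               // (x_i, x_i+1)

// every element except the global minimum falls in some (a, b] range
val filled2 = (sc.parallelize(Seq(sorted.first)) ++
  pairs.flatMap { case (a, b) => (a + 1) to b }).sortBy(identity)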

--Brian



On Tue, May 13, 2014 at 9:31 PM, Mohit Jaggi <mohitja...@gmail.com> wrote:

> Hi,
> I am trying to find a way to fill in missing values in an RDD. The RDD is
> a sorted sequence.
> For example, (1, 2, 3, 5, 8, 11, ...)
> I need to fill in the missing numbers and get (1,2,3,4,5,6,7,8,9,10,11)
>
> One way to do this is to "slide and zip":
> val rdd1 = sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
> val x = rdd1.first
> val rdd2 = rdd1 filter (_ != x)
> val rdd3 = rdd2 zip rdd1
> val rdd4 = rdd3 flatMap { case (x, y) => /* generate missing elements between x and y */ }
>
> Another method, which I think is more efficient, is to use mapPartitions()
> on rdd1 to iterate over the elements of rdd1 within each partition.
> However, that leaves the boundaries of the partitions "unfilled". Is
> there a way, within the function passed to mapPartitions, to read the
> first element of the next partition?
>
> The latter approach also appears to work for a general "sliding window"
> calculation on the RDD. The former technique requires a lot of sliding and
> zipping, and I believe it is not efficient. If only I could read the next
> partition... I have tried passing a pointer to rdd1 into the function passed
> to mapPartitions, but the rdd1 reference turns out to be null, I guess
> because Spark cannot handle one mapper calling another (since it happens on
> a worker, not the driver).
>
> Mohit.
>
>
