Thanks Brian. This works. I used an Accumulable to do the "collect" in step B.
While doing that I found that Accumulable.value is not a Spark "action"; I
needed to call "cache" on the underlying RDD for "value" to work. Not sure if
that is intentional or a bug.
The "collect" of step B can be done as a new RDD too.


On Thu, May 15, 2014 at 5:47 PM, Brian Gawalt <bgaw...@gmail.com> wrote:

> I don't think there's a direct way of bleeding elements across partitions.
> But you could write it yourself relatively succinctly:
>
> A) Sort the RDD
> B) Look at the sorted RDD's partitions with the .mapPartitionsWithIndex()
> method. Map each partition to its partition ID and its maximum element.
> Collect the (partID, maxElement) pairs in the driver.
> C) Broadcast the collection of (partID, part's max element) tuples
> D) Look again at the sorted RDD's partitions with mapPartitionsWithIndex(
> ). For each partition *K*:
> D1) Find the immediately-preceding partition *K-1* and its associated
> maximum value. Use that to decide how many values are missing between the
> last element of part *K-1* and the first element of part *K*.
> D2) Step through part *K*'s elements and find the rest of the missing
> elements in that part.
>
> This approach sidesteps worries you might have over the hack of using
> .filter to remove the first element, as well as the zipping.
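>
> For what it's worth, a minimal, untested sketch of D1/D2, assuming the
> sorted RDD[Int] is called sorted and maxByPart is a broadcast Map[Int, Int]
> of each partition's max from step C (the names are only placeholders):
>
> val filled = sorted.mapPartitionsWithIndex { (partId, iter) =>
>   val elems = iter.toList
>   if (elems.isEmpty) Iterator.empty
>   else {
>     // D1: the preceding partition's max; for the first partition, start one
>     // below our own head so that the head itself still gets emitted.
>     val lower = maxByPart.value.get(partId - 1).getOrElse(elems.head - 1)
>     // D2: for each consecutive pair, emit everything after the first element
>     // up to and including the second, filling the in-partition gaps as well.
>     (lower +: elems).sliding(2).flatMap { case Seq(a, b) => (a + 1) to b }
>   }
> }
>
> Each partition only emits values above its predecessor's max, so nothing is
> duplicated across partition boundaries.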
>
> --Brian
>
>
>
> On Tue, May 13, 2014 at 9:31 PM, Mohit Jaggi <mohitja...@gmail.com> wrote:
>
>> Hi,
>> I am trying to find a way to fill in missing values in an RDD. The RDD is
>> a sorted sequence.
>> For example, (1, 2, 3, 5, 8, 11, ...)
>> I need to fill in the missing numbers and get (1,2,3,4,5,6,7,8,9,10,11)
>>
>> One way to do this is to "slide and zip":
>> val rdd1 = sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
>> val x = rdd1.first
>> val rdd2 = rdd1 filter (_ != x)   // drop the first element
>> val rdd3 = rdd2 zip rdd1          // pair each element with its predecessor
>> val rdd4 = rdd3 flatMap { case (x, y) =>
>>   /* generate the missing elements between x and y */ }
>>
>> Another method, which I think is more efficient, is to use mapPartitions()
>> on rdd1 so I can iterate over the elements of rdd1 within each partition.
>> However, that leaves the boundaries of the partitions "unfilled". *Is there
>> a way, within the function passed to mapPartitions, to read the first
>> element of the next partition?*
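>>
>> Within a single partition the fill itself is easy, e.g. something like this
>> rough, untested sketch:
>>
>> rdd1.mapPartitions { iter =>
>>   val elems = iter.toList
>>   if (elems.isEmpty) Iterator.empty
>>   else {
>>     // fill the gaps between consecutive elements of this partition only;
>>     // nothing in here can see the first element of the NEXT partition, so
>>     // the gap across each partition boundary stays unfilled.
>>     elems.iterator.zip(elems.tail.iterator)
>>       .flatMap { case (a, b) => a until b } ++ Iterator(elems.last)
>>   }
>> }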
>>
>> The latter approach also appears to work for a general "sliding window"
>> calculation on the RDD. The former technique requires a lot of "sliding and
>> zipping" and I believe it is not efficient. If only I could read the next
>> partition... I have tried passing a pointer to rdd1 into the function passed
>> to mapPartitions, but the rdd1 pointer turns out to be NULL, I guess because
>> Spark cannot deal with a mapper calling another mapper (since it happens on
>> a worker, not the driver).
>>
>> Mohit.
>>
>>
>
