Austin,
I made up a mock example; my real use case is more complex. I used
foreach() instead of collect/cache, which forces the accumulable to be
evaluated. On another thread Xiangrui pointed me to a sliding-window RDD in
MLlib that is a great alternative (although I did not switch to using it).
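
For reference, a rough sketch of that MLlib alternative, assuming
spark-mllib is on the classpath and reusing the rdd1 from the quoted
example below:

import org.apache.spark.mllib.rdd.RDDFunctions._

// length-2 windows; sliding() handles the partition boundaries itself
val windows = rdd1.sliding(2)
val gaps = windows.flatMap { case Array(a, b) => a until b }
// the last element never starts a window, so append it explicitly
val filled = gaps ++ sc.parallelize(Seq(rdd1.reduce(math.max)))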

Mohit.


On Thu, May 22, 2014 at 2:30 PM, Austin Gibbons <aus...@quantifind.com> wrote:

> Mohit, if you want to end up with (1 .. N), why don't you skip the logic
> for finding missing values and generate the full sequence directly?
>
> val max = myCollection.reduce(math.max)
> sc.parallelize(1 to max)  // yields 1 .. N inclusive
>
> In either case, you don't need to call cache, which would force the RDD
> into memory; you can use an action such as "count", which will not
> necessarily store the RDD in memory.
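>
> For example (a rough sketch, assuming an accumulator named acc that the
> job updates):
>
> rdd.foreach(x => acc += x)  // an action: runs the job and updates acc
> rdd.count()                 // also an action; neither call caches the RDD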
>
> Additionally, instead of an accumulable, you could consider mapping each
> partition to its maximum directly:
>
> rdd.mapPartitionsWithIndex { case (index, partition) =>
>   Iterator(index -> partition.reduce(math.max))
> }.collectAsMap()
>
>
> On Mon, May 19, 2014 at 9:50 PM, Mohit Jaggi <mohitja...@gmail.com> wrote:
>
>> Thanks Brian. This works. I used an Accumulable to do the "collect" in
>> step B. While doing that I found that Accumulable.value is not a Spark
>> "action"; I need to call "cache" on the underlying RDD for "value" to
>> work. Not sure if that is intentional or a bug.
>> The "collect" of step B can be done as a new RDD too.
>>
>>
>> On Thu, May 15, 2014 at 5:47 PM, Brian Gawalt <bgaw...@gmail.com> wrote:
>>
>>> I don't think there's a direct way of bleeding elements across
>>> partitions. But you could write it yourself relatively succinctly:
>>>
>>> A) Sort the RDD
>>> B) Look at the sorted RDD's partitions with the
>>> .mapPartitionsWithIndex( ) method. Map each partition to its partition
>>> ID and its maximum element. Collect the (partID, maxElement) pairs in
>>> the driver.
>>> C) Broadcast the collection of (partID, part's max element) tuples
>>> D) Look again at the sorted RDD's partitions with
>>> mapPartitionsWithIndex( ). For each partition *K*:
>>> D1) Find the immediately preceding partition *K-1* and its associated
>>> maximum value. Use that to decide how many values are missing between
>>> the last element of part *K-1* and the first element of part *K*.
>>> D2) Step through part *K*'s elements and find the rest of the missing
>>> elements in that part. (A rough sketch of the whole recipe follows
>>> below.)
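>>>
>>> Something like this, assuming an RDD[Int] named "sorted" produced by
>>> step A (a sketch; empty partitions are glossed over):
>>>
>>> // B: per-partition maxima, collected on the driver
>>> val maxes = sorted.mapPartitionsWithIndex { case (idx, it) =>
>>>   if (it.hasNext) Iterator(idx -> it.max) else Iterator.empty
>>> }.collectAsMap()
>>> // C: ship them to every worker
>>> val bcMaxes = sc.broadcast(maxes)
>>> // D: fill each partition, seeding from the previous partition's max
>>> val filled = sorted.mapPartitionsWithIndex { case (idx, it) =>
>>>   if (!it.hasNext) Iterator.empty
>>>   else {
>>>     val elems = it.toArray  // assumes a partition fits in memory
>>>     // D1: start just past partition idx-1's max, if there is one
>>>     val lo = bcMaxes.value.get(idx - 1).map(_ + 1).getOrElse(elems.head)
>>>     // D2: every gap gets filled, so the output is the full range
>>>     (lo to elems.last).iterator
>>>   }
>>> }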
>>>
>>> This approach sidesteps worries you might have over the hack of using
>>> .filter to remove the first element, as well as the zipping.
>>>
>>> --Brian
>>>
>>>
>>>
>>> On Tue, May 13, 2014 at 9:31 PM, Mohit Jaggi <mohitja...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I am trying to find a way to fill in missing values in an RDD. The RDD
>>>> is a sorted sequence.
>>>> For example, (1, 2, 3, 5, 8, 11, ...)
>>>> I need to fill in the missing numbers and get (1,2,3,4,5,6,7,8,9,10,11)
>>>>
>>>> One way to do this is to "slide and zip":
>>>>
>>>> val rdd1 = sc.parallelize(List(1, 2, 3, 5, 8, 11))
>>>> val x = rdd1.first
>>>> val rdd2 = rdd1 filter (_ != x)  // drop the first element
>>>> val rdd3 = rdd2 zip rdd1         // pair each element with its predecessor
>>>> val rdd4 = rdd3 flatMap { case (next, prev) =>
>>>>   prev until next                // generate the missing elements between them
>>>> }
>>>>
>>>> Another method, which I think is more efficient, is to use
>>>> mapPartitions() on rdd1 to iterate over the elements of rdd1 in each
>>>> partition. However, that leaves the boundaries of the partitions
>>>> "unfilled". *Is there a way, within the function passed to
>>>> mapPartitions, to read the first element of the next partition?*
>>>>
>>>> The latter approach also appears to work for a general "sliding window"
>>>> calculation on the RDD. The former technique requires a lot of "sliding
>>>> and zipping", and I believe it is not efficient. If only I could read
>>>> the next partition... I have tried passing a pointer to rdd1 into the
>>>> function passed to mapPartitions, but the rdd1 pointer turns out to be
>>>> null, I guess because Spark cannot deal with a mapper calling another
>>>> mapper (since it happens on a worker, not the driver).
>>>>
>>>> Mohit.
>>>>
>>>>
>>>
>>
>
>
> --
> . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
> Austin Gibbons
> Research | quantiFind <http://www.quantifind.com/> | 708 601 4894
> . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
>
