Austin, I made up a mock example; my real use case is more complex. I used foreach() instead of collect/cache, which forces the accumulable to be evaluated. On another thread, Xiangrui pointed me to a sliding window RDD in MLlib that is a great alternative (although I did not switch to using it).

Mohit.
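For reference, a minimal sketch (untested) of that sliding-window alternative. It assumes Spark 1.x, where MLlib exposes sliding() as a developer API via org.apache.spark.mllib.rdd.RDDFunctions; the function name fillWithSliding is mine, not from the thread.

  import org.apache.spark.SparkContext
  import org.apache.spark.mllib.rdd.RDDFunctions._ // adds sliding() to RDDs

  def fillWithSliding(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(List(1, 2, 3, 5, 8, 11)).sortBy(identity)
    // Each window is Array(current, next); emitting (current until next)
    // fills every gap between consecutive elements, across partitions too.
    val allButLast = rdd.sliding(2).flatMap { case Array(a, b) => a until b }
    // sliding(2) never emits the final element on its own, so append it back.
    val last = sc.parallelize(Seq(rdd.reduce(math.max)))
    println((allButLast ++ last).collect().mkString(", ")) // 1, 2, ..., 11
  }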
On Thu, May 22, 2014 at 2:30 PM, Austin Gibbons <[email protected]> wrote:

> Mohit, if you want to end up with (1 .. N), why don't you skip the logic
> for finding missing values, and generate it directly?
>
>   val max = myCollection.reduce(math.max)
>   sc.parallelize(1 to max)
>
> In either case, you don't need to call cache, which will force it into
> memory - you can do something like "count", which will not necessarily
> store the RDD in memory.
>
> Additionally, instead of an accumulable, you could consider mapping that
> value directly:
>
>   rdd.mapPartitionsWithIndex { case (index, partition) =>
>     Iterator(index -> partition.reduce(math.max))
>   }.collectAsMap()
>
> On Mon, May 19, 2014 at 9:50 PM, Mohit Jaggi <[email protected]> wrote:
>
>> Thanks Brian. This works. I used Accumulable to do the "collect" in step
>> B. While doing that I found that Accumulable.value is not a Spark
>> "action"; I needed to call "cache" on the underlying RDD for "value" to
>> work. Not sure if that is intentional or a bug.
>> The "collect" of step B can be done as a new RDD too.
>>
>> On Thu, May 15, 2014 at 5:47 PM, Brian Gawalt <[email protected]> wrote:
>>
>>> I don't think there's a direct way of bleeding elements across
>>> partitions. But you could write it yourself relatively succinctly:
>>>
>>> A) Sort the RDD.
>>> B) Look at the sorted RDD's partitions with the mapPartitionsWithIndex()
>>> method. Map each partition to its partition ID and its maximum element.
>>> Collect the (partID, maxElement) pairs in the driver.
>>> C) Broadcast the collection of (partID, partition's max element) tuples.
>>> D) Look again at the sorted RDD's partitions with
>>> mapPartitionsWithIndex(). For each partition *K*:
>>>   D1) Find the immediately preceding partition *K-1* and its associated
>>>   maximum value. Use that to decide how many values are missing between
>>>   the last element of part *K-1* and the first element of part *K*.
>>>   D2) Step through part *K*'s elements and find the rest of the missing
>>>   elements in that part.
>>>
>>> This approach sidesteps worries you might have over the hack of using
>>> .filter to remove the first element, as well as the zipping.
>>>
>>> --Brian
>>>
>>> On Tue, May 13, 2014 at 9:31 PM, Mohit Jaggi <[email protected]> wrote:
>>>
>>>> Hi,
>>>> I am trying to find a way to fill in missing values in an RDD. The RDD
>>>> is a sorted sequence.
>>>> For example: (1, 2, 3, 5, 8, 11, ...)
>>>> I need to fill in the missing numbers and get (1, 2, 3, 4, 5, 6, 7, 8,
>>>> 9, 10, 11).
>>>>
>>>> One way to do this is to "slide and zip":
>>>>   rdd1 = sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
>>>>   x = rdd1.first
>>>>   rdd2 = rdd1 filter (_ != x)
>>>>   rdd3 = rdd2 zip rdd1
>>>>   rdd4 = rdd3 flatMap { case (x, y) => generate missing elements between x and y }
>>>>
>>>> Another method, which I think is more efficient, is to use
>>>> mapPartitions() on rdd1 so I can iterate over the elements of rdd1 in
>>>> each partition. However, that leaves the boundaries of the partitions
>>>> "unfilled". *Is there a way, within the function passed to
>>>> mapPartitions, to read the first element of the next partition?*
>>>>
>>>> The latter approach also appears to work for a general "sliding window"
>>>> calculation on the RDD. The former technique requires a lot of "sliding
>>>> and zipping" and I believe it is not efficient.
>>>> If only I could read the next partition... I have tried passing a
>>>> pointer to rdd1 into the function passed to mapPartitions, but the
>>>> rdd1 pointer turns out to be null, I guess because Spark cannot handle
>>>> a mapper calling another mapper (since it happens on a worker, not on
>>>> the driver).
>>>>
>>>> Mohit.
>
> --
> . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
> Austin Gibbons
> Research | quantiFind <http://www.quantifind.com/> | 708 601 4894
> . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
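For completeness, a minimal sketch (untested) of Brian's steps A-D above; the thread gives only the outline, so the helper name fillMissing and the variable names are mine. Step B folds in Austin's suggestion, collecting the per-partition maxima with mapPartitionsWithIndex/collectAsMap rather than an Accumulable:

  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  def fillMissing(sc: SparkContext, rdd: RDD[Int]): RDD[Int] = {
    // A) Sort the RDD.
    val sorted = rdd.sortBy(identity)
    // B) Map each partition to (partition ID, its max element); collect in the driver.
    val partMax = sorted.mapPartitionsWithIndex { (id, it) =>
      if (it.hasNext) Iterator(id -> it.max) else Iterator.empty
    }.collectAsMap()
    // C) Broadcast the (partID, max element) pairs.
    val maxes = sc.broadcast(partMax)
    // D) Re-scan each partition and fill the gaps.
    sorted.mapPartitionsWithIndex { (id, it) =>
      val elems = it.toVector // fine for a sketch; costly for huge partitions
      if (elems.isEmpty) Iterator.empty
      else {
        // D1) The preceding partition's max tells us where this partition's
        //     range really starts; the first partition starts at its own head.
        val start = maxes.value.get(id - 1).map(_ + 1).getOrElse(elems.head)
        // D2) Step through this partition's elements, filling interior gaps.
        val interior = elems.sliding(2).flatMap {
          case Seq(a, b) => (a + 1) to b
          case _         => Seq.empty // single-element partition: no interior gaps
        }
        (start to elems.head).iterator ++ interior
      }
    }
  }

With this sketch, fillMissing(sc, sc.parallelize(List(1, 2, 3, 5, 8, 11))).collect() should give 1 through 11, with partition-boundary gaps handled by the broadcast map rather than by peeking into the next partition.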
