The way to do this that comes to mind is to do two passes.  The
first pass just builds up a map from each partition index to that
partition's first element.  Then you broadcast that map, and the second
pass does the sliding-window calculation, using the broadcast map to peek
at the first element of the next partition.  Something like this (untested
sketch):

import scala.collection.mutable

val rdd = ... // your RDD[Int]

// First pass: record each partition's first element in a
// (partition index -> first element) map on the driver.
val firstElement = sc.accumulableCollection(mutable.HashMap.empty[Int, Int])
rdd.mapPartitionsWithIndex { case (idx, itr) =>
  if (itr.hasNext) firstElement += (idx -> itr.next())  // skip empty partitions
  Iterator.empty
}.count()
val broadcastFirstElements = sc.broadcast(firstElement.value.toMap)

// Second pass: the real work.
val parallelScan = rdd.mapPartitionsWithIndex { case (idx, itr) =>
  // do your sliding window calculation here; on the last element of itr,
  // pair it with broadcastFirstElements.value.get(idx + 1)
  // (None when this is the last partition)
  ???
}


The one drawback of this approach is that you've got to do two passes.  I'm
not entirely sure, but mapPartitionsWithIndex shouldn't actually load the
entire RDD: the partition iterators are lazy, so reading just the first
element shouldn't force the rest of each partition to be computed.  So that
first pass should be pretty fast.
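
You can sanity-check the laziness claim with a plain Scala iterator,
outside Spark entirely -- elements are only computed as they're pulled:

val itr = Iterator.tabulate(5) { i => println("computing " + i); i }
itr.next()  // prints "computing 0" only; the rest are never computed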

Note that the count() is there only because we need some action to force
the mapPartitionsWithIndex to run, so that the accumulator gets filled; any
other action would do just as well.
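
To make that concrete, here's a minimal end-to-end sketch for the
pairwise-difference example from your question.  It's untested, the names
are just for illustration, and it glosses over empty partitions:

import scala.collection.mutable

val rdd = sc.parallelize(Seq(5, 4, 3, 2, 1), numSlices = 2)

// First pass: (partition index -> first element) for every non-empty
// partition.
val firstElems = sc.accumulableCollection(mutable.HashMap.empty[Int, Int])
rdd.mapPartitionsWithIndex { case (idx, itr) =>
  if (itr.hasNext) firstElems += (idx -> itr.next())
  Iterator.empty
}.count()
val bcastFirsts = sc.broadcast(firstElems.value.toMap)

// Second pass: within each partition, append the next partition's first
// element (if any) so the pair spanning the partition boundary isn't
// lost, then emit a - b for each successive pair.
val diffs = rdd.mapPartitionsWithIndex { case (idx, itr) =>
  val extended = itr ++ bcastFirsts.value.get(idx + 1).iterator
  extended.sliding(2).withPartial(false).map { case Seq(a, b) => a - b }
}

diffs.collect()  // Array(1, 1, 1, 1)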




On Tue, Feb 4, 2014 at 6:25 PM, Adam Novak <ano...@soe.ucsc.edu> wrote:

> Hello,
>
> I'm trying to get a function to run on pairs of successive elements in
> an RDD. For example, if I have an RDD of Ints [5, 4, 3, 2, 1], and I
> run (_ - _) over pairs of successive elements, I want to get back an
> RDD [1, 1, 1, 1]. Basically it's a scan with window size 2, but run in
> parallel.
>
> My first instinct was to do this with zip: copy the RDD, union a bogus
> element onto the front of one RDD to change its offset, and zip the
> original with the offset copy to get pairs of successive values.
> However, RDD zip doesn't support this: I end up trying to zip RDDs
> with differing numbers of partitions/numbers of elements in their
> partitions, and zip silently drops values.
>
> The docs seem to recommend zipPartitions for when you need to zip
> things with differing numbers of elements in each partition, but it
> seems that literally just gives you iterators over corresponding
> partitions, with no way to peek into the next partition if one RDD's
> partition runs out before the other one's does. So I can't use
> zipPartitions to construct a more traditional element-by-element zip.
>
> It seems like a traditional zip is probably not provided because it's
> not possible to efficiently implement it; in the general case, later
> partitions won't know how to match up their elements because they
> don't know how many elements were in prior partitions of each RDD. But
> the only other way I can think of to construct my scan operation is to
> count the whole input RDD, make an RDD of that many sequential
> integers (exactly matching the input RDD partitioning), zip those up,
> make a copy re-keyed by the index of the previous value, and join
> those two RDDs. This seems like a huge amount of work to do something
> that should be really simple: just scan through each partition
> individually, with a little logic to send each partition's first value
> to wherever the previous partition is being worked on.
>
> Is there some obvious way to implement a parallel scan operation that
> I'm missing? Or some reason it isn't as easy as I think it should be?
> Is this already in Spark somewhere where I haven't found it? Is there
> a better way to implement it than the count-and-zip-and-join method?
>
> Thanks,
> -Adam
>
