yeah i meant foldLeft by key, sorted by date.
it is non-commutative because i care about the order of processing the
values (chronological). i don't see how i can do it with a reduce
efficiently, but i would be curious to hear otherwise. i might be biased
since this is such a typical operation in map-reduce.

so basically, assuming it's logs of servers being RDD[(String, Long)] where
String is the server name and Long is the timestamp, you keep a state that
contains the last observed timestamp (if any) and the list of found gaps.
so the state type would be (Option[Long], List[Long]). as you process the
items in a timeseries chronologically you always update the last observed
timestamp and possibly add to the list of found gaps.
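
for intuition, here is the same state machine as a plain scala foldLeft over
a single pre-sorted timeseries (thres and the sample timestamps are made up):

val thres = 1000L
val timestamps = List(100L, 200L, 1500L, 1600L, 3000L) // already sorted

val (_, gaps) = timestamps.foldLeft((None: Option[Long], List.empty[Long])) {
  case ((Some(prev), gaps), curr) if curr - prev > thres => (Some(curr), curr :: gaps) // gap found
  case ((_, gaps), curr) => (Some(curr), gaps) // no gap
}
// gaps == List(3000, 1500): the timestamps that start after a gap larger than thres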

foldLeftByKey on RDD[(K, V)] looks something like this:
def foldLeftByKey[X](state: X)(update: (X, V) => X)(implicit ord: Ordering[V]): RDD[(K, X)]

and the logic would be (just made this up, didn't test or compile):

rdd.foldLeftByKey((None: Option[Long], List.empty[Long])) {
  case ((Some(prev), gaps), curr) if curr - prev > thres => (Some(curr), curr :: gaps) // gap found
  case ((_, gaps), curr) => (Some(curr), gaps) // no gap found
}
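
the result would be an RDD[(String, (Option[Long], List[Long]))]: one entry per
server, with the gap timestamps in reverse chronological order since they are
prepended.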

the sort required within each timeseries would be done efficiently by spark in
the shuffle (assuming sort-based shuffle is enabled). the foldLeftByKey
would never require the entire timeseries for a key to be in memory. however,
every timeseries would be processed by a single task, so it might take a
while if a timeseries is very large.
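
until something like SPARK-3655 lands, a rough approximation on stock spark
(1.2+) is to move the timestamp into the key, partition by server only, and
let repartitionAndSortWithinPartitions do the secondary sort. untested
sketch, the names KeyPartitioner and findGaps are made up:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner}

// routes records by the server name only, while spark sorts each partition
// by the full (server, timestamp) key
class KeyPartitioner(partitions: Int) extends Partitioner {
  private val delegate = new HashPartitioner(partitions)
  def numPartitions: Int = partitions
  def getPartition(key: Any): Int = key match {
    case (server: String, _) => delegate.getPartition(server)
  }
}

def findGaps(logs: RDD[(String, Long)], thres: Long, partitions: Int): RDD[(String, List[Long])] =
  logs.map { case (server, ts) => ((server, ts), ()) }
    .repartitionAndSortWithinPartitions(new KeyPartitioner(partitions))
    .mapPartitions { iter =>
      // stream through the sorted partition one server at a time, never
      // materializing a whole timeseries
      new Iterator[(String, List[Long])] {
        private val it = iter.buffered
        def hasNext: Boolean = it.hasNext
        def next(): (String, List[Long]) = {
          val server = it.head._1._1
          var prev = it.next()._1._2
          var gaps = List.empty[Long]
          while (it.hasNext && it.head._1._1 == server) {
            val curr = it.next()._1._2
            if (curr - prev > thres) gaps = curr :: gaps
            prev = curr
          }
          (server, gaps)
        }
      }
    }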

On Fri, Jan 30, 2015 at 11:05 AM, Michael Malak <michaelma...@yahoo.com>
wrote:

> But isn't foldLeft() overkill for the originally stated use case of max
> diff of adjacent pairs? Isn't foldLeft() for recursive non-commutative
> non-associative accumulation as opposed to an embarrassingly parallel
> operation such as this one?
>
> This use case reminds me of FIR filtering in DSP. It seems that RDDs could
> use something that serves the same purpose as
> scala.collection.Iterator.sliding.
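>
> For concreteness, a tiny local example of the sliding behavior (sample
> numbers made up):
>
>   val ts = Iterator(100L, 200L, 1500L, 1600L)
>   val diffs = ts.sliding(2).collect { case Seq(a, b) => b - a }
>   // diffs.toList == List(100, 1300, 100)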
>
>   ------------------------------
>  *From:* Koert Kuipers <ko...@tresata.com>
> *To:* Mohit Jaggi <mohitja...@gmail.com>
> *Cc:* Tobias Pfeiffer <t...@preferred.jp>; "Ganelin, Ilya" <
> ilya.gane...@capitalone.com>; derrickburns <derrickrbu...@gmail.com>; "
> user@spark.apache.org" <user@spark.apache.org>
> *Sent:* Friday, January 30, 2015 7:11 AM
> *Subject:* Re: spark challenge: zip with next???
>
> assuming the data can be partitioned then you have many timeseries for
> which you want to detect potential gaps. also assuming the resulting gaps
> info per timeseries is much smaller data than the timeseries data itself,
> then this is a classical example to me of a sorted (streaming) foldLeft,
> requiring an efficient secondary sort in the spark shuffle. i am trying to
> get that into spark here:
> https://issues.apache.org/jira/browse/SPARK-3655
>
>
>
> On Fri, Jan 30, 2015 at 12:27 AM, Mohit Jaggi <mohitja...@gmail.com>
> wrote:
>
>
> http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3ccalrvtpkn65rolzbetc+ddk4o+yjm+tfaf5dz8eucpl-2yhy...@mail.gmail.com%3E
>
> you can use the MLlib function or do the following (which is what I had
> done):
>
> - in the first pass over the data, using mapPartitionsWithIndex, gather the
> first item in each partition. you can use collect (or an aggregator) for this.
> “key” them by the partition index. at the end, you will have a map
>    (partition index) --> first item
> - in the second pass over the data, using mapPartitionsWithIndex again,
> look at two items at a time (or, in the general case, N items at a time; you
> can use scala’s sliding iterator) and check the time difference (or any
> sliding window computation). To this mapPartitions, pass the map created in
> the previous step. You will need it to check the last item in this
> partition (rough sketch below).
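>
> Roughly, as an untested sketch (adjacentDiffs and the empty-partition
> handling are guesses):
>
> import org.apache.spark.rdd.RDD
>
> def adjacentDiffs(rdd: RDD[Long]): RDD[Long] = {
>   // pass 1: first element of every non-empty partition, keyed by partition index
>   val firsts: Map[Int, Long] = rdd
>     .mapPartitionsWithIndex { (i, it) =>
>       if (it.hasNext) Iterator((i, it.next())) else Iterator.empty
>     }
>     .collect()
>     .toMap
>   val n = rdd.partitions.length
>   // pass 2: sliding window within each partition, extended with the first
>   // element of the next non-empty partition to cover the boundary
>   rdd.mapPartitionsWithIndex { (i, it) =>
>     val boundary = (i + 1 until n).flatMap(firsts.get).headOption
>     (it ++ boundary.iterator).sliding(2).collect { case Seq(a, b) => b - a }
>   }
> }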
>
> If you can tolerate a few inaccuracies then you can just do the second
> step. You will miss the “boundaries” of the partitions but it might be
> acceptable for your use case.
>
>
>
> On Jan 29, 2015, at 4:36 PM, Tobias Pfeiffer <t...@preferred.jp> wrote:
>
> Hi,
>
> On Fri, Jan 30, 2015 at 6:32 AM, Ganelin, Ilya <
> ilya.gane...@capitalone.com> wrote:
>
>  Make a copy of your RDD with an extra entry in the beginning to offset.
> Then you can zip the two RDDs and run a map to generate an RDD of
> differences.
>
>
> Does that work? I recently tried something to compute differences between
> each entry and the next, so I did
>   val rdd1 = ... // null element + rdd
>   val rdd2 = ... // rdd + null element
> but got an error message about zip requiring data sizes in each partition
> to match.
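>
> One workaround that avoids zip's equal-partition-sizes requirement is to
> key both copies by a global index and join, at the cost of a shuffle
> (untested sketch):
>
>   val indexed = rdd.zipWithIndex.map(_.swap)              // (i, x)
>   val shifted = indexed.map { case (i, x) => (i + 1, x) } // (i+1, x)
>   val diffs = indexed.join(shifted).map { case (_, (curr, prev)) => curr - prev }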
>
> Tobias
>