assuming the data can be partitioned, you have many timeseries in which
you want to detect potential gaps. also assuming the resulting gap info
per timeseries is much smaller than the timeseries data itself, this is to
me a classic example of a sorted (streaming) foldLeft, requiring an
efficient secondary sort in the spark shuffle. i am trying to get that
into spark here:
https://issues.apache.org/jira/browse/SPARK-3655
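
A minimal sketch of the fold itself, in plain Scala and without Spark, assuming the records arrive already sorted by (series, timestamp) the way a secondary sort would deliver them. The names Point, Gap, GapFold and maxStep are hypothetical and only serve the illustration; they are not part of SPARK-3655.

```scala
// Hypothetical record types, for illustration only.
final case class Point(series: String, ts: Long)
final case class Gap(series: String, from: Long, to: Long)

object GapFold {
  // State carried through the fold: the previous point and the gaps found so far.
  final case class State(last: Option[Point], gaps: Vector[Gap])

  // Assumes `points` is sorted by (series, ts). Only one previous point is kept
  // in memory, so the input can be consumed as a stream.
  def findGaps(points: Iterator[Point], maxStep: Long): Vector[Gap] =
    points.foldLeft(State(None, Vector.empty)) { (st, p) =>
      val gaps = st.last match {
        // Same series and the step is too large: record a gap.
        case Some(prev) if prev.series == p.series && p.ts - prev.ts > maxStep =>
          st.gaps :+ Gap(p.series, prev.ts, p.ts)
        // First point overall, a new series, or a normal step: nothing to record.
        case _ => st.gaps
      }
      State(Some(p), gaps)
    }.gaps
}
```

The state holds only the last point plus the (small) gap list, which is why this only pays off when the gap info is much smaller than the series data.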

On Fri, Jan 30, 2015 at 12:27 AM, Mohit Jaggi <mohitja...@gmail.com> wrote:

>
> http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3ccalrvtpkn65rolzbetc+ddk4o+yjm+tfaf5dz8eucpl-2yhy...@mail.gmail.com%3E
>
> you can use the MLlib function or do the following (which is what I had
> done):
>
> - in the first pass over the data, using mapPartitionsWithIndex, gather the
> first item in each partition. you can use collect (or an aggregator) for
> this. “key” them by the partition index. at the end, you will have a map
>    (partition index) --> first item
> - in the second pass over the data, using mapPartitionsWithIndex again,
> look at two items at a time (or, in the general case, N items; you can use
> scala’s sliding iterator) and check the time difference (or any
> sliding window computation). To this mapPartitionsWithIndex call, pass the
> map created in the previous step. You will need it to check the last item
> in each partition against the first item of the next one.
>
> If you can tolerate a few inaccuracies then you can just do the second
> step. You will miss the “boundaries” of the partitions but it might be
> acceptable for your use case.
>
>
>
> On Jan 29, 2015, at 4:36 PM, Tobias Pfeiffer <t...@preferred.jp> wrote:
>
> Hi,
>
> On Fri, Jan 30, 2015 at 6:32 AM, Ganelin, Ilya <
> ilya.gane...@capitalone.com> wrote:
>
>>  Make a copy of your RDD with an extra entry in the beginning to offset.
>> Then you can zip the two RDDs and run a map to generate an RDD of
>> differences.
>>
>
> Does that work? I recently tried something to compute differences between
> each entry and the next, so I did
>   val rdd1 = ... // null element + rdd
>   val rdd2 = ... // rdd + null element
> but got an error message about zip requiring data sizes in each partition
> to match.
>
> Tobias
>
>
>

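The two-pass approach quoted above can be sketched roughly as follows. This is plain Scala with no Spark dependency: the two functions have the (Int, Iterator[T]) => Iterator[U] shape that RDD.mapPartitionsWithIndex expects, and a Vector of Vectors stands in for the partitions. The names BoundaryGaps, firstItem, gaps and maxStep are hypothetical, and the sketch assumes the data is already sorted by timestamp across partitions.

```scala
object BoundaryGaps {
  // Pass 1: emit (partition index, first timestamp) for each non-empty partition.
  def firstItem(idx: Int, it: Iterator[Long]): Iterator[(Int, Long)] =
    if (it.hasNext) Iterator(idx -> it.next()) else Iterator.empty

  // Pass 2: slide over consecutive pairs and report steps larger than maxStep.
  // The first item of the next non-empty partition is appended so that the
  // pair straddling the partition boundary is checked as well.
  def gaps(firsts: Map[Int, Long], maxStep: Long)(idx: Int, it: Iterator[Long]): Iterator[(Long, Long)] = {
    val next = firsts.keys.filter(_ > idx).toSeq.sorted.headOption.map(firsts)
    (it ++ next.iterator)
      .sliding(2)
      .collect { case Seq(a, b) if b - a > maxStep => (a, b) }
  }
}

object BoundaryGapsDemo {
  // Stand-in for an RDD's partitions; the third partition is deliberately empty.
  val partitions: Vector[Vector[Long]] =
    Vector(Vector(0L, 10L, 20L), Vector(60L, 70L), Vector.empty[Long], Vector(75L, 200L))

  // With Spark this would be a mapPartitionsWithIndex followed by collect.
  val firsts: Map[Int, Long] =
    partitions.zipWithIndex.flatMap { case (p, i) => BoundaryGaps.firstItem(i, p.iterator) }.toMap

  // With Spark this would be a second mapPartitionsWithIndex over the same RDD.
  val found: Vector[(Long, Long)] =
    partitions.zipWithIndex.flatMap { case (p, i) => BoundaryGaps.gaps(firsts, 10L)(i, p.iterator) }
}
```

Skipping the first pass and passing an empty map gives the approximate variant described in the quoted message, which misses gaps that fall exactly on a partition boundary.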