How about:

events.sliding(3).zipWithIndex.filter(_._2 % 3 == 0)

That would group the RDD into adjacent buckets of size 3.
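Putting that together with the averaging from your snippet below, a rough,
untested sketch of the whole bucketing step might look like this (it assumes
the Event case class and the `events` RDD from your earlier message, and that
sliding comes from org.apache.spark.mllib.rdd.RDDFunctions, which hands the
windows back as Arrays):

import org.apache.spark.mllib.rdd.RDDFunctions._

val eventsBucketed = events.sliding(3)    // overlapping windows of 3 events
  .zipWithIndex                           // pair each window with its index
  .filter { case (_, i) => i % 3 == 0 }   // keep every 3rd window -> adjacent, non-overlapping buckets
  .map { case (w, _) =>                   // average each bucket into a single Event
    Event(w(0).time,
          (w(0).x + w(1).x + w(2).x) / 3.0,
          (w(0).vztot + w(1).vztot + w(2).vztot) / 3.0)
  }

Filtering on the window index like this should also avoid keying and shuffling
the raw events the way a groupBy-based bucketing would.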

On Thu, Jul 2, 2015 at 2:33 PM, tog <guillaume.all...@gmail.com> wrote:

> It was complaining about the Seq ...
>
> Moved it to
> val eventsfiltered = events.sliding(3).map(s  => Event(s(0).time,
> (s(0).x+s(1).x+s(2).x)/3.0 (s(0).vztot+s(1).vztot+s(2).vztot)/3.0))
>
> and that is working.
>
> Anyway, this is not what I wanted to do; my goal was more to implement
> bucketing to shorten the time series.
>
>
> On 2 July 2015 at 18:25, Feynman Liang <fli...@databricks.com> wrote:
>
>> What's the error you are getting?
>>
>> On Thu, Jul 2, 2015 at 9:37 AM, tog <guillaume.all...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> Sorry for this Scala/Spark newbie question. I am creating an RDD which
>>> represents a large time series this way:
>>> val data = sc.textFile("somefile.csv")
>>>
>>> case class Event(
>>>     time:       Double,
>>>     x:          Double,
>>>     vztot:      Double
>>> )
>>>
>>> val events = data.filter(s => !s.startsWith("GMT")).map{s =>
>>>     val r = s.split(";")
>>> ...
>>>     Event(time, x, vztot )
>>> }
>>>
>>> I would like to process that RDD in order to reduce it by some
>>> filtering. For this I noticed that sliding could help, but I have not been
>>> able to use it so far. Here is what I did:
>>>
>>> import org.apache.spark.mllib.rdd.RDDFunctions._
>>>
>>> val eventsfiltered = events.sliding(3).map(Seq(e0, e1, e2)  =>
>>> Event(e0.time, (e0.x+e1.x+e2.x)/3.0, (e0.vztot+e1.vztot+e2.vztot)/3.0))
>>>
>>> Thanks for your help
>>>
>>>
>>> --
>>> PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
>>>
>>
>>
>
>
> --
> PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
>
