Understood. Thanks for your great help

Cheers
Guillaume

On 2 July 2015 at 23:23, Feynman Liang <fli...@databricks.com> wrote:

> Consider an example dataset [a, b, c, d, e, f]
>
> After sliding(3), you get [(a,b,c), (b,c,d), (c, d, e), (d, e, f)]
>
> After zipWithIndex: [((a,b,c), 0), ((b,c,d), 1), ((c, d, e), 2), ((d, e,
> f), 3)]
>
> After filter: [((a,b,c), 0), ((d,e,f), 3)], which is what I'm assuming
> you want (non-overlapping buckets)? You can then do something like
> .map(x => func(x._1)) to apply func (e.g. min, max, mean) to the 3-tuples.
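>
> A rough end-to-end sketch of that idea (untested, and assuming events is
> the RDD[Event] defined further down the thread; MLlib's sliding yields
> Array[Event] windows):
>
> import org.apache.spark.mllib.rdd.RDDFunctions._
>
> // non-overlapping buckets of 3: keep the first timestamp, average the fields
> val bucketed = events.sliding(3)
>   .zipWithIndex
>   .filter { case (_, i) => i % 3 == 0 }
>   .map { case (w, _) =>
>     Event(w(0).time, w.map(_.x).sum / 3.0, w.map(_.vztot).sum / 3.0)
>   }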
>
> On Thu, Jul 2, 2015 at 3:20 PM, tog <guillaume.all...@gmail.com> wrote:
>
>> Well, it did reduce the length of my series of events. I will have to dig
>> into what it actually did ;-)
>>
>> I would assume that it kept one out of every 3 values, is that correct?
>> Would it be possible to control a bit more how the value assigned to each
>> bucket is computed, for example taking the first element, the min, the max,
>> the mean ... or any other function?
>>
>> Thanks for putting me on the right track
>>
>> On 2 July 2015 at 22:56, Feynman Liang <fli...@databricks.com> wrote:
>>
>>> How about:
>>>
>>> events.sliding(3).zipWithIndex.filter(_._2 % 3 == 0)
>>>
>>> That would group the RDD into adjacent buckets of size 3.
>>>
>>> On Thu, Jul 2, 2015 at 2:33 PM, tog <guillaume.all...@gmail.com> wrote:
>>>
>>>> It was complaining about the Seq ...
>>>>
>>>> I moved it to
>>>> val eventsfiltered = events.sliding(3).map(s => Event(s(0).time,
>>>>   (s(0).x + s(1).x + s(2).x) / 3.0, (s(0).vztot + s(1).vztot + s(2).vztot) / 3.0))
>>>>
>>>> and that is working.
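>>>>
>>>> For what it's worth, a pattern-match variant should also work (a sketch,
>>>> assuming sliding yields Array[Event] windows as in MLlib's RDDFunctions;
>>>> a plain lambda parameter cannot destructure the window, but a case block
>>>> can):
>>>>
>>>> // same 3-point moving average, written with a case pattern
>>>> val eventsfiltered = events.sliding(3).map { case Array(e0, e1, e2) =>
>>>>   Event(e0.time, (e0.x + e1.x + e2.x) / 3.0,
>>>>     (e0.vztot + e1.vztot + e2.vztot) / 3.0)
>>>> }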
>>>>
>>>> Anyway, this is not quite what I wanted to do; my goal was rather to
>>>> implement bucketing to shorten the time series.
>>>>
>>>>
>>>> On 2 July 2015 at 18:25, Feynman Liang <fli...@databricks.com> wrote:
>>>>
>>>>> What's the error you are getting?
>>>>>
>>>>> On Thu, Jul 2, 2015 at 9:37 AM, tog <guillaume.all...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> Sorry for this Scala/Spark newbie question. I am creating an RDD which
>>>>>> represents a large time series this way:
>>>>>> val data = sc.textFile("somefile.csv")
>>>>>>
>>>>>> case class Event(
>>>>>>     time:       Double,
>>>>>>     x:          Double,
>>>>>>     vztot:      Double
>>>>>> )
>>>>>>
>>>>>> val events = data.filter(s => !s.startsWith("GMT")).map{s =>
>>>>>>     val r = s.split(";")
>>>>>> ...
>>>>>>     Event(time, x, vztot )
>>>>>> }
>>>>>>
>>>>>> I would like to process this RDD in order to reduce it by some
>>>>>> filtering. For this I noticed that sliding could help, but I have not
>>>>>> been able to use it so far. Here is what I did:
>>>>>>
>>>>>> import org.apache.spark.mllib.rdd.RDDFunctions._
>>>>>>
>>>>>> val eventsfiltered = events.sliding(3).map(Seq(e0, e1, e2)  =>
>>>>>> Event(e0.time, (e0.x+e1.x+e2.x)/3.0, (e0.vztot+e1.vztot+e2.vztot)/3.0))
>>>>>>
>>>>>> Thanks for your help
>>>>>>
>>>>>>
>>>>>> --
>>>>>> PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
>>>>
>>>
>>>
>>
>>
>> --
>> PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
>>
>
>


-- 
PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
