Understood. Thanks for your great help.

Cheers,
Guillaume
On 2 July 2015 at 23:23, Feynman Liang <fli...@databricks.com> wrote:

> Consider an example dataset [a, b, c, d, e, f].
>
> After sliding(3), you get [(a,b,c), (b,c,d), (c,d,e), (d,e,f)].
>
> After zipWithIndex: [((a,b,c), 0), ((b,c,d), 1), ((c,d,e), 2), ((d,e,f), 3)].
>
> After filter: [((a,b,c), 0), ((d,e,f), 3)], which is what I'm assuming
> you want (non-overlapping buckets)? You can then do something like
> .map(func(_._1)) to apply func (e.g. min, max, mean) to the 3-tuples.
>
> On Thu, Jul 2, 2015 at 3:20 PM, tog <guillaume.all...@gmail.com> wrote:
>
>> Well, it did reduce the length of my series of events. I will have to
>> dig into what it actually did ;-)
>>
>> I would assume that it took one value out of every 3; is that correct?
>> Would it be possible to control a bit more how the value assigned to
>> the bucket is computed, for example take the first element, the min,
>> the max, the mean ... any other function?
>>
>> Thanks for putting me on the right track.
>>
>> On 2 July 2015 at 22:56, Feynman Liang <fli...@databricks.com> wrote:
>>
>>> How about:
>>>
>>> events.sliding(3).zipWithIndex.filter(_._2 % 3 == 0)
>>>
>>> That would group the RDD into adjacent buckets of size 3.
>>>
>>> On Thu, Jul 2, 2015 at 2:33 PM, tog <guillaume.all...@gmail.com> wrote:
>>>
>>>> It was complaining about the Seq ...
>>>>
>>>> I moved it to
>>>>
>>>> val eventsfiltered = events.sliding(3).map(s => Event(s(0).time,
>>>>   (s(0).x + s(1).x + s(2).x) / 3.0,
>>>>   (s(0).vztot + s(1).vztot + s(2).vztot) / 3.0))
>>>>
>>>> and that is working.
>>>>
>>>> Anyway, this is not what I wanted to do; my goal was more to
>>>> implement bucketing to shorten the time series.
>>>>
>>>> On 2 July 2015 at 18:25, Feynman Liang <fli...@databricks.com> wrote:
>>>>
>>>>> What's the error you are getting?
>>>>>
>>>>> On Thu, Jul 2, 2015 at 9:37 AM, tog <guillaume.all...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Sorry for this Scala/Spark newbie question. I am creating an RDD
>>>>>> which represents a large time series this way:
>>>>>>
>>>>>> val data = sc.textFile("somefile.csv")
>>>>>>
>>>>>> case class Event(
>>>>>>   time: Double,
>>>>>>   x: Double,
>>>>>>   vztot: Double
>>>>>> )
>>>>>>
>>>>>> val events = data.filter(s => !s.startsWith("GMT")).map{ s =>
>>>>>>   val r = s.split(";")
>>>>>>   ...
>>>>>>   Event(time, x, vztot)
>>>>>> }
>>>>>>
>>>>>> I would like to process that RDD in order to reduce it by some
>>>>>> filtering. For this I noticed that sliding could help, but I was
>>>>>> not able to use it so far. Here is what I did:
>>>>>>
>>>>>> import org.apache.spark.mllib.rdd.RDDFunctions._
>>>>>>
>>>>>> val eventsfiltered = events.sliding(3).map(Seq(e0, e1, e2) =>
>>>>>>   Event(e0.time, (e0.x+e1.x+e2.x)/3.0, (e0.vztot+e1.vztot+e2.vztot)/3.0))
>>>>>>
>>>>>> Thanks for your help.

--
PGP KeyID: 2048R/EA31CFC9 subkeys.pgp.net
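For reference, here is the thread's bucketing recipe assembled into a minimal runnable sketch. It assumes Spark 1.x with spark-mllib on the classpath; the toy data, the local master, the BucketingExample wrapper, and the choice of stamping each bucket with its first event's time are illustrative assumptions, not details from the thread:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.rdd.RDDFunctions._

case class Event(time: Double, x: Double, vztot: Double)

object BucketingExample {
  def main(args: Array[String]): Unit = {
    // Local mode only for illustration.
    val sc = new SparkContext(
      new SparkConf().setAppName("bucketing").setMaster("local[*]"))

    // Toy data standing in for the parsed CSV events.
    val events = sc.parallelize((1 to 12).map(i => Event(i.toDouble, 2.0 * i, 3.0 * i)))

    // sliding(3) emits every window of 3 consecutive events; zipWithIndex
    // numbers the windows; keeping only indices divisible by 3 leaves
    // non-overlapping buckets. A trailing remainder of fewer than 3 events
    // is dropped.
    val buckets = events.sliding(3).zipWithIndex.filter(_._2 % 3 == 0).map(_._1)

    // Any per-bucket aggregate works here: first element, min, max, or, as
    // below, a mean stamped with the bucket's first timestamp.
    val bucketed = buckets.map { w =>
      Event(w.head.time, w.map(_.x).sum / w.length, w.map(_.vztot).sum / w.length)
    }

    bucketed.collect().foreach(println)
    sc.stop()
  }
}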
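On the compile error from the original map(Seq(e0, e1, e2) => ...): a Scala function literal cannot destructure its parameter directly; that requires a case pattern inside braces. Note also that this sliding yields Array[Event], which is matched with an Array pattern rather than a Seq pattern. A sketch of the corrected moving-average version, reusing Event and events from the sketch above:

val eventsfiltered = events.sliding(3).map { case Array(e0, e1, e2) =>
  // sliding(3) always produces length-3 windows, so this match is safe
  // despite the non-exhaustive pattern warning.
  Event(e0.time, (e0.x + e1.x + e2.x) / 3.0, (e0.vztot + e1.vztot + e2.vztot) / 3.0)
}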