How about events.sliding(3).zipWithIndex.filter(_._2 % 3 == 0)?

That would keep every third window, grouping the RDD into adjacent, non-overlapping buckets of size 3 (see the sketch below).
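For the averaging you described, something along these lines should do it (an untested sketch reusing your Event class; note that sliding drops a trailing partial window, so any leftover events at the end are discarded):

import org.apache.spark.mllib.rdd.RDDFunctions._

// Windows of 3 consecutive events; keeping every third window index
// turns the overlapping windows into disjoint buckets.
val bucketed = events.sliding(3)
  .zipWithIndex
  .filter { case (_, i) => i % 3 == 0 }
  .map { case (w, _) =>
    // Collapse each bucket into a single Event stamped with the first time.
    Event(w(0).time, w.map(_.x).sum / 3.0, w.map(_.vztot).sum / 3.0)
  }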
On Thu, Jul 2, 2015 at 2:33 PM, tog <guillaume.all...@gmail.com> wrote:

> It was complaining about the Seq ...
>
> Moved it to
>
> val eventsfiltered = events.sliding(3).map(s => Event(s(0).time,
>   (s(0).x + s(1).x + s(2).x) / 3.0, (s(0).vztot + s(1).vztot + s(2).vztot) / 3.0))
>
> and that is working.
>
> Anyway, this is not what I wanted to do; my goal was rather to implement
> bucketing to shorten the time series.
>
>
> On 2 July 2015 at 18:25, Feynman Liang <fli...@databricks.com> wrote:
>
>> What's the error you are getting?
>>
>> On Thu, Jul 2, 2015 at 9:37 AM, tog <guillaume.all...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> Sorry for this Scala/Spark newbie question. I am creating an RDD which
>>> represents a large time series this way:
>>>
>>> val data = sc.textFile("somefile.csv")
>>>
>>> case class Event(
>>>   time: Double,
>>>   x: Double,
>>>   vztot: Double
>>> )
>>>
>>> val events = data.filter(s => !s.startsWith("GMT")).map { s =>
>>>   val r = s.split(";")
>>>   ...
>>>   Event(time, x, vztot)
>>> }
>>>
>>> I would like to process this RDD in order to reduce it by some
>>> filtering. For this I noticed that sliding could help, but I have not
>>> been able to use it so far. Here is what I did:
>>>
>>> import org.apache.spark.mllib.rdd.RDDFunctions._
>>>
>>> val eventsfiltered = events.sliding(3).map(Seq(e0, e1, e2) =>
>>>   Event(e0.time, (e0.x + e1.x + e2.x) / 3.0, (e0.vztot + e1.vztot + e2.vztot) / 3.0))
>>>
>>> Thanks for your help
>>>
>>>
>>> --
>>> PGP KeyID: 2048R/EA31CFC9 subkeys.pgp.net
>>
>
>
> --
> PGP KeyID: 2048R/EA31CFC9 subkeys.pgp.net
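P.S. On the original error: a plain lambda parameter cannot be a pattern in Scala, which is why map(Seq(e0, e1, e2) => ...) did not compile. You need a pattern-matching block, and since mllib's sliding yields Array windows, match on Array (again an untested sketch):

val eventsfiltered = events.sliding(3).map { case Array(e0, e1, e2) =>
  // Moving average over each window of three consecutive events.
  Event(e0.time, (e0.x + e1.x + e2.x) / 3.0,
    (e0.vztot + e1.vztot + e2.vztot) / 3.0)
}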