Could you try this?

    df.groupBy(((col("timeStamp") - start) / bucketLengthSec).cast(IntegerType))
      .agg(max("timeStamp"), max("value"))
      .collect()
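In full, something like the sketch below (Scala, Spark 1.5; it assumes timeStamp holds numeric seconds and that start and bucketLengthSec are Long values already in scope; the bucket and results names are just for illustration). Every row gets an integer bucket id, so all windows are aggregated in one parallel job instead of one filter-and-aggregate pass per window:

    import org.apache.spark.sql.functions.{col, max}
    import org.apache.spark.sql.types.IntegerType

    // Rows in the same w-second window share the same integer bucket id.
    val bucket = ((col("timeStamp") - start) / bucketLengthSec)
      .cast(IntegerType)
      .as("bucket")

    // A single shuffle aggregates every window at once.
    val results = df
      .groupBy(bucket)
      .agg(max("timeStamp"), max("value"))
      .collect()

If you need the window start times back, bucket * bucketLengthSec + start recovers them.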
On Wed, Dec 9, 2015 at 8:54 AM, Arun Verma <arun.verma...@gmail.com> wrote:
> Hi all,
>
> We have an RDD (main) of sorted time-series data. We want to split it into
> different RDDs according to a window size and then perform aggregation
> operations like max, min, etc. over each RDD in parallel.
>
> If the window size is w, then the ith RDD has data from (startTime + (i-1)*w)
> to (startTime + i*w), where startTime is the time-stamp of the 1st entry in
> the main RDD; splitting continues until (startTime + (i-1)*w) is greater
> than the time-stamp of the last entry of the main RDD.
>
> For now, I am using DataFrames and Spark version 1.5.2. The code below runs
> sequentially over the data, so execution time is high and resource
> utilization is low. Code snippet is given below:
>
> /*
>  * aggregator is max
>  * df - DataFrame with sorted time-series data
>  * start - first entry of DataFrame df
>  * end - last entry of DataFrame df
>  * bucketLengthSec - window size
>  * stepResults - holds a particular block/window output (JSON)
>  * appendResults - holds the output up to this block/window (JSON)
>  */
> nextStart = start + bucketLengthSec; // end of the first window
> while (start <= end) {
>     row = df.filter(df.col("timeStamp")
>             .between(start, nextStart))
>             .agg(max(df.col("timeStamp")), max(df.col("value")))
>             .first();
>     if (row.get(0) != null) {
>         stepResults = new JSONObject();
>         stepResults.put("x", Long.parseLong(row.get(0).toString()));
>         stepResults.put("y", row.get(1));
>         appendResults.add(stepResults);
>     }
>     start = nextStart;
>     nextStart = start + bucketLengthSec;
> }
>
> --
> Thanks and Regards,
> Arun Verma