Could you try this?

df.groupBy(((col("timeStamp") - start) / bucketLengthSec).cast(IntegerType))
  .agg(max("timeStamp"), max("value")).collect()

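Since your loop is in Java, here is a rough, self-contained sketch of the same
idea with the Java DataFrame API (Spark 1.5.x). It assumes start and
bucketLengthSec are long values already in scope and that timeStamp and value
are numeric columns; the names just mirror your snippet, so adjust as needed.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.max;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;

// One distributed groupBy/agg over the whole DataFrame instead of one
// filter/agg job per window.
DataFrame buckets = df
    .groupBy(col("timeStamp").minus(start).divide(bucketLengthSec)
        .cast(DataTypes.IntegerType).as("bucket"))
    .agg(max(col("timeStamp")).as("x"), max(col("value")).as("y"));

// Each row holds one window: (bucket index, max timeStamp, max value).
for (Row row : buckets.orderBy(col("bucket")).collect()) {
    long x = Long.parseLong(row.get(1).toString());
    Object y = row.get(2);
    // build the same JSON objects here as in the original loop
}

This way every window is computed in a single Spark job, so the work is
spread across the cluster instead of being run window by window from the
driver.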
On Wed, Dec 9, 2015 at 8:54 AM, Arun Verma <arun.verma...@gmail.com> wrote:
> Hi all,
>
> We have an RDD (main) of sorted time-series data. We want to split it into
> different RDDs according to a window size and then perform aggregation
> operations like max, min, etc. over each RDD in parallel.
>
> If the window size is w, then the ith RDD has data from (startTime + (i-1)*w)
> to (startTime + i*w), where startTime is the timestamp of the 1st entry in the
> main RDD; splitting stops once (startTime + (i-1)*w) is greater than the
> timestamp of the last entry of the main RDD.
>
> For now, I am using DataFrames and Spark version 1.5.2. The code below runs
> sequentially over the data, so execution time is high and resource
> utilization is low:
> /*
> * aggregator is max
> * df - DataFrame with sorted time-series data
> * start - timeStamp of the first entry of df
> * end - timeStamp of the last entry of df
> * bucketLengthSec - window size
> * nextStart - end of the current window, initially start + bucketLengthSec
> * stepResults - output (JSON) of a particular block/window
> * appendResults - accumulated output (JSON) up to this block/window
> */
> while (start <= end) {
>     row = df.filter(df.col("timeStamp")
>             .between(start, nextStart))
>             .agg(max(df.col("timeStamp")), max(df.col("value")))
>             .first();
>     if (row.get(0) != null) {
>         stepResults = new JSONObject();
>         stepResults.put("x", Long.parseLong(row.get(0).toString()));
>         stepResults.put("y", row.get(1));
>         appendResults.add(stepResults);
>     }
>     start = nextStart;
>     nextStart = start + bucketLengthSec;
> }
>
>
> --
> Thanks and Regards,
> Arun Verma
