On Thu, Aug 20, 2015 at 6:58 PM, Justin Grimes <jgri...@adzerk.com> wrote:

> We are aggregating real-time logs of events, and want to do windows of 30
> minutes. However, since the computation doesn't start until 30 minutes have
> passed, there is a ton of data built up that processing could've already
> started on. When it comes time to actually process the data, there is too
> much for our cluster to handle at once.
> The basic idea is this:
>  val mergedMain = mergedStream
>       .flatMap(r => ....) // denormalize data for this particular output stream
>       .reduceByKey((x:Array[Int],y:Array[Int]) => sumAggregates(x,y)) // this would sum over the batches
Could you add a dummy action at this point?

val firstStep = mergedStream
      .flatMap(r => ....) // denormalize data for this particular output stream
      .reduceByKey((x:Array[Int],y:Array[Int]) => sumAggregates(x,y)) // this would sum over the batches
      .persist() // this will be reused in windowing operations

firstStep.count() // just to trigger computation

firstStep
      .reduceByKeyAndWindow((x:Array[Int],y:Array[Int]) => sumAggregates(x,y), 1800000, 1800000) // sum over the windows
      .map(rec => ...) // convert data to other format
      .foreachRDD{ (rdd, time) =>
        rdd.saveAsTextFile(...) // save to text files
      }
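
One caveat worth checking: on a DStream, count() is itself a lazy transformation that returns a DStream[Long], so calling it alone may not schedule any work. An output operation such as foreachRDD (or print()) is what forces each batch to be computed. Also, reduceByKeyAndWindow takes Duration arguments, so Minutes(30) is used here in place of the raw millisecond values. Below is a minimal sketch of the whole idea. The socket source, the "key,v1,v2,..." line format, the 10-second batch interval, the output path, and the body of sumAggregates are all assumptions made up for illustration, not the original poster's code.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object EagerBatchReduce {
  // Hypothetical stand-in for the poster's sumAggregates: element-wise sum.
  def sumAggregates(x: Array[Int], y: Array[Int]): Array[Int] =
    x.zip(y).map { case (a, b) => a + b }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("eager-batch-reduce").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10)) // assumed batch interval

    // Assumed source: text lines of the form "key,v1,v2,..." on a local socket.
    val mergedStream: DStream[String] = ssc.socketTextStream("localhost", 9999)

    val firstStep = mergedStream
      .map { line =>
        val parts = line.split(",")
        (parts.head, parts.tail.map(_.toInt))
      }
      .reduceByKey(sumAggregates _) // sum within each batch
      .persist()                    // reused by the windowed reduce below

    // Output operation: forces every batch to be reduced as soon as it arrives.
    firstStep.foreachRDD { rdd => rdd.count() }

    firstStep
      .reduceByKeyAndWindow(sumAggregates _, Minutes(30), Minutes(30)) // sum over the windows
      .map { case (key, sums) => s"$key,${sums.mkString(",")}" }
      .foreachRDD { (rdd, time) =>
        rdd.saveAsTextFile(s"/tmp/aggregates-${time.milliseconds}") // assumed output path
      }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this shape, the 30-minute window only has to merge the already reduced per-batch RDDs rather than the raw denormalized records.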

>       .reduceByKeyAndWindow((x:Array[Int],y:Array[Int]) => sumAggregates(x,y), 1800000, 1800000) // sum over the windows
>       .map(rec => ...) // convert data to other format
>       .foreachRDD{ (rdd, time) =>
>         rdd.saveAsTextFile(...) // save to text files
>       }
> I would want the batches to be reduced as soon as they arrive (the first
> reduceByKey), since there isn't any reason to wait. Instead all of the
> unprocessed data has to be processed at the same time (this data is being
> heavily denormalized in some cases, and so generates a bunch of additional
> data).
> Thanks for any help.
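
The reason pre-reducing each batch is safe is that the reduce function is associative: merging per-batch partial sums gives the same result as summing every record in the window at once. A small plain-Scala sketch of that reasoning follows. Here sumAggregates is a hypothetical element-wise sum and reduceBatch is a made-up helper standing in for reduceByKey; neither comes from the original code.

```scala
object PartialAggregation {
  // Hypothetical stand-in for sumAggregates: element-wise sum of two arrays.
  def sumAggregates(x: Array[Int], y: Array[Int]): Array[Int] =
    x.zip(y).map { case (a, b) => a + b }

  // Reduce a collection of (key, counters) records to one record per key,
  // playing the role that reduceByKey plays on a single batch.
  def reduceBatch(records: Seq[(String, Array[Int])]): Map[String, Array[Int]] =
    records.groupBy(_._1).map { case (key, recs) =>
      key -> recs.map(_._2).reduce((x, y) => sumAggregates(x, y))
    }

  def main(args: Array[String]): Unit = {
    val batch1 = Seq("a" -> Array(1, 2), "b" -> Array(3, 4), "a" -> Array(5, 6))
    val batch2 = Seq("a" -> Array(10, 20), "b" -> Array(30, 40))

    // Path 1: reduce each batch as it arrives, then merge the partial results.
    val perBatch = Seq(batch1, batch2).map(reduceBatch)
    val windowed = reduceBatch(perBatch.flatMap(_.toSeq))

    // Path 2: wait for the whole window and reduce everything at once.
    val allAtOnce = reduceBatch(batch1 ++ batch2)

    for (key <- windowed.keys.toSeq.sorted)
      println(s"$key: ${windowed(key).mkString(",")} vs ${allAtOnce(key).mkString(",")}")
  }
}
```

Both paths yield a -> [16, 28] and b -> [33, 44]; the first one simply spreads the work across the window instead of deferring it to the end.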

Iulian Dragos

Reactive Apps on the JVM
