We are aggregating real-time event logs and want to process them in 30-minute
windows. The problem is that no computation starts until the 30 minutes have
elapsed, so a large backlog builds up that could already have been processed
incrementally. When it finally comes time to process the window, there is too
much data for our cluster to handle at once.
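
For reference, the streaming context is set up roughly like this (the app name
and batch interval here are just illustrative, not our real values):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("log-aggregation") // illustrative app name
    val ssc = new StreamingContext(conf, Seconds(60))        // illustrative 1-minute batch interval

    // mergedStream is our keyed DStream of events; the real source and types are
    // omitted here, e.g. val mergedStream: DStream[(String, Array[Int])] = ...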

The basic idea is this:

    val mergedMain = mergedStream
      .flatMap(r => ....) // denormalize data for this particular output stream
      .reduceByKey((x: Array[Int], y: Array[Int]) => sumAggregates(x, y)) // sum within each batch
      .reduceByKeyAndWindow(
        (x: Array[Int], y: Array[Int]) => sumAggregates(x, y),
        Minutes(30), Minutes(30)) // sum over the 30-minute window (1800000 ms window and slide)
      .map(rec => ...) // convert data to the other output format
      .foreachRDD { (rdd, time) =>
        rdd.saveAsTextFile(...) // save each window's result to text files
      }

I would like each batch to be reduced as soon as it arrives (the first
reduceByKey), since there is no reason to wait. Instead, all of the unprocessed
data ends up being processed at once when the window fires (and this data is
heavily denormalized in some cases, so it generates a lot of additional
records).
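
Is the incremental form of reduceByKeyAndWindow (the overload that takes an
inverse reduce function) what I'm after? A rough sketch of what I mean; the
inverse function and checkpointing here are my assumptions, not tested:

    // Assumes sumAggregates is element-wise addition, so it has an element-wise inverse.
    def subtractAggregates(x: Array[Int], y: Array[Int]): Array[Int] =
      x.zip(y).map { case (a, b) => a - b }

    ssc.checkpoint(...) // checkpointing is required by the incremental window; path elided

    val windowedMain = mergedStream
      .flatMap(r => ....) // same denormalization as above
      .reduceByKey((x: Array[Int], y: Array[Int]) => sumAggregates(x, y)) // per-batch sums
      .reduceByKeyAndWindow(
        (x: Array[Int], y: Array[Int]) => sumAggregates(x, y),      // add batches entering the window
        (x: Array[Int], y: Array[Int]) => subtractAggregates(x, y), // remove batches leaving the window
        Minutes(30), Minutes(30))

My hope is that the per-batch reduceByKey output would then be folded into the
running window as each batch completes, rather than all at once at window end.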

Thanks for any help.
