We are aggregating real-time event logs and want to compute 30-minute windows. However, since the computation doesn't start until 30 minutes have passed, a large amount of data builds up that could already have been processed. By the time processing actually begins, there is too much data for our cluster to handle at once.
The basic idea is this:

```scala
val mergedMain = mergedStream
  .flatMap(r => ...)  // denormalize data for this particular output stream
  .reduceByKey((x: Array[Int], y: Array[Int]) => sumAggregates(x, y))  // sum within each batch
  .reduceByKeyAndWindow(
    (x: Array[Int], y: Array[Int]) => sumAggregates(x, y),
    Minutes(30), Minutes(30))  // sum over each 30-minute window
  .map(rec => ...)  // convert data to another format
  .foreachRDD { (rdd, time) =>
    rdd.saveAsTextFile(...)  // save to text files
  }
```

(Note: `reduceByKeyAndWindow` takes `Duration` arguments, so the 1800000 ms window and slide intervals are written as `Minutes(30)`.)

I would want each batch to be reduced as soon as it arrives (the first `reduceByKey`), since there is no reason to wait. Instead, all of the unprocessed data ends up being processed at the same time (in some cases this data is heavily denormalized, so it generates a lot of additional data). Thanks for any help.
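For reference, `sumAggregates` isn't shown above; assuming it is an element-wise sum over two equal-length count arrays (one slot per metric), a minimal sketch might look like this:

```scala
object AggregateSketch {
  // Hypothetical implementation of sumAggregates: element-wise sum of
  // two equal-length Array[Int] accumulators, as used in the reduce steps.
  def sumAggregates(x: Array[Int], y: Array[Int]): Array[Int] =
    x.zip(y).map { case (a, b) => a + b }
}
```

Because this function is associative and commutative, Spark can apply it both per batch (`reduceByKey`) and again across batches within a window (`reduceByKeyAndWindow`) without changing the result.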