On Thu, Aug 20, 2015 at 6:58 PM, Justin Grimes <jgri...@adzerk.com> wrote:
> We are aggregating real-time logs of events, and we want to use windows
> of 30 minutes. However, since the computation doesn't start until 30
> minutes have passed, a ton of data builds up that processing could
> already have started on. When it comes time to actually process the
> data, there is too much for our cluster to handle at once.
>
> The basic idea is this:
>
> val mergedMain = mergedStream
>   .flatMap(r => ....) // denormalize data for this particular output stream
>   .reduceByKey((x:Array[Int],y:Array[Int]) => sumAggregates(x,y)) // this would sum over the batches

Could you add a dummy action at this point?

val firstStep = mergedStream
  .flatMap(r => ....) // denormalize data for this particular output stream
  .reduceByKey((x:Array[Int],y:Array[Int]) => sumAggregates(x,y)) // this would sum over the batches
  .persist() // this will be reused in windowing operations

firstStep.count().print() // dummy output operation, just to trigger computation

firstStep
  .reduceByKeyAndWindow((x:Array[Int],y:Array[Int]) => sumAggregates(x,y), Milliseconds(1800000), Milliseconds(1800000)) // sum over the windows
  .map(rec => ...) // convert data to other format
  .foreachRDD { (rdd, time) =>
    rdd.saveAsTextFile(...) // save to text files
  }

>   .reduceByKeyAndWindow((x:Array[Int],y:Array[Int]) => sumAggregates(x,y), 1800000, 1800000) // sum over the windows
>   .map(rec => ...) // convert data to other format
>   .foreachRDD{ (rdd, time) =>
>     rdd.saveAsTextFile(...) // save to text files
>   }
>
> I would want the batches to be reduced as soon as they arrive (the first
> reduceByKey), since there isn't any reason to wait. Instead, all of the
> unprocessed data has to be processed at the same time (this data is
> heavily denormalized in some cases, and so generates a lot of additional
> data).
>
> Thanks for any help.

--
Iulian Dragos

------
Reactive Apps on the JVM
www.typesafe.com
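For what it's worth, below is a minimal, self-contained sketch of the persist-plus-dummy-action pattern, so the whole shape compiles end to end. The socket source, the ten-second batch interval, and the element-wise sumAggregates body are placeholders I made up, not anything from the thread. Two details worth noting: in the Scala API, the window and slide lengths of reduceByKeyAndWindow are Durations (Milliseconds(1800000), or equivalently Minutes(30)), not raw longs, and DStream.count() is a transformation, so it needs an output operation such as print() attached before it actually triggers anything.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}

object EagerWindowing {
  // Placeholder aggregate: element-wise sum of two equal-length arrays.
  def sumAggregates(x: Array[Int], y: Array[Int]): Array[Int] =
    x.zip(y).map { case (a, b) => a + b }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("EagerWindowing")
    val ssc = new StreamingContext(conf, Seconds(10)) // small batch interval

    // Placeholder source, standing in for the thread's mergedStream.
    val mergedStream = ssc.socketTextStream("localhost", 9999)

    val firstStep = mergedStream
      .map(line => (line, Array(1)))  // stands in for the denormalizing flatMap
      .reduceByKey(sumAggregates _)   // per-batch reduction, done as batches arrive
      .persist()                      // reused by the window below

    // Dummy output operation: forces each batch to be computed eagerly.
    firstStep.count().print()

    // 30-minute tumbling window over the already-reduced batches.
    firstStep
      .reduceByKeyAndWindow(sumAggregates _, Milliseconds(1800000), Milliseconds(1800000))
      .foreachRDD { (rdd, time) =>
        rdd.saveAsTextFile(s"window-${time.milliseconds}") // one directory per window
      }

    ssc.start()
    ssc.awaitTermination()
  }
}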