I tried something like that. When I just did count() on the DStream, it didn't seem to actually force the computation.
What (sort of) worked was doing a foreachRDD((rdd) => rdd.count()), or doing a print() on the DStream. The only problem was this seemed to add a lot of processing overhead -- I couldn't figure out exactly why, but it seemed to have something to do with foreachRDD only being executed on the driver.

On Thu, Aug 20, 2015 at 1:39 PM, Iulian Dragoș <iulian.dra...@typesafe.com> wrote:

> On Thu, Aug 20, 2015 at 6:58 PM, Justin Grimes <jgri...@adzerk.com> wrote:
>
>> We are aggregating real time logs of events, and want to do windows of 30
>> minutes. However, since the computation doesn't start until 30 minutes have
>> passed, there is a ton of data built up that processing could've already
>> started on. When it comes time to actually process the data, there is too
>> much for our cluster to handle at once.
>>
>> The basic idea is this:
>>
>> val mergedMain = mergedStream
>>   .flatMap(r => ....) // denormalize data for this particular output stream
>>   .reduceByKey((x: Array[Int], y: Array[Int]) => sumAggregates(x, y)) // this would sum over the batches
>>
> Could you add a dummy action at this point?
>
> val firstStep = mergedStream
>   .flatMap(r => ....) // denormalize data for this particular output stream
>   .reduceByKey((x: Array[Int], y: Array[Int]) => sumAggregates(x, y)) // this would sum over the batches
>   .persist() // this will be reused in windowing operations
>
> firstStep.count() // just to trigger computation
>
> firstStep
>   .reduceByKeyAndWindow((x: Array[Int], y: Array[Int]) => sumAggregates(x, y), 1800000, 1800000) // sum over the windows
>   .map(rec => ...) // convert data to other format
>   .foreachRDD { (rdd, time) =>
>     rdd.saveAsTextFile(...) // save to text files
>   }
>
>>   .reduceByKeyAndWindow((x: Array[Int], y: Array[Int]) => sumAggregates(x, y), 1800000, 1800000) // sum over the windows
>>   .map(rec => ...) // convert data to other format
>>   .foreachRDD { (rdd, time) =>
>>     rdd.saveAsTextFile(...) // save to text files
>>   }
>>
>> I would want the batches to be reduced as soon as they arrive (the first
>> reduceByKey), since there isn't any reason to wait. Instead all of the
>> unprocessed data has to be processed at the same time (this data is being
>> heavily denormalized in some cases, and so generates a bunch of additional
>> data).
>>
>> Thanks for any help.
>>
>
> --
> Iulian Dragos
>
> ------
> Reactive Apps on the JVM
> www.typesafe.com
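For reference, here is a minimal self-contained sketch of the approach being discussed, assuming a DStream[(String, Array[Int])] named mergedStream (the flatMap denormalization step from the thread is omitted); sumAggregates is a stand-in implementation and the output path is hypothetical:

import org.apache.spark.streaming.Minutes
import org.apache.spark.streaming.dstream.DStream

// Stand-in for the sumAggregates from the thread: element-wise sum of two arrays.
def sumAggregates(x: Array[Int], y: Array[Int]): Array[Int] =
  x.zip(y).map { case (a, b) => a + b }

def buildPipeline(mergedStream: DStream[(String, Array[Int])]): Unit = {
  val firstStep = mergedStream
    .reduceByKey((x: Array[Int], y: Array[Int]) => sumAggregates(x, y)) // per-batch sum
    .persist() // cache each batch's reduced result for reuse by the window

  // Dummy action on every batch: rdd.count() is a distributed action, so the
  // per-batch reduction runs on the executors as each batch arrives.
  firstStep.foreachRDD(rdd => rdd.count())

  firstStep
    .reduceByKeyAndWindow((x: Array[Int], y: Array[Int]) => sumAggregates(x, y),
      Minutes(30), Minutes(30)) // sum the pre-reduced batches over the window
    .foreachRDD { (rdd, time) =>
      rdd.saveAsTextFile(s"/tmp/output-${time.milliseconds}") // hypothetical path
    }
}

With the per-batch count() forcing each batch to be reduced and cached as it arrives, the 30-minute reduceByKeyAndWindow only has to merge the already-reduced batches rather than process 30 minutes of raw, denormalized data at once -- which is the idea behind Iulian's suggestion above.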