I tried something like that. When I tried just calling count() on the
DStream, it didn't seem to actually force the computation.

What (sort of) worked was doing a foreachRDD(rdd => rdd.count()), or
calling print() on the DStream. The only problem was that this seemed to add a
lot of processing overhead -- I couldn't figure out exactly why, but it
seemed to have something to do with the foreachRDD closure only being executed
on the driver.
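
In case it helps clarify what I mean by forcing the computation, here's a
minimal sketch (forceEvaluation and the element type are just illustrative;
mergedMain refers to the snippet quoted below). The closure passed to
foreachRDD runs on the driver once per batch interval, but the rdd.count()
inside it is still an ordinary distributed action, so the records themselves
are counted on the executors:

import org.apache.spark.streaming.dstream.DStream

// Minimal sketch: force each batch to be evaluated as it arrives.
def forceEvaluation(stream: DStream[(String, Array[Int])]): Unit = {
  stream.foreachRDD { rdd =>
    // This closure runs on the driver at every batch interval;
    // count() itself is a distributed action executed on the workers.
    rdd.count()
  }
}

// e.g. forceEvaluation(mergedMain)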

On Thu, Aug 20, 2015 at 1:39 PM, Iulian Dragoș <iulian.dra...@typesafe.com>
wrote:

> On Thu, Aug 20, 2015 at 6:58 PM, Justin Grimes <jgri...@adzerk.com> wrote:
>
> We are aggregating real time logs of events, and want to do windows of 30
>> minutes. However, since the computation doesn't start until 30 minutes have
>> passed, there is a ton of data built up that processing could've already
>> started on. When it comes time to actually process the data, there is too
>> much for our cluster to handle at once.
>>
>> The basic idea is this:
>>
>>  val mergedMain = mergedStream
>>       .flatMap(r => ....) // denormalize data for this particular output stream
>>       .reduceByKey((x:Array[Int],y:Array[Int]) => sumAggregates(x,y)) // this would sum over the batches
>>
> Could you add a dummy action at this point?
>
> val firstStep = mergedStream
>       .flatMap(r => ....) // denormalize data for this particular output stream
>       .reduceByKey((x:Array[Int],y:Array[Int]) => sumAggregates(x,y)) // this would sum over the batches
>       .persist() // this will be reused in windowing operations
>
> firstStep.count() // just to trigger computation
>
> firstStep
>       .reduceByKeyAndWindow((x:Array[Int],y:Array[Int]) => sumAggregates(x,y), 1800000, 1800000) // sum over the windows
>       .map(rec => ...) // convert data to other format
>       .foreachRDD{ (rdd, time) =>
>         rdd.saveAsTextFile(...) // save to text files
>       }
>
>>       .reduceByKeyAndWindow((x:Array[Int],y:Array[Int]) => sumAggregates(x,y), 1800000, 1800000) // sum over the windows
>>       .map(rec => ...) // convert data to other format
>>       .foreachRDD{ (rdd, time) =>
>>         rdd.saveAsTextFile(...) // save to text files
>>       }
>>
>> I would want the batches to be reduced as soon as they arrive (the first
>> reduceByKey), since there isn't any reason to wait. Instead, all of the
>> unprocessed data has to be processed at the same time (this data is being
>> heavily denormalized in some cases, and so generates a bunch of additional
>> data).
>>
>> Thanks for any help.
>>
> --
> Iulian Dragos
>
> ------
> Reactive Apps on the JVM
> www.typesafe.com
>
>
