Good idea, will do for the 1.2 release.

On Oct 29, 2014 9:50 AM, "Gerard Maas" <gerard.m...@gmail.com> wrote:
> Hi TD,
>
> Thanks a lot for the comprehensive answer.
>
> I think this explanation deserves some place in the Spark Streaming
> tuning guide.
>
> -kr, Gerard.
>
> On Thu, Oct 23, 2014 at 11:41 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Hey Gerard,
>>
>> This is a very good question!
>>
>> *TL;DR:* The performance should be the same, except for shuffle-based
>> operations where the number of reducers is not explicitly specified.
>>
>> Let me answer in more detail by dividing the set of DStream operations
>> into four categories.
>>
>> *1. Map-like operations (map, flatMap, filter, etc.) that do not involve
>> any shuffling of data:* Performance should be virtually the same in both
>> cases. Either way, in each batch, the operations on the batch's RDD are
>> first set up on the driver, and then the actions on the RDD are
>> executed. There are very minor differences between the two cases of
>> early foreachRDD and late foreachRDD (e.g., cleaning up of function
>> closures, etc.), but those should make almost no difference in
>> performance.
>>
>> *2. Operations involving shuffle:* Here there is a subtle difference
>> between the two cases if the number of partitions is not specified. The
>> default numbers of partitions used by dstream.reduceByKey() and by
>> dstream.foreachRDD(_.reduceByKey()) are different, and one needs to play
>> around with the number of reducers to see what performs better. But if
>> the number of reducers is explicitly specified and is the same in both
>> cases, then the performance should be similar. Note that this difference
>> in the defaults is not guaranteed to stay this way; it could change in
>> future implementations.
>>
>> *3. Aggregation-like operations (count, reduce):* Here there is another
>> subtle execution difference between
>> - dstream.count(), which produces a DStream of single-element RDDs, the
>> element being the count, and
>> - dstream.foreachRDD(_.count()), which returns the count directly.
>>
>> In the first case some random worker node is chosen for the reduce; in
>> the other, the driver is chosen for the reduce. There should not be a
>> significant performance difference.
>>
>> *4. Other operations,* including window ops and stateful ops
>> (updateStateByKey), are obviously not part of the discussion, as they
>> cannot be (easily) done through early foreachRDD.
>>
>> Hope this helps!
>>
>> TD
>>
>> PS: Sorry for not noticing this question earlier.
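To make points 2 and 3 above concrete, here is a minimal sketch, assuming an
arbitrary DStream[String] named lines; the word-count logic and all names are
illustrative stand-ins, not code from the thread:

import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

def illustrate(lines: DStream[String]): Unit = {
  val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))

  // Point 2: with the number of reducers specified explicitly (8 here),
  // both styles shuffle with the same parallelism.
  pairs.reduceByKey(_ + _, 8).print()
  pairs.foreachRDD(rdd => rdd.reduceByKey(_ + _, 8).take(10).foreach(println))

  // Point 3: dstream.count() yields a DStream of single-element RDDs,
  // while rdd.count() inside foreachRDD returns a Long to the driver.
  pairs.count().print()
  pairs.foreachRDD(rdd => println("batch size: " + rdd.count()))
}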
>> On Wed, Oct 22, 2014 at 5:37 AM, Gerard Maas <gerard.m...@gmail.com>
>> wrote:
>>
>>> PS: Just to clarify my statement:
>>>
>>> >> Unlike the feared RDD operations on the driver, it's my
>>> understanding that these DStream ops on the driver are merely creating
>>> an execution plan for each RDD.
>>>
>>> With "feared RDD operations on the driver" I meant to contrast an RDD
>>> action like rdd.collect, which would pull all the RDD's data to the
>>> driver, with dstream.foreachRDD(rdd => rdd.op), for which the
>>> documentation says "it runs on the driver"; yet all that looks to be
>>> running on the driver is the scheduling of 'op' on that RDD, just as
>>> happens for all other RDD operations (thanks to Sean for the
>>> clarification).
>>>
>>> So, not to move focus away from the original question:
>>>
>>> In Spark Streaming, would it be better to do foreachRDD early in a
>>> pipeline, or instead do as many DStream transformations as possible
>>> before going into the foreachRDD call?
>>>
>>> Between these two pieces of code, from a performance perspective, which
>>> would be preferred and why?
>>>
>>> - Early foreachRDD:
>>>
>>> dstream.foreachRDD { rdd =>
>>>   val records = rdd.map(elem => record(elem))
>>>   targets.foreach(target => records.filter(record =>
>>>     isTarget(target, record)).writeToCassandra(target, table))
>>> }
>>>
>>> - As many DStream transformations as possible before foreachRDD:
>>>
>>> val recordStream = dstream.map(elem => record(elem))
>>> targets.foreach { target => recordStream.filter(record =>
>>>   isTarget(target, record)).foreachRDD(_.writeToCassandra(target, table)) }
>>>
>>> kr, Gerard.
>>>
>>> On Wed, Oct 22, 2014 at 2:12 PM, Gerard Maas <gerard.m...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Matt,
>>>>
>>>> Unlike the feared RDD operations on the driver, it's my understanding
>>>> that these DStream ops on the driver are merely creating an execution
>>>> plan for each RDD.
>>>> My question still remains: Is it better to foreachRDD early in the
>>>> process, or to do as many DStream transformations as possible before
>>>> going into the foreachRDD call?
>>>>
>>>> Maybe this will require some empirical testing specific to each
>>>> implementation?
>>>>
>>>> -kr, Gerard.
>>>>
>>>> On Mon, Oct 20, 2014 at 5:07 PM, Matt Narrell <matt.narr...@gmail.com>
>>>> wrote:
>>>>
>>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html
>>>>>
>>>>> foreachRDD is executed on the driver...
>>>>>
>>>>> mn
>>>>>
>>>>> On Oct 20, 2014, at 3:07 AM, Gerard Maas <gerard.m...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Pinging TD -- I'm sure you know :-)
>>>>>
>>>>> -kr, Gerard.
>>>>>
>>>>> On Fri, Oct 17, 2014 at 11:20 PM, Gerard Maas <gerard.m...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We have been implementing several Spark Streaming jobs that
>>>>>> basically process data and insert it into Cassandra, sorting it
>>>>>> among different keyspaces.
>>>>>>
>>>>>> We've been following the pattern:
>>>>>>
>>>>>> dstream.foreachRDD { rdd =>
>>>>>>   val records = rdd.map(elem => record(elem))
>>>>>>   targets.foreach(target => records.filter(record =>
>>>>>>     isTarget(target, record)).writeToCassandra(target, table))
>>>>>> }
>>>>>>
>>>>>> I've been wondering whether there would be a performance difference
>>>>>> in transforming the dstream instead of transforming the RDD within
>>>>>> the dstream, with regard to how the transformations get scheduled.
>>>>>>
>>>>>> Instead of the RDD-centric computation, I could transform the
>>>>>> dstream until the last step, where I need an RDD to store.
>>>>>> For example, the previous transformation could be written as:
>>>>>>
>>>>>> val recordStream = dstream.map(elem => record(elem))
>>>>>> targets.foreach { target => recordStream.filter(record =>
>>>>>>   isTarget(target, record)).foreachRDD(_.writeToCassandra(target, table)) }
>>>>>>
>>>>>> Would there be a difference in execution and/or performance? What
>>>>>> would be the preferred way to do this?
>>>>>>
>>>>>> Bonus question: Is there a better (more performant) way to sort the
>>>>>> data into different "buckets", instead of filtering the whole
>>>>>> collection once per bucket?
>>>>>>
>>>>>> thanks, Gerard.
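On the bonus question, which goes unanswered above: a hedged sketch only of
one alternative, replacing one filter per target with a single keyed pass at
the cost of a shuffle. Record, targetOf, and writeBucket are hypothetical
stand-ins for the thread's record()/writeToCassandra helpers, and whether
this wins depends on the shuffle cost versus #targets scans of the batch:

import org.apache.spark.SparkContext._
import org.apache.spark.streaming.dstream.DStream

case class Record(target: String, payload: String)

// Placeholder parse: assumes the target is encoded before a ':' in each element.
def targetOf(elem: String): String = elem.takeWhile(_ != ':')

// Placeholder for a per-target sink write (e.g., one Cassandra keyspace per target).
def writeBucket(target: String, records: Iterable[Record]): Unit = ()

def bucketInOnePass(dstream: DStream[String]): Unit =
  dstream.foreachRDD { rdd =>
    // One shuffle groups every record under its target, instead of
    // re-scanning the whole batch once per target with filter().
    // Caveat: groupByKey materializes each target's records on one executor.
    rdd.map(elem => (targetOf(elem), Record(targetOf(elem), elem)))
       .groupByKey()
       .foreach { case (target, records) => writeBucket(target, records) }
  }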