Good idea, will do for 1.2 release.
On Oct 29, 2014 9:50 AM, "Gerard Maas" <gerard.m...@gmail.com> wrote:

> Hi TD,
>
> Thanks a lot for the comprehensive answer.
>
> I think this explanation deserves some place in the Spark Streaming tuning
> guide.
>
> -kr, Gerard.
>
> On Thu, Oct 23, 2014 at 11:41 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Hey Gerard,
>>
>> This is a very good question!
>>
>> *TL;DR: *The performance should be the same, except in the case of
>> shuffle-based operations where the number of reducers is not explicitly
>> specified.
>>
>> Let me answer in more detail by dividing the set of DStream operations
>> into three categories.
>>
>> *1. Map-like operations (map, flatmap, filter, etc.) that do not
>> involve any shuffling of data:* Performance should be virtually the same
>> in both cases. Either way, in each batch, the operations on the batch's
>> RDD are first set up on the driver, and then the actions on the RDD are
>> executed. There are very minor differences between the two cases of early
>> foreachRDD and late foreachRDD (e.g., cleaning up of function closures,
>> etc.), but those should make almost no difference in performance.
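>>
>> For example (just a sketch; record() and saveRecord() stand in for
>> whatever per-element logic you have, they are not existing APIs):
>>
>> // late foreachRDD: transform the DStream, then act on each batch's RDD
>> dstream.map(elem => record(elem)).foreachRDD { rdd =>
>>   rdd.foreach(r => saveRecord(r))
>> }
>>
>> // early foreachRDD: the same map, applied to each batch's RDD directly
>> dstream.foreachRDD { rdd =>
>>   rdd.map(elem => record(elem)).foreach(r => saveRecord(r))
>> }
>>
>> In every batch interval, both forms end up scheduling the same
>> map-then-foreach job on the executors, so the work done is essentially
>> identical.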
>>
>> *2. Operations involving shuffle: *Here there is a subtle difference
>> between the two cases if the number of partitions is not specified. The
>> default number of partitions used by dstream.reduceByKey() differs from
>> the one used by dstream.foreachRDD(_.reduceByKey()), and one needs to
>> play around with the number of reducers to see what performs better. But
>> if the number of reducers is explicitly specified and is the same in both
>> cases, then the performance should be similar. Note that this difference
>> in the defaults is not guaranteed to stay this way; it could change in
>> future implementations.
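>>
>> As a sketch (assuming a DStream of key-value pairs, here called
>> keyedStream), explicitly fixing the number of reducers makes the two
>> forms directly comparable:
>>
>> val numPartitions = 32  // just an example value; tune for your cluster
>>
>> // shuffle at the DStream level
>> keyedStream.reduceByKey(_ + _, numPartitions)
>>
>> // shuffle at the RDD level, inside foreachRDD
>> keyedStream.foreachRDD { rdd =>
>>   rdd.reduceByKey(_ + _, numPartitions)
>> }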
>>
>> *3. Aggregation-like operations (count, reduce): *Here there is another
>> subtle execution difference between
>> - dstream.count() which produces a DStream of single-element RDDs, the
>> element being the count, and
>> - dstream.foreachRDD(_.count()) which returns the count directly.
>>
>> In the first case, some random worker node is chosen for the reduce; in
>> the other, the driver is chosen for the reduce. There should not be a
>> significant performance difference.
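>>
>> In code, the two look like this (sketch):
>>
>> // DStream count: each batch yields a single-element RDD holding the count
>> dstream.count().print()
>>
>> // foreachRDD count: the count comes back to the driver as a plain Long
>> dstream.foreachRDD { rdd =>
>>   val n: Long = rdd.count()
>>   // use n on the driver, e.g. log it
>> }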
>>
>> *4. Other operations*, including window ops and stateful ops
>> (updateStateByKey), are obviously not part of this discussion, as they
>> cannot be (easily) done through early foreachRDD.
>>
>> Hope this helps!
>>
>> TD
>>
>> PS: Sorry for not noticing this question earlier.
>>
>> On Wed, Oct 22, 2014 at 5:37 AM, Gerard Maas <gerard.m...@gmail.com>
>> wrote:
>>
>>> PS: Just to clarify my statement:
>>>
>>> >>Unlike the feared RDD operations on the driver, it's my understanding
>>> that these DStream ops on the driver are merely creating an execution plan
>>> for each RDD.
>>>
>>> With "feared RDD operations on the driver" I meant to contrast an rdd
>>> action like rdd.collect that would pull all rdd data to the driver, with
>>> dstream.foreachRDD(rdd => rdd.op) for which documentation says 'it runs on
>>> the driver' yet, all that it looks to be running on the driver is the
>>> scheduling of 'op' on that rdd, just like it happens for all rdd other
>>> operations
>>> (thanks to Sean for the clarification)
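>>>
>>> To illustrate what I mean (just a sketch, reusing the record helper from
>>> the snippets below):
>>>
>>> dstream.foreachRDD { rdd =>
>>>   // this closure runs on the driver, but it only schedules work:
>>>   val records = rdd.map(elem => record(elem))
>>>   records.foreach(r => println(r))  // map/foreach run on the executors
>>>   // whereas records.collect() really would pull all the data to the driver
>>> }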
>>>
>>> So, not to move focus away from the original question:
>>>
>>> In Spark Streaming, would it be better to do foreachRDD early in a
>>> pipeline, or instead do as many DStream transformations as possible
>>> before going into the foreachRDD call?
>>>
>>> Between these two pieces of code, from a performance perspective, what
>>> would be preferred and why:
>>>
>>> - Early foreachRDD:
>>>
>>> dstream.foreachRDD { rdd =>
>>>   val records = rdd.map(elem => record(elem))
>>>   targets.foreach(target =>
>>>     records.filter(record => isTarget(target, record))
>>>       .writeToCassandra(target, table))
>>> }
>>>
>>> - As many DStream transformations as possible before foreachRDD:
>>>
>>> val recordStream = dstream.map(elem => record(elem))
>>> targets.foreach { target =>
>>>   recordStream.filter(record => isTarget(target, record))
>>>     .foreachRDD(_.writeToCassandra(target, table))
>>> }
>>>
>>> ?
>>>
>>> kr, Gerard.
>>>
>>>
>>>
>>> On Wed, Oct 22, 2014 at 2:12 PM, Gerard Maas <gerard.m...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Matt,
>>>>
>>>> Unlike the feared RDD operations on the driver, it's my understanding
>>>> that these DStream ops on the driver are merely creating an execution
>>>> plan for each RDD.
>>>> My question still remains: Is it better to do foreachRDD early in the
>>>> process, or do as many DStream transformations as possible before going
>>>> into the foreachRDD call?
>>>>
>>>> Maybe this will require some empirical testing specific to each
>>>> implementation?
>>>>
>>>> -kr, Gerard.
>>>>
>>>>
>>>> On Mon, Oct 20, 2014 at 5:07 PM, Matt Narrell <matt.narr...@gmail.com>
>>>> wrote:
>>>>
>>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html
>>>>>
>>>>> foreachRDD is executed on the driver….
>>>>>
>>>>> mn
>>>>>
>>>>> On Oct 20, 2014, at 3:07 AM, Gerard Maas <gerard.m...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Pinging TD  -- I'm sure you know :-)
>>>>>
>>>>> -kr, Gerard.
>>>>>
>>>>> On Fri, Oct 17, 2014 at 11:20 PM, Gerard Maas <gerard.m...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We have been implementing several Spark Streaming jobs that are
>>>>>> basically processing data and inserting it into Cassandra, sorting it
>>>>>> among different keyspaces.
>>>>>>
>>>>>> We've been following the pattern:
>>>>>>
>>>>>> dstream.foreachRDD { rdd =>
>>>>>>   val records = rdd.map(elem => record(elem))
>>>>>>   targets.foreach(target =>
>>>>>>     records.filter(record => isTarget(target, record))
>>>>>>       .writeToCassandra(target, table))
>>>>>> }
>>>>>>
>>>>>> I've been wondering whether there would be a performance difference
>>>>>> in transforming the dstream instead of transforming the RDD within the
>>>>>> dstream, with regard to how the transformations get scheduled.
>>>>>>
>>>>>> Instead of the RDD-centric computation, I could transform the dstream
>>>>>> until the last step, where I need an rdd to store.
>>>>>> For example, the previous transformation could be written as:
>>>>>>
>>>>>> val recordStream = dstream.map(elem => record(elem))
>>>>>> targets.foreach { target =>
>>>>>>   recordStream.filter(record => isTarget(target, record))
>>>>>>     .foreachRDD(_.writeToCassandra(target, table))
>>>>>> }
>>>>>>
>>>>>> Would there be a difference in execution and/or performance? What
>>>>>> would be the preferred way to do this?
>>>>>>
>>>>>> Bonus question: Is there a better (more performant) way to sort the
>>>>>> data into different "buckets" instead of filtering the whole data
>>>>>> collection once per bucket?
>>>>>>
>>>>>> thanks,  Gerard.
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
