Re: Relation between DStream and RDDs

2014-03-21 Thread Sanjay Awatramani
Hi,

I searched more articles and ran a few examples, and have clarified my doubts. 
This answer by TD in another thread ( 
https://groups.google.com/d/msg/spark-users/GQoxJHAAtX4/0kiRX0nm1xsJ ) helped 
me a lot.

Here is a summary of my findings:
1) A DStream can consist of 0, 1, or more RDDs.
2) Even if multiple files are read in a time interval, the DStream will still 
have only 1 RDD for that batch.
3) Functions like reduce & count return as many RDDs as there were in the 
input DStream. Since the internal computation in every batch runs over only 
1 RDD, these functions return 1 RDD per batch in the resulting DStream. 
However, if you use window functions to gather more RDDs and run reduce/count 
on the windowed DStream, the returned DStream will have more than 1 RDD.
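
For reference, a minimal sketch of the kind of job I was testing (the path, 
app name, and durations are only placeholders):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class DStreamCounts {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("DStreamCounts").setMaster("local[2]");
    // A 1-hour batch interval: each batch contributes exactly one RDD per DStream.
    JavaStreamingContext stc = new JavaStreamingContext(conf, new Duration(60 * 60 * 1000));

    JavaDStream<String> lines = stc.textFileStream("/Users/path/to/Input");

    // Point 3 above: count() emits one single-element RDD per input batch.
    lines.count().print();

    stc.start();
    stc.awaitTermination();
  }
}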

Hope this helps someone.
Thanks everyone for the answers.

Regards,
Sanjay



On Thursday, 20 March 2014 9:30 PM, andy petrella andy.petre...@gmail.com 
wrote:
 
I don't have an example at hand, but conceptually it looks like you'll need an 
appropriate structure like a Monoid. What I mean is: if the computation isn't 
tied to a window, it's an overall computation that has to be grown over time 
(otherwise it would land in the batch world, see below), and that's the purpose 
of the Monoid, and especially of probabilistic sets such as Bloom filters (to 
avoid eating the whole memory).
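
A rough sketch of the idea, assuming stc is the JavaStreamingContext and lines 
a JavaDStream<String> (the per-line keying and checkpoint path are only 
illustrative, and depending on your Spark version the Optional here is Guava's 
or Spark's own):

import java.util.List;
import com.google.common.base.Optional;
import scala.Tuple2;
import org.apache.spark.streaming.api.java.JavaPairDStream;

stc.checkpoint("/tmp/checkpoint"); // stateful operators require checkpointing

// Pair each record with the value 1L...
JavaPairDStream<String, Long> ones =
    lines.mapToPair(line -> new Tuple2<>(line, 1L));

// ...then fold every batch into a running per-key total: an associative
// merge (+) with identity 0, i.e. exactly a (Long, +, 0) monoid.
JavaPairDStream<String, Long> totals = ones.updateStateByKey(
    (List<Long> values, Optional<Long> state) -> {
      long sum = state.or(0L);
      for (Long v : values) sum += v;
      return Optional.of(sum);
    });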

If it falls into the batch world because you have enough information 
encapsulated in one conceptual RDD, it might be helpful to have the DStream 
store it in HDFS, then use the SparkContext within the StreamingContext to run 
a batch job on the data.
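
Something like this, roughly (the paths are made up, lines and stc are assumed 
from above, and depending on the Spark version foreachRDD takes a 
Function<JavaRDD<T>, Void> as here, or a VoidFunction):

import org.apache.spark.api.java.JavaRDD;

// Persist each batch under its own HDFS directory.
lines.foreachRDD(rdd -> {
  rdd.saveAsTextFile("hdfs:///data/stream/batch-" + System.currentTimeMillis());
  return null;
});

// Later, run a plain batch job over everything collected so far,
// using the SparkContext that backs the StreamingContext:
JavaRDD<String> all = stc.sparkContext().textFile("hdfs:///data/stream/*");
long total = all.count();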

But I'm only thinking out loud, so I might be completely wrong.

hth


Andy Petrella
Belgium (Liège)

Data Engineer in NextLab sprl (owner)
Engaged Citizen Coder for WAJUG (co-founder)
Author of Learning Play! Framework 2
Bio: on visify

Mobile: +32 495 99 11 04
Mails:
* andy.petre...@nextlab.be
* andy.petre...@gmail.com

Socials:
* Twitter: https://twitter.com/#!/noootsab
* LinkedIn: http://be.linkedin.com/in/andypetrella
* Blogger: http://ska-la.blogspot.com/
* GitHub: https://github.com/andypetrella
* Masterbranch: https://masterbranch.com/andy.petrella


On Thu, Mar 20, 2014 at 12:18 PM, Pascal Voitot Dev 
pascal.voitot@gmail.com wrote:

On Thu, Mar 20, 2014 at 11:57 AM, andy petrella andy.petre...@gmail.com 
wrote:

Also consider creating pairs and using the *byKey* operators; the key will then 
be the structure used to consolidate or deduplicate your data.
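
For instance (assuming lines is the input JavaDStream<String>; extractId is a 
hypothetical helper returning a record's natural key):

import scala.Tuple2;
import org.apache.spark.streaming.api.java.JavaPairDStream;

// Key each record, then keep a single record per key within the batch:
// reduceByKey performs the consolidation/deduplication.
JavaPairDStream<String, String> deduped = lines
    .mapToPair(line -> new Tuple2<>(extractId(line), line))
    .reduceByKey((a, b) -> a); // arbitrary winner among duplicates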
my2c

One thing I wonder: imagine I want to sub-divide the RDDs in a DStream into 
several RDDs, but not according to a time window; I don't see any trivial way 
to do it...

On Thu, Mar 20, 2014 at 11:50 AM, Pascal Voitot Dev 
pascal.voitot@gmail.com wrote:

Actually it's quite simple...

A DStream[T] is a stream of RDD[T]. Applying count on a DStream just applies 
count to each RDD of that DStream. So at the end of count, you have a 
DStream[Long] containing the same number of RDDs as before, but each RDD 
contains a single element: the count result for the corresponding original RDD.

For reduce, it's the same, using the reduce operation...

The only operations that are a bit more complex are reduceByWindow & 
countByValueAndWindow, which union RDDs over the time window...
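
A rough illustration of the difference, assuming lines is the input 
JavaDStream<String> with 1-hour batches (window and slide durations are only 
examples and must be multiples of the batch interval):

// count(): one single-element RDD per batch interval.
JavaDStream<Long> perBatch = lines.count();

// reduceByWindow(): each output RDD is computed over the union of the RDDs
// falling inside the window (here the last 2 batches, sliding 1 batch at a time).
JavaDStream<String> perWindow = lines.reduceByWindow(
    (a, b) -> a + "\n" + b,            // associative reduce function
    new Duration(2 * 60 * 60 * 1000),  // window length: 2 hours
    new Duration(60 * 60 * 1000));     // slide interval: 1 hour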

On Thu, Mar 20, 2014 at 9:51 AM, Sanjay Awatramani sanjay_a...@yahoo.com 
wrote:

@TD: I do not need multiple RDDs in a DStream in every batch. On the contrary, 
my logic would work fine if there is only 1 RDD. But the description for 
functions like reduce & count ("Return a new DStream of single-element RDDs by 
counting the number of elements in each RDD of the source DStream.") left me 
confused about whether I should account for the fact that a DStream can have 
multiple RDDs. My streaming code processes a batch every hour. In the 2nd 
batch, I checked that the DStream contains only 1 RDD, i.e. the 2nd batch's 
RDD. I verified this using a sysout in foreachRDD. Does that mean that the 
DStream will always contain only 1 RDD?


A DStream creates an RDD for each window corresponding to your batch duration 
(possibly, if there is no data in the current time window, no RDD is created, 
but I'm not sure about that).
So no, there is not one single RDD in a DStream; it just depends on the batch 
duration and the collected data.

Is there a way to access the RDD of the 1st batch in the 2nd batch? The 1st 
batch may contain some records which were not relevant to the 1st batch and 
are to be processed in the 2nd batch. I know I can use the sliding window 
mechanism of streaming, but if I'm not using it and there is no way to access 
the previous batch's RDD, then it means that functions like count will always 
return a DStream containing only 1 RDD, am I correct?
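
(For reference, the sliding window mechanism I mention would look roughly like 
this, assuming lines is the input JavaDStream<String>; the durations are only 
an example and both must be multiples of the batch interval:)

// A 2-hour window sliding every hour: each windowed RDD spans the current
// batch plus the previous one, so the previous batch's records stay visible.
JavaDStream<String> lastTwoBatches = lines.window(
    new Duration(2 * 60 * 60 * 1000),  // window length: 2 batches
    new Duration(60 * 60 * 1000));     // slide interval: every batch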

count will be executed for each RDD in the DStream, as explained above.

If you want to do operations on several RDDs in the same
Re: Relation between DStream and RDDs

2014-03-21 Thread Azuryy
Thanks for sharing here.

Sent from my iPhone5s


Re: Relation between DStream and RDDs

2014-03-20 Thread Pascal Voitot Dev
If I may add my contribution to this discussion, if I understand your question
well...

A DStream is a discretized stream: it discretizes the data stream over windows
of time (according to the project code and the paper I've read). So when you
write:

JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new
Duration(60 * 60 * 1000)); //1 hour

it means you are discretizing over a 1-hour window. Each batch, and so each RDD
of the DStream, will collect data for 1 hour before moving on to the next RDD.
So if you want more RDDs, you should reduce the batch size/duration...
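
For example (the 1-minute duration is purely illustrative):

// A 1-minute batch interval: over one hour the stream produces 60 batches,
// i.e. 60 RDDs per DStream, instead of a single hourly RDD.
JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new
Duration(60 * 1000)); // 1 minute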

Pascal


On Thu, Mar 20, 2014 at 7:51 AM, Tathagata Das
tathagata.das1...@gmail.com wrote:

 That is a good question. If I understand correctly, you need multiple RDDs
 from a DStream in *every batch*. Can you elaborate on why you need multiple
 RDDs every batch?

 TD


 On Wed, Mar 19, 2014 at 10:20 PM, Sanjay Awatramani sanjay_a...@yahoo.com
  wrote:

 Hi,

 As I understand, a DStream consists of 1 or more RDDs, and foreachRDD
 will run a given function on each and every RDD inside a DStream.

 I created a simple program which reads log files from a folder every hour:
 JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new
 Duration(60 * 60 * 1000)); //1 hour
 JavaDStream<String> obj = stcObj.textFileStream("/Users/path/to/Input");

 When the interval is reached, Spark reads all the files and creates one
 and only one RDD (as I verified with a sysout inside foreachRDD).

 The streaming doc in several places indicates that many operations
 (e.g. flatMap) on a DStream are applied individually to each RDD, and that
 the resulting DStream consists of the mapped RDDs, in the same number as
 the input DStream.
 ref:
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#dstreams

 If that is the case, how can I generate a scenario wherein I have
 multiple RDDs inside a DStream in my example?

 Regards,
 Sanjay


Re: Relation between DStream and RDDs

2014-03-20 Thread Sanjay Awatramani
@TD: I do not need multiple RDDs in a DStream in every batch. On the contrary, 
my logic would work fine if there is only 1 RDD. But the description for 
functions like reduce & count ("Return a new DStream of single-element RDDs by 
counting the number of elements in each RDD of the source DStream.") left me 
confused about whether I should account for the fact that a DStream can have 
multiple RDDs. My streaming code processes a batch every hour. In the 2nd 
batch, I checked that the DStream contains only 1 RDD, i.e. the 2nd batch's 
RDD. I verified this using a sysout in foreachRDD. Does that mean that the 
DStream will always contain only 1 RDD?

Is there a way to access the RDD of the 1st batch in the 2nd batch? The 1st 
batch may contain some records which were not relevant to the 1st batch and 
are to be processed in the 2nd batch. I know I can use the sliding window 
mechanism of streaming, but if I'm not using it and there is no way to access 
the previous batch's RDD, then it means that functions like count will always 
return a DStream containing only 1 RDD, am I correct?

@Pascal: yes, your answer resolves my question partially, but the other part 
of the question (which I've clarified in the paragraphs above) still remains.

Thanks for your answers!

Regards,
Sanjay


