RE: Spark streaming: size of DStream

2014-09-09 Thread Shao, Saisai
Hi, is there any specific scenario in which you need to know the number of RDDs in the DStream? To my knowledge, a DStream generates one RDD in each batchDuration; some old RDDs are remembered for windowing-like functions and are removed when no longer useful. The hashmap generatedRDDs

RE: Spark streaming: size of DStream

2014-09-09 Thread julyfire
Hi Jerry, thanks for your reply. I use Spark Streaming to receive the Flume stream, and in each batchDuration I need to make a decision: if the received stream has data, I should do one thing; if there is no data, do the other thing. I thought count() could give me that measure, but it returns
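For what it's worth, a hedged illustration of the behavior at issue here (the function and parameter names are placeholders, not from the thread):

    import org.apache.spark.streaming.dstream.DStream

    // count() on a DStream is a transformation: it yields a new DStream
    // carrying one Long (the batch size) per batch, not a Long value on
    // the driver.
    def showBatchSizes(flumeStream: DStream[String]): Unit = {
      val counts: DStream[Long] = flumeStream.count()
      counts.print()  // the per-batch count is only observable via an output operation
    }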

Re: Spark streaming: size of DStream

2014-09-09 Thread Sean Owen
How about calling foreachRDD, processing whatever data is in each RDD normally, and also keeping track within the foreachRDD function of whether any RDD had a count() > 0? If not, you can execute your alternate no-data logic at the end. I don't think you want to operate at
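A minimal sketch of that approach, assuming a DStream[String] called stream (the names are illustrative, not from the thread):

    import org.apache.spark.streaming.dstream.DStream

    // foreachRDD runs its body on the driver, so a driver-side flag can be
    // updated here safely (unlike a var mutated inside map(), which runs on
    // the executors).
    var sawData = false
    def process(stream: DStream[String]): Unit = {
      stream.foreachRDD { rdd =>
        if (rdd.count() > 0) {
          sawData = true
          // ... process this batch's data normally ...
        }
      }
    }
    // after the streaming context stops, run the alternate logic if !sawData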

RE: Spark streaming: size of DStream

2014-09-09 Thread Shao, Saisai
You can do it with foreachRDD, something like:

    stream.foreachRDD { r =>
      if (r.count() == 0) {
        // do something
      } else {
        // do some other things
      }
    }

You can try it. Thanks, Jerry

RE: Spark streaming: size of DStream

2014-09-09 Thread julyfire
Thanks all. Yes, I did use foreachRDD; the following is my code:

    var count = -1L  // a global variable in the main object
    val currentBatch = some_DStream
    val countDStream = currentBatch.map { o =>
      count = 0L  // reset the count variable in each batch
      o
    }

RE: Spark streaming: size of DStream

2014-09-09 Thread julyfire
I'm sorry, there was an error in my code; updated here:

    var count = -1L  // a global variable in the main object
    val currentBatch = some_DStream
    val countDStream = currentBatch.map { o =>
      count = 0L  // reset the count variable in each batch
      o
    }
    countDStream.foreachRDD(rdd => count
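For what it's worth, the likely reason the variable never updates: the closure passed to map() is serialized and executed on the executors, so count = 0L mutates an executor-side copy, not the driver's variable. A hedged corrected sketch that keeps the per-batch decision on the driver:

    // foreachRDD's body runs on the driver, and rdd.count() is an action
    // whose result is returned to the driver, so branching on it here works.
    currentBatch.foreachRDD { rdd =>
      if (rdd.count() == 0) {
        // empty batch: do the other thing
      } else {
        // batch with data: do something
      }
    }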

Re: Spark streaming: size of DStream

2014-09-09 Thread Luis Ángel Vicente Sánchez
If you take into account what streaming means in Spark, your goal doesn't really make sense; you have to assume that your streams are infinite and that you will have to process them until the end of days. Operations on a DStream define what you want to do with each element of each RDD, but Spark

RE: Spark streaming: size of DStream

2014-09-09 Thread Shao, Saisai
I think it is fine to directly transform on the DStream even if there is no data injected in this batch duration; it's only an empty transformation, with no more specific overhead. Thanks, Jerry

RE: Spark streaming: size of DStream

2014-09-09 Thread julyfire
Yes, I agree that it is fine to transform directly on the DStream even when there is no data injected in this batch duration. My situation, though, is this: Spark receives the Flume stream continuously, and I use the updateStateByKey function to collect data for a key across several batches; then I will handle the collected data after
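A self-contained sketch of the updateStateByKey pattern described above, with a socket source standing in for the Flume stream and assumed types (all identifiers are illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StatefulCollect {
      def main(args: Array[String]): Unit = {
        // local[2]: one thread for the receiver, one for processing
        val conf = new SparkConf().setAppName("StatefulCollect").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))
        ssc.checkpoint("checkpoint-dir")  // stateful operations require checkpointing

        // Stand-in source; the thread receives a Flume stream instead.
        val lines = ssc.socketTextStream("localhost", 9999)
        val pairs = lines.map(line => (line, 1))

        // Accumulate a running value per key across batch boundaries.
        val updateFunc = (newValues: Seq[Int], state: Option[Int]) =>
          Some(newValues.sum + state.getOrElse(0))
        val collected = pairs.updateStateByKey[Int](updateFunc)
        collected.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }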