Hi Tathagata,
On Thu, Jul 17, 2014 at 6:12 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> The RDD parameter in foreachRDD contains raw/transformed data from the
> last batch. So when foreachRDD is called with the time parameter as 5:02:01
> and the batch size is 1 minute, then the rdd will contain data based on the
> data received between 5:02:00 and 5:02:01.

Do you mean the data between 5:02:00 and 5:02:01? The time parameter is
5:02:01. Moreover, when the program is running, it is very difficult to
specify a starting time, because sometimes it is hard to know when the
program executes that line. And do we need a different time parameter for
each foreachRDD, or will Spark calculate the next one according to the
batch interval?

> If you want to do custom intervals, then I suggest the following:
>
> 1. Use 1-second batch intervals.
> 2. Then in the foreachRDD, from 5:02:30 to 5:03:28, put all the RDDs in
>    an ArrayBuffer/ListBuffer.
> 3. At 5:03:29, add the RDD to the buffer, then do a union of all the
>    buffered RDDs and process them.
>
> So in foreachRDD, based on the time, buffer the RDDs until you reach the
> appropriate time. Then union all the buffered RDDs and process them.
>
> TD
>
> On Thu, Jul 17, 2014 at 2:05 PM, Bill Jay <bill.jaypeter...@gmail.com>
> wrote:
>
>> Hi Tathagata,
>>
>> Thanks for your answer. Please see my further questions below:
>>
>> On Wed, Jul 16, 2014 at 6:57 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Answers inline.
>>>
>>> On Wed, Jul 16, 2014 at 5:39 PM, Bill Jay <bill.jaypeter...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am currently using Spark Streaming to conduct real-time data
>>>> analytics. We receive data from Kafka. We want to generate output files
>>>> that contain results based on the data we receive in a specific
>>>> time interval.
>>>>
>>>> I have several questions on Spark Streaming's timestamps:
>>>>
>>>> 1) If I use saveAsTextFiles, it seems Spark Streaming will generate
>>>> files at complete seconds, such as 5:00:01, 5:00:02 (converted from Unix
>>>> time), etc. Does this mean the results are based on the data from 5:00:01
>>>> to 5:00:02, 5:00:02 to 5:00:03, etc.? Or do the timestamps just mean the
>>>> time the files are generated?
>>>
>>> A file named 5:00:01 contains results from data received between
>>> 5:00:00 and 5:00:01 (based on the system time of the cluster).
>>>
>>>> 2) If I do not use saveAsTextFiles, how do I get the exact time
>>>> interval of the RDD when I use foreachRDD to do custom output of the
>>>> results?
>>>
>>> There is a version of foreachRDD which allows you to specify a function
>>> that takes in a Time object.
>>>
>>>> 3) How can we specify the starting time of the batches?
>>>
>>> What do you mean? Batches are timed based on the system time of the
>>> cluster.
>>
>> I would like to control the starting time and ending time of each batch.
>> For example, if I use saveAsTextFiles as the output method and the batch
>> size is 1 minute, Spark will align the time intervals to complete minutes,
>> such as 5:01:00, 5:02:00, 5:03:00. It will not produce results at 5:01:03,
>> 5:02:03, 5:03:03, etc. My goal is to generate output for a customized
>> interval, such as 5:01:30 to 5:02:29, 5:02:30 to 5:03:29, etc.
>>
>> I checked the API of foreachRDD with the time parameter. It seems there
>> is no explanation of what that parameter means. Does it mean the starting
>> time of the first batch?
>>
>>>> Thanks!
>>>>
>>>> Bill
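For reference, the buffer-then-union approach TD describes could be sketched roughly as below. This is only an illustration, not a definitive implementation: it assumes `stream` is a `DStream[String]` built with a 1-second batch interval, and `processWindow` is a hypothetical user-defined function (not a Spark API) standing in for whatever custom output you want.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

// Assumption: `stream` is a DStream[String] created with Seconds(1) batches.
// Buffer each 1-second RDD; when a batch ends on the custom :29 boundary
// (e.g. 5:03:29), union everything buffered so far and process it as one
// custom interval (e.g. 5:02:30 to 5:03:29), then start a fresh buffer.
val buffer = new ArrayBuffer[RDD[String]]()

stream.foreachRDD { (rdd: RDD[String], time: Time) =>
  // `time` marks the end of the 1-second batch this RDD covers
  buffer += rdd
  if (time.milliseconds / 1000 % 60 == 29) {
    val window = rdd.sparkContext.union(buffer)
    processWindow(window, time)  // placeholder for your own output logic
    buffer.clear()
  }
}
```

Note that this uses the `foreachRDD` overload that passes the batch `Time` along with the RDD, which is also the answer to question 2 above: that `Time` is how you recover the exact interval of each batch for custom output.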