Re: Spark Streaming timestamps
Hi Bill,

Hope the following is what you need. Record a reference time when streaming starts:

    val zerotime = System.currentTimeMillis()

Then in foreachRDD do the following:

    // difference = RDDtimeparameter - zerotime
    // measured once, on the first batch, then used as a constant
    starttime = (RDDtimeparameter - (zerotime + difference)) - intervalsize
    endtime = RDDtimeparameter - (zerotime + difference)

Here zerotime is the time when streaming starts. Find out what the difference (RDDtimeparameter - zerotime) is for the first batch and use it as a constant (in my case it is 5000 ms).

In the end I would say it is better to work with Spark timestamps rather than application timestamps, as it otherwise becomes very messy. If you reach a better solution, please let me know as well.

Regards,
Laeeq

On Friday, July 18, 2014 7:21 PM, Bill Jay wrote:
> Do you mean the data between 5:02:02 and 5:02:01? The time parameter is
> 5:02:01. Moreover, when the program is running, it is very difficult to
> specify a starting time, because sometimes it is difficult to know when
> the program executes that line. And do we need a different time parameter
> for each foreachRDD, or will Spark calculate the next one according to
> the batch?
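For reference, Laeeq's alignment arithmetic can be sketched as a plain Scala helper, independent of Spark. The function name and packaging are mine; the formulas are taken from the message above, with all times in milliseconds:

```scala
// Sketch of Laeeq's custom-interval arithmetic.
// `differenceMs` is measured once, on the first batch, as
// (first RDD time parameter - zerotime), then treated as a constant.
def customInterval(rddTimeMs: Long, zeroTimeMs: Long,
                   differenceMs: Long, intervalSizeMs: Long): (Long, Long) = {
  val endTime   = rddTimeMs - (zeroTimeMs + differenceMs)
  val startTime = endTime - intervalSizeMs
  (startTime, endTime)
}
```

With zerotime = 1000, a measured difference of 5000, and a 60-second interval, the batch whose time parameter is 66000 maps to the interval (0, 60000).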
Re: Spark Streaming timestamps
Hi Tathagata,

On Thu, Jul 17, 2014 at 6:12 PM, Tathagata Das wrote:
> The RDD parameter in foreachRDD contains raw/transformed data from the
> last batch. So when foreachRDD is called with the time parameter as
> 5:02:01 and batch size is 1 minute, then the RDD will contain data based
> on the data received between 5:02:00 and 5:02:01.

Do you mean the data between 5:02:02 and 5:02:01? The time parameter is 5:02:01. Moreover, when the program is running, it is very difficult to specify a starting time, because sometimes it is difficult to know when the program executes that line. And do we need a different time parameter for each foreachRDD, or will Spark calculate the next one according to the batch?

> If you want to do custom intervals, then I suggest the following:
> 1. Use 1-second batch intervals.
> 2. Then in foreachRDD, from 5:02:30 to 5:03:28, put all the RDDs in an
>    ArrayBuffer/ListBuffer.
> 3. At 5:03:29, add the RDD to the buffer, do a union of all the buffered
>    RDDs, and process them.
>
> So in foreachRDD, based on the time, buffer the RDDs until you reach the
> appropriate time. Then union all the buffered RDDs and process them.
>
> TD
Re: Spark Streaming timestamps
The RDD parameter in foreachRDD contains raw/transformed data from the last batch. So when foreachRDD is called with the time parameter as 5:02:01 and batch size is 1 minute, the RDD will contain data based on the data received between 5:02:00 and 5:02:01.

If you want to do custom intervals, then I suggest the following:
1. Use 1-second batch intervals.
2. Then in foreachRDD, from 5:02:30 to 5:03:28, put all the RDDs in an ArrayBuffer/ListBuffer.
3. At 5:03:29, add the RDD to the buffer, do a union of all the buffered RDDs, and process them.

So in foreachRDD, based on the time, buffer the RDDs until you reach the appropriate time. Then union all the buffered RDDs and process them.

TD

On Thu, Jul 17, 2014 at 2:05 PM, Bill Jay wrote:
> I would like to control the starting time and ending time of each batch.
> For example, if I use saveAsTextFiles as the output method and the batch
> size is 1 minute, Spark will align time intervals to complete minutes,
> such as 5:01:00, 5:02:00, 5:03:00. It will not have results at 5:01:03,
> 5:02:03, 5:03:03, etc. My goal is to generate output for a customized
> interval, such as from 5:01:30 to 5:02:29, 5:02:30 to 5:03:29, etc.
>
> I checked the API of foreachRDD with the time parameter. It seems there
> is no explanation of what that parameter means. Does it mean the starting
> time of the first batch?
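TD's buffering steps can be sketched with plain Scala collections standing in for RDDs. This is an illustration of the logic only, not Spark API: the function and its isBoundary predicate are hypothetical, the time is the batch time in whatever unit you choose, and "union" is modelled by flattening the buffered batches:

```scala
import scala.collection.mutable.ArrayBuffer

// Model of TD's suggestion: run small (e.g. 1-second) batches, buffer each
// one, and only "process" (here: union by flattening) once the batch time
// crosses the custom-interval boundary. Batches after the last boundary
// stay buffered, as they would in the streaming job.
def bufferAndUnion[A](batches: Seq[(Long, List[A])],
                      isBoundary: Long => Boolean): List[List[A]] = {
  val buffer    = ArrayBuffer.empty[List[A]]
  val processed = ArrayBuffer.empty[List[A]]
  for ((time, batch) <- batches) {
    buffer += batch                       // step 2: buffer every small batch
    if (isBoundary(time)) {               // step 3: at the boundary...
      processed += buffer.toList.flatten  // ...union the buffered batches
      buffer.clear()                      // start the next custom interval
    }
  }
  processed.toList
}
```

For example, with a boundary every 3 ticks, batches at times 1..4 yield one processed union containing the first three batches, while the fourth stays buffered.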
Re: Spark Streaming timestamps
Hi Tathagata,

Thanks for your answer. Please see my further question below.

On Wed, Jul 16, 2014 at 6:57 PM, Tathagata Das wrote:
> Answers inline.
>
>> 1) If I use saveAsTextFiles, it seems Spark Streaming will generate
>> files in complete minutes, such as 5:00:01, 5:00:02 (converted from
>> Unix time), etc. Does this mean the results are based on the data from
>> 5:00:01 to 5:00:02, 5:00:02 to 5:00:03, etc.? Or do the timestamps just
>> mean the time the files are generated?
>
> File named 5:00:01 contains results from data received between 5:00:00
> and 5:00:01 (based on the system time of the cluster).
>
>> 2) If I do not use saveAsTextFiles, how do I get the exact time
>> interval of the RDD when I use foreachRDD to do custom output of the
>> results?
>
> There is a version of foreachRDD which allows you to specify a function
> that takes in a Time object.
>
>> 3) How can we specify the starting time of the batches?
>
> What do you mean? Batches are timed based on the system time of the
> cluster.

I would like to control the starting time and ending time of each batch. For example, if I use saveAsTextFiles as the output method and the batch size is 1 minute, Spark will align time intervals to complete minutes, such as 5:01:00, 5:02:00, 5:03:00. It will not have results at 5:01:03, 5:02:03, 5:03:03, etc. My goal is to generate output for a customized interval, such as from 5:01:30 to 5:02:29, 5:02:30 to 5:03:29, etc.

I checked the API of foreachRDD with the time parameter. It seems there is no explanation of what that parameter means. Does it mean the starting time of the first batch?

Thanks!

Bill
Re: Spark Streaming timestamps
Answers inline.

On Wed, Jul 16, 2014 at 5:39 PM, Bill Jay wrote:
> 1) If I use saveAsTextFiles, it seems Spark Streaming will generate
> files in complete minutes, such as 5:00:01, 5:00:02 (converted from
> Unix time), etc. Does this mean the results are based on the data from
> 5:00:01 to 5:00:02, 5:00:02 to 5:00:03, etc.? Or do the timestamps just
> mean the time the files are generated?

File named 5:00:01 contains results from data received between 5:00:00 and 5:00:01 (based on the system time of the cluster).

> 2) If I do not use saveAsTextFiles, how do I get the exact time interval
> of the RDD when I use foreachRDD to do custom output of the results?

There is a version of foreachRDD which allows you to specify a function that takes in a Time object.

> 3) How can we specify the starting time of the batches?

What do you mean? Batches are timed based on the system time of the cluster.
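Since the time parameter passed to that variant of foreachRDD marks the end of the batch (per the file-naming answer above), the interval a batch covers can be derived from it and the batch duration. A minimal sketch in plain Scala; the helper name is mine, and in a real job the end time would come from the Time object's milliseconds:

```scala
// Given a batch's end time (the foreachRDD time parameter, in ms) and the
// batch duration, the batch covers (end - duration, end].
def batchInterval(batchEndMs: Long, batchDurationMs: Long): (Long, Long) =
  (batchEndMs - batchDurationMs, batchEndMs)
```

For a 1-minute batch whose time parameter is 60000 ms, this yields the interval (0, 60000).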
Spark Streaming timestamps
Hi all,

I am currently using Spark Streaming to conduct real-time data analytics. We receive data from Kafka. We want to generate output files that contain results based on the data we receive in a specific time interval.

I have several questions on Spark Streaming's timestamps:

1) If I use saveAsTextFiles, it seems Spark Streaming will generate files in complete minutes, such as 5:00:01, 5:00:02 (converted from Unix time), etc. Does this mean the results are based on the data from 5:00:01 to 5:00:02, 5:00:02 to 5:00:03, etc.? Or do the timestamps just mean the time the files are generated?

2) If I do not use saveAsTextFiles, how do I get the exact time interval of the RDD when I use foreachRDD to do custom output of the results?

3) How can we specify the starting time of the batches?

Thanks!

Bill