Hi Tathagata,



On Thu, Jul 17, 2014 at 6:12 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> The RDD parameter in foreachRDD contains raw/transformed data from the
> last batch. So when foreachRDD is called with the time parameter as 5:02:01
> and the batch size is 1 minute, then the rdd will contain data based on the
> data received between 5:02:00 and 5:02:01.
>
Do you mean the data between 5:02:01 and 5:02:02? The time parameter is
5:02:01. Moreover, while the program is running it is very difficult to
specify a starting time, because it is hard to know exactly when the
program executes that line. Also, do we need a different time parameter for
each foreachRDD, or will Spark calculate the next one according to the batch?

>
> If you want to do custom intervals, then I suggest the following
> 1. Use 1-second batch intervals.
> 2. Then, in foreachRDD, from 5:02:30 to 5:03:28, put all the RDDs in
> an ArrayBuffer/ListBuffer.
> 3. At 5:03:29, add the RDD to the buffer, do a union of all the
> buffered RDDs, and process them.
>
> So in foreachRDD, based on the time, buffer the RDDs, until you reach the
> appropriate time. Then union all the buffered RDDs and process them.
>
> TD
>
>
> On Thu, Jul 17, 2014 at 2:05 PM, Bill Jay <bill.jaypeter...@gmail.com>
> wrote:
>
>> Hi Tathagata,
>>
>> Thanks for your answer. Please see my further question below:
>>
>>
>> On Wed, Jul 16, 2014 at 6:57 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Answers inline.
>>>
>>>
>>> On Wed, Jul 16, 2014 at 5:39 PM, Bill Jay <bill.jaypeter...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am currently using Spark Streaming to conduct real-time data
>>>> analytics. We receive data from Kafka. We want to generate output files
>>>> that contain results that are based on the data we receive from a specific
>>>> time interval.
>>>>
>>>> I have several questions on Spark Streaming's timestamp:
>>>>
>>>> 1) If I use saveAsTextFiles, it seems Spark Streaming will generate
>>>> files with timestamps such as 5:00:01, 5:00:02 (converted from Unix
>>>> time), etc. Does this mean the results are based on the data from 5:00:01
>>>> to 5:00:02, 5:00:02 to 5:00:03, etc.? Or do the timestamps just mean the
>>>> time the files were generated?
>>>>
>>> A file named 5:00:01 contains results from data received between
>>> 5:00:00 and 5:00:01 (based on the system time of the cluster).
>>>
>>>
>>>
>>>> 2) If I do not use saveAsTextFiles, how do I get the exact time
>>>> interval of the RDD when I use foreachRDD to do custom output of the
>>>> results?
>>>>
>>> There is a version of foreachRDD which allows you to specify a function
>>> that takes in a Time object.
>>>
>>>
>>>> 3) How can we specify the starting time of the batches?
>>>>
>>>
>>> What do you mean? Batches are timed based on the system time of the
>>> cluster.
>>>
>> I would like to control the starting time and ending time of each batch.
>> For example, if I use saveAsTextFiles as the output method and the batch
>> size is 1 minute, Spark will align time intervals to complete minutes, such
>> as 5:01:00, 5:02:00, 5:03:00. It will not have results at 5:01:03,
>> 5:02:03, 5:03:03, etc. My goal is to generate output for a customized
>> interval, such as from 5:01:30 to 5:02:29, 5:02:30 to 5:03:29, etc.
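One way to reason about that alignment: a small, hypothetical helper that maps an epoch-second timestamp to the one-minute window starting at second 30 that contains it (the `customWindow` name and the epoch-second convention are illustrative, not part of any Spark API):

```scala
// Map an epoch-second timestamp to the custom one-minute window
// [start, end] containing it, where windows begin at second 30 of each
// minute (e.g. 5:01:30 .. 5:02:29). Assumes ts >= 30.
def customWindow(ts: Long): (Long, Long) = {
  val start = ts - (ts - 30) % 60
  (start, start + 59)
}

// Convenience: hours/minutes/seconds to seconds past midnight.
val day = (h: Int, m: Int, s: Int) => (h * 3600 + m * 60 + s).toLong
```

For example, any timestamp between 5:01:30 and 5:02:29 maps to the window (5:01:30, 5:02:29).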
>>
>> I checked the API of foreachRDD with the time parameter. It seems there is
>> no explanation of what that parameter means. Does it mean the starting
>> time of the first batch?
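Judging from TD's earlier reply, the Time passed to foreachRDD labels the end of the batch interval that produced the RDD, rather than a start time you choose. A minimal sketch (plain Scala, no Spark; `batchInterval` is a hypothetical helper) of recovering the covered interval from it:

```scala
// Given the Time value (in ms) that labels a batch and the configured
// batch duration, the RDD covers (timeMs - batchMs, timeMs].
def batchInterval(timeMs: Long, batchMs: Long): (Long, Long) =
  (timeMs - batchMs, timeMs)

// e.g. a 1-minute batch labeled 5:02:00 (18120 seconds past midnight)
// covers the minute from 5:01:00 to 5:02:00.
val (start, stop) = batchInterval(18120L * 1000, 60L * 1000)
```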
>>
>>>
>>>
>>>>
>>>> Thanks!
>>>>
>>>> Bill
>>>>
>>>
>>>
>>
>
