Re: Spark Streaming timing considerations

2014-07-21 Thread Laeeq Ahmed


Hi TD,

Thanks for the help.

The only problem left here is that dstreamTime contains some extra 
information, which seems to be a date, i.e. 1405944367000 ms, whereas my 
application timestamps are just in seconds, which I converted to ms, 
e.g. 2300, 2400, 2500, etc. So the filter doesn't take effect.

I was thinking of adding that extra information to my Time(4000), but I am not 
really sure what it is.


val keyAndValues = eegStreams.map(x => {
  val token = x.split(" ")
  ((token(0).toDouble * 1000).toLong, token(1).toDouble)
})


val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform((windowedRDD, dstreamTime) => {
  val currentAppTimeWindowStart = dstreamTime - Time(4000)  // define the window over the timestamp that you want to process
  val currentAppTimeWindowEnd = dstreamTime
  val filteredRDD = windowedRDD.filter(r => Duration(r._1) > currentAppTimeWindowStart && Time(r._1) <= currentAppTimeWindowEnd)
  filteredRDD
})

The sample input is as follows:

 AppTimestamp  Datapoint

0 -145.934066 
0.003906 0.19536 
0.007812 0.19536 
0.011719 0.19536 
0.015625 0.19536 
0.019531 0.976801 
0.023438 0.586081 
0.027344 -1.758242 
0.03125 -1.367521 
0.035156 2.930403 
0.039062 4.102564 
0.042969 3.711844 
0.046875 2.148962 
0.050781 -4.102564 
0.054688 -1.758242 
0.058594 3.711844 
0.0625 9.181929 
0.066406 11.135531 
0.070312 4.884005 
0.074219 0.976801 
0.078125 4.493284 
0.082031 11.135531 
0.085938 12.698413 
0.089844 15.824176 
0.09375 21.684982 
0.097656 22.466422 
0.101562 18.949939 
0.105469 14.652015 
0.109375 11.135531 
0.113281 1.758242 
0.117188 -6.056166 
0.121094 -0.976801 
0.125 0.19536 
0.128906 -6.837607 
0.132812 -8.400488 
0.136719 -14.261294 
0.140625 -24.810745 
0.144531 -25.592186 
0.148438 -19.73138 
0.152344 -18.559219 
0.15625 -25.201465 

Regards,
Laeeq


On Thursday, July 17, 2014 8:58 PM, Tathagata Das tathagata.das1...@gmail.com 
wrote:
 


You have to define which range of records should be retained in every 
windowed RDD, right? For example, when the DStream.window has data from 
times 0 - 8 seconds by DStream time, you only want to keep the data that 
falls into, say, 4 - 8 seconds by application time. This latter range is the 
application-level time window that you need to define in the transform 
function. What may help is that there is another version of transform which 
gives you the current DStream time (that is, it will give the value 8), from 
which you can calculate the app-time window 4 - 8. 


val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform((windowedRDD: RDD[...], dstreamTime: Time) => {
  val currentAppTimeWindowStart = dstreamTime - appTimeWindowSize  // define the window over the timestamp that you want to process
  val currentAppTimeWindowEnd = dstreamTime
  val filteredRDD = windowedRDD.filter(r => r._1 <= currentAppTimeWindowEnd && r._1 > currentAppTimeWindowStart)  // filter and retain only the records that fall in the current app-time window
  filteredRDD
})


Hope this helps!

TD



On Thu, Jul 17, 2014 at 5:54 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

Hi TD,


I have been able to filter the first windowed RDD, but I am not sure how to 
make a generic filter. The larger window is 8 seconds, and I want to fetch 4 
seconds based on the application timestamp. I have seen an earlier post which 
suggests a timeStampBasedWindow, but I am not sure how to build such a window 
in the following example. 



 val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform(windowedRDD => {
   //val timeStampBasedWindow = ???                 // define the window over the timestamp that you want to process
   val filteredRDD = windowedRDD.filter(_._1 > 4)   // filter and retain only the records that fall in the timestamp-based window
   filteredRDD
 })

Consider the input tuples as (1,23), (1.2,34) . . . (3.8,54), (4,413) . . . , 
where the key is the timestamp.

Regards,
Laeeq
 





On Saturday, July 12, 2014 8:29 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:
 


Hi,
Thanks I will try to implement it.


Regards,
Laeeq





 On Saturday, July 12, 2014 4:37 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:
 


This is not in the current streaming API.


Queue stream is useful for testing with generated RDDs, but not for actual 
data. For an actual data stream, the slack time can be implemented by doing 
DStream.window over a larger window that takes the slack time into account, and 
then filtering out the required application-time-based window of data. For 
example, if you want a slack time of 1 minute and batches of 10 seconds, then 
do a window operation of 70 seconds, and in each RDD filter out the records 
with the desired application time and process them. 


TD
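
A rough sketch of that recipe, purely as an illustration (the socket source, 
host/port, the line format, and the assumption that the application timestamps 
are epoch milliseconds are placeholders, not details from this thread):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical job: 10-second batches, lines of the form "<appTimeMs> <value>".
val conf = new SparkConf().setAppName("slack-window-sketch").setMaster("local[2]")
val ssc  = new StreamingContext(conf, Seconds(10))

val records = ssc.socketTextStream("localhost", 9999).map { line =>
  val tokens = line.split(" ")
  (tokens(0).toLong, tokens(1))          // (application time in epoch ms, payload)
}

// 70-second window = 60 s of slack + one 10 s batch, sliding every batch.
val slacked = records.window(Seconds(70), Seconds(10)).transform { (windowedRDD, dstreamTime) =>
  // Process the 10-second application-time slice that is now at least 60 s (the slack) old.
  val end   = dstreamTime.milliseconds - 60 * 1000L
  val start = end - 10 * 1000L
  windowedRDD.filter { case (appTime, _) => appTime > start && appTime <= end }
}
slacked.print()

ssc.start()
ssc.awaitTermination()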



On Fri, Jul 11, 2014 at 7:44 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

Hi,


In the Spark Streaming paper, slack time has been suggested for delaying the 
batch creation in the case of external timestamps. I don't see any such option 
in StreamingContext. Is it available in the API?

Re: Spark Streaming timing considerations

2014-07-21 Thread Sean Owen
That is just standard Unix time.

1405944367000 = Sun, 09 Aug 46522 05:56:40 GMT


On Mon, Jul 21, 2014 at 5:43 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:


 Hi TD,

 Thanks for the help.

 The only problem left here is that dstreamTime contains some extra 
 information, which seems to be a date, i.e. 1405944367000 ms, whereas my 
 application timestamps are just in seconds, which I converted to ms, 
 e.g. 2300, 2400, 2500, etc. So the filter doesn't take effect.

 I was thinking of adding that extra information to my Time(4000), but I am not 
 really sure what it is.



Re: Spark Streaming timing considerations

2014-07-21 Thread Sean Owen
Uh, right. I mean:

1405944367 = Mon, 21 Jul 2014 12:06:07 GMT
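
For anyone checking this by hand, a quick illustrative snippet (not from the 
original messages); the printed date appears in the local time zone:

// dstreamTime values are epoch milliseconds, the same unit as System.currentTimeMillis():
new java.util.Date(1405944367000L)   // Mon Jul 21 12:06:07 GMT 2014
System.currentTimeMillis()           // the current instant in the same unit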

On Mon, Jul 21, 2014 at 5:47 PM, Sean Owen so...@cloudera.com wrote:
 That is just standard Unix time.

 1405944367000 = Sun, 09 Aug 46522 05:56:40 GMT


 On Mon, Jul 21, 2014 at 5:43 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:


 Hi TD,

 Thanks for the help.

 The only problem left here is that dstreamTime contains some extra 
 information, which seems to be a date, i.e. 1405944367000 ms, whereas my 
 application timestamps are just in seconds, which I converted to ms, 
 e.g. 2300, 2400, 2500, etc. So the filter doesn't take effect.

 I was thinking of adding that extra information to my Time(4000), but I am not 
 really sure what it is.



Re: Spark Streaming timing considerations

2014-07-21 Thread Tathagata Das
You will have to use some function that converts between dstreamTime (ms since
the epoch, the same format as returned by System.currentTimeMillis) and your
application-level time.

TD
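
A minimal sketch of one such conversion, reusing the keyAndValues stream from 
earlier in the thread; the appStartEpochMs variable and the assumption that 
application time 0 coincides with the moment the job starts are illustrative, 
not from the thread:

import org.apache.spark.streaming.Seconds

// Record, once, the wall-clock instant that corresponds to application time 0.
// (Assumed here to be job start; use whatever origin the data source actually defines.)
val appStartEpochMs = System.currentTimeMillis()

// keyAndValues is the (appTimeMs, value) stream built earlier in the thread.
val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform { (windowedRDD, dstreamTime) =>
  // Translate DStream time (epoch ms) into application time (ms since application time 0).
  val currentAppTimeWindowEnd   = dstreamTime.milliseconds - appStartEpochMs
  val currentAppTimeWindowStart = currentAppTimeWindowEnd - 4000L   // the 4-second slide
  // The keys are already application time in ms, so plain Long comparisons are enough.
  windowedRDD.filter(r => r._1 > currentAppTimeWindowStart && r._1 <= currentAppTimeWindowEnd)
}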


On Mon, Jul 21, 2014 at 9:47 AM, Sean Owen so...@cloudera.com wrote:

 Uh, right. I mean:

 1405944367 = Mon, 21 Jul 2014 12:06:07 GMT

 On Mon, Jul 21, 2014 at 5:47 PM, Sean Owen so...@cloudera.com wrote:
  That is just standard Unix time.
 
  1405944367000 = Sun, 09 Aug 46522 05:56:40 GMT
 
 
  On Mon, Jul 21, 2014 at 5:43 PM, Laeeq Ahmed laeeqsp...@yahoo.com
 wrote:
 
 
  Hi TD,
 
  Thanks for the help.
 
  The only problem left here is that dstreamTime contains some extra
 information, which seems to be a date, i.e. 1405944367000 ms, whereas my
 application timestamps are just in seconds, which I converted to ms,
 e.g. 2300, 2400, 2500, etc. So the filter doesn't take effect.

  I was thinking of adding that extra information to my Time(4000), but I am
 not really sure what it is.
 



Re: Spark Streaming timing considerations

2014-07-17 Thread Laeeq Ahmed
Hi TD,

I have been able to filter the first windowed RDD, but I am not sure how to make 
a generic filter. The larger window is 8 seconds, and I want to fetch 4 seconds 
based on the application timestamp. I have seen an earlier post which suggests a 
timeStampBasedWindow, but I am not sure how to build such a window in the 
following example. 


 val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform(windowedRDD => {
   //val timeStampBasedWindow = ???                 // define the window over the timestamp that you want to process
   val filteredRDD = windowedRDD.filter(_._1 > 4)   // filter and retain only the records that fall in the timestamp-based window
   filteredRDD
 })
Consider the input tuples as (1,23), (1.2,34) . . . (3.8,54), (4,413) . . . , 
where the key is the timestamp.

Regards,
Laeeq
 



On Saturday, July 12, 2014 8:29 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:
 


Hi,
Thanks I will try to implement it.

Regards,
Laeeq



 On Saturday, July 12, 2014 4:37 AM, Tathagata Das 
tathagata.das1...@gmail.com wrote:
 


This is not in the current streaming API.

Queue stream is useful for testing with generated RDDs, but not for actual 
data. For an actual data stream, the slack time can be implemented by doing 
DStream.window over a larger window that takes the slack time into account, and 
then filtering out the required application-time-based window of data. For 
example, if you want a slack time of 1 minute and batches of 10 seconds, then 
do a window operation of 70 seconds, and in each RDD filter out the records 
with the desired application time and process them. 

TD



On Fri, Jul 11, 2014 at 7:44 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

Hi,


In the Spark Streaming paper, slack time has been suggested for delaying the 
batch creation in the case of external timestamps. I don't see any such option 
in StreamingContext. Is it available in the API?



Also going through the previous posts, queueStream has been suggested for 
this. I looked into the queueStream example.



     // Create and push some RDDs into Queue
    for (i <- 1 to 30) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 10)
      Thread.sleep(1000)
    }

The only thing I am unsure about is how to make batches (basic RDDs) out of a 
stream coming in on a port.


Regards,
Laeeq

 

Re: Spark Streaming timing considerations

2014-07-17 Thread Tathagata Das
You have to define which range of records should be retained in every
windowed RDD, right? For example, when the DStream.window has data from
times 0 - 8 seconds by DStream time, you only want to keep the data that
falls into, say, 4 - 8 seconds by application time. This latter range is
the application-level time window that you need to define in the transform
function. What may help is that there is another version of transform which
gives you the current DStream time (that is, it will give the value 8),
from which you can calculate the app-time window 4 - 8.


val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform((windowedRDD: RDD[...], dstreamTime: Time) => {
  val currentAppTimeWindowStart = dstreamTime - appTimeWindowSize  // define the window over the timestamp that you want to process
  val currentAppTimeWindowEnd = dstreamTime
  val filteredRDD = windowedRDD.filter(r => r._1 <= currentAppTimeWindowEnd && r._1 > currentAppTimeWindowStart)  // filter and retain only the records that fall in the current app-time window
  filteredRDD
})

Hope this helps!

TD


On Thu, Jul 17, 2014 at 5:54 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

 Hi TD,

 I have been able to filter the first windowed RDD, but I am not sure how
 to make a generic filter. The larger window is 8 seconds, and I want to
 fetch 4 seconds based on the application timestamp. I have seen an earlier
 post which suggests a timeStampBasedWindow, but I am not sure how to build
 such a window in the following example.

  val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform(windowedRDD => {
    //val timeStampBasedWindow = ???                 // define the window over the timestamp that you want to process
    val filteredRDD = windowedRDD.filter(_._1 > 4)   // filter and retain only the records that fall in the timestamp-based window
    filteredRDD
  })

 Consider the input tuples as (1,23), (1.2,34) . . . (3.8,54), (4,413) . . . ,
 where the key is the timestamp.

 Regards,
 Laeeq




   On Saturday, July 12, 2014 8:29 PM, Laeeq Ahmed laeeqsp...@yahoo.com
 wrote:


 Hi,
 Thanks I will try to implement it.

 Regards,
 Laeeq



   On Saturday, July 12, 2014 4:37 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:


 This is not in the current streaming API.

 Queue stream is useful for testing with generated RDDs, but not for actual
 data. For an actual data stream, the slack time can be implemented by doing
 DStream.window over a larger window that takes the slack time into account,
 and then filtering out the required application-time-based window of data.
 For example, if you want a slack time of 1 minute and batches of 10
 seconds, then do a window operation of 70 seconds, and in each RDD filter
 out the records with the desired application time and process them.

 TD


 On Fri, Jul 11, 2014 at 7:44 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

 Hi,

 In the Spark Streaming paper, slack time has been suggested for delaying
 the batch creation in the case of external timestamps. I don't see any such
 option in StreamingContext. Is it available in the API?

  Also going through the previous posts, queueStream has been suggested for
 this. I looked into the queueStream example.

  // Create and push some RDDs into Queue
  for (i <- 1 to 30) {
    rddQueue += ssc.sparkContext.makeRDD(1 to 10)
    Thread.sleep(1000)
  }

 The only thing I am unsure about is how to make batches (basic RDDs) out of a
 stream coming in on a port.

 Regards,
 Laeeq










Spark Streaming timing considerations

2014-07-11 Thread Laeeq Ahmed
Hi,

In the Spark Streaming paper, slack time has been suggested for delaying the 
batch creation in the case of external timestamps. I don't see any such option 
in StreamingContext. Is it available in the API?


Also going through the previous posts, queueStream has been suggested for this. 
I looked into the queueStream example.


     // Create and push some RDDs into Queue
    for (i <- 1 to 30) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 10)
      Thread.sleep(1000)
    }

The only thing I am unsure about is how to make batches (basic RDDs) out of a 
stream coming in on a port.

Regards,
Laeeq
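
On the question of making batches out of data arriving on a port, a minimal 
sketch (host, port, and batch interval are hypothetical): Spark's built-in 
socket receiver already slices the incoming data into one RDD per batch 
interval, so queueStream is not needed for real input.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("port-ingest-sketch").setMaster("local[2]")
val ssc  = new StreamingContext(conf, Seconds(10))

// Every line received on the socket becomes one element; whatever arrives during
// each 10-second batch interval is grouped into one RDD of the DStream.
val lines = ssc.socketTextStream("localhost", 9999)
lines.foreachRDD(rdd => println(s"batch with ${rdd.count()} records"))

ssc.start()
ssc.awaitTermination()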

Re: Spark Streaming timing considerations

2014-07-11 Thread Tathagata Das
This is not in the current streaming API.

Queue stream is useful for testing with generated RDDs, but not for actual
data. For an actual data stream, the slack time can be implemented by doing
DStream.window over a larger window that takes the slack time into account,
and then filtering out the required application-time-based window of data.
For example, if you want a slack time of 1 minute and batches of 10
seconds, then do a window operation of 70 seconds, and in each RDD filter
out the records with the desired application time and process them.

TD


On Fri, Jul 11, 2014 at 7:44 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

 Hi,

 In the Spark Streaming paper, slack time has been suggested for delaying
 the batch creation in the case of external timestamps. I don't see any such
 option in StreamingContext. Is it available in the API?

 Also going through the previous posts, queueStream has been suggested for
 this. I looked into the queueStream example.

  // Create and push some RDDs into Queue
  for (i <- 1 to 30) {
    rddQueue += ssc.sparkContext.makeRDD(1 to 10)
    Thread.sleep(1000)
  }

 The only thing I am unsure about is how to make batches (basic RDDs) out of a
 stream coming in on a port.

 Regards,
 Laeeq