Re: streaming window not behaving as advertised (v1.0.1)

2014-08-05 Thread Tathagata Das
1. updateStateByKey should be called on all keys even if there is no data
corresponding to that key. There is a unit test for that (see also the
sketch after point 2):
https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala#L337

2. I am increasing the priority for this. Off the top of my head, this is
easy to fix, but hard to test reliably in a unit test. Will fix it soon
after the Spark 1.1 release.
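
For reference, here is a minimal self-contained sketch of point 1 (the
socket source, host/port, and checkpoint directory are placeholders, not
taken from the test above): the update function runs on every batch for
every key that has state, and receives an empty Seq of new values when no
data arrived for that key.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair DStream ops in 1.x

object UpdateStateSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateSketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("/tmp/spark-checkpoint") // updateStateByKey requires checkpointing

    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

    // Invoked each batch for every key with existing state; newValues is
    // empty when the key received no data in the current batch.
    val updateFunc = (newValues: Seq[Int], state: Option[Int]) =>
      Some(state.getOrElse(0) + newValues.sum)

    val counts = words.map((_, 1)).updateStateByKey[Int](updateFunc)
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}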

TD


On Fri, Aug 1, 2014 at 7:37 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

 Hi TD,

 I've also been fighting this issue only to find the exact same solution you
 are suggesting.
 Too bad I didn't find either the post or the issue sooner.

 I'm using a 1-second batch with N Kafka events per batch (1-to-1 with the
 state objects) and am only calling the updateStateByKey function.

 This is my interpretation, please correct me if needed:
 Because of Spark’s lazy evaluation, the RDDs weren’t being updated as
 expected on each batch interval execution. The assumption was that as long
 as a streaming batch runs (with or without new messages), I should get
 updated RDDs, which was not happening. We only get updateStateByKey calls
 for objects that received events or that are forced to compute through an
 output function. I did not run further tests to confirm this, but that's
 the impression I got.

 This doesn't fit our requirements, as we want to do duration updates based
 on the batch interval execution, so I had to force the computation of all
 the objects through the foreachRDD function.

 I would also appreciate it if the priority of the issue could be
 increased. I assume the foreachRDD incurs additional, unnecessary resource
 allocation (although I'm not sure how much) as opposed to doing it somehow
 by default on each batch interval execution.

 Thanks,
 Rod






Re: streaming window not behaving as advertised (v1.0.1)

2014-08-01 Thread Venkat Subramanian
TD,

We are seeing the same issue. We struggled with this until we found this
post and the workaround.
A quick fix in the Spark Streaming software would help a lot of others who
are encountering this and pulling their hair out over why RDDs on some
partitions are not computed (we ended up spending weeks trying to figure
out what was happening here and trying out different things).

This issue has been around since at least 0.9 and is still present in 1.0.1.

Thanks,

Venkat





Re: streaming window not behaving as advertised (v1.0.1)

2014-08-01 Thread RodrigoB
Hi TD,

I've also been fighting this issue only to find the exact same solution you
are suggesting. 
Too bad I didn't find either the post or the issue sooner.

I'm using a 1-second batch with N Kafka events per batch (1-to-1 with the
state objects) and am only calling the updateStateByKey function.

This is my interpretation, please correct me if needed:
Because of Spark’s lazy evaluation, the RDDs weren’t being updated as
expected on each batch interval execution. The assumption was that as long
as a streaming batch runs (with or without new messages), I should get
updated RDDs, which was not happening. We only get updateStateByKey calls
for objects that received events or that are forced to compute through an
output function. I did not run further tests to confirm this, but that's
the impression I got.

This doesn't fit our requirements, as we want to do duration updates based
on the batch interval execution, so I had to force the computation of all
the objects through the foreachRDD function.
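
Roughly, what I ended up with looks like this (a minimal sketch; stateStream
stands for the DStream returned by updateStateByKey, and count() is just one
convenient action to force evaluation):

// Registering an output operation makes Spark compute the stream's RDD on
// every batch, with or without new events, so state updates run each interval.
stateStream.foreachRDD { rdd =>
  rdd.count() // result ignored; the action materializes all partitions
}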

I would also appreciate it if the priority of the issue could be
increased. I assume the foreachRDD incurs additional, unnecessary resource
allocation (although I'm not sure how much) as opposed to doing it somehow
by default on each batch interval execution.

Thanks,
Rod





Re: streaming window not behaving as advertised (v1.0.1)

2014-07-26 Thread Tathagata Das
Yeah, maybe I should bump the issue to major. Having thought it through
while writing my previous answer, this should be easy to fix just by doing
a foreachRDD on all the input streams within the system (rather than users
doing it explicitly, as I asked you to do).

Thanks, Alan, for testing this out and confirming that this was the
same issue. I was worried that it was a totally new issue that we did
not know of.

TD

On Wed, Jul 23, 2014 at 12:37 AM, Alan Ngai a...@opsclarity.com wrote:
 TD, it looks like your instincts were correct.  I misunderstood what you
 meant.  If I force an eval on the input stream using foreachRDD, the
 windowing will work correctly.  If I don’t do that, lazy eval somehow
 screws with the window batches I eventually receive.  Any reason the bug
 is categorized as minor?  It seems that anyone who uses the windowing
 functionality would run into this bug.  I imagine this would include
 anyone who wants to use Spark Streaming to aggregate data in fixed time
 batches, which seems like a fairly common use case.

 Alan



 On Jul 22, 2014, at 11:30 PM, Alan Ngai a...@opsclarity.com wrote:

 foreachRDD is how I extracted values in the first place, so that’s not going
 to make a difference.  I don’t think it’s related to SPARK-1312 because I’m
 generating data every second in the first place and I’m using foreachRDD
 right after the window operation.  The code looks something like

 val batchInterval = 5
 val windowInterval = 25
 val slideInterval = 15

 val windowedStream = inputStream.window(Seconds(windowInterval),
 Seconds(slideInterval))

 val outputFunc = (r: RDD[MetricEvent], t: Time) => {
   println("%s".format(t.milliseconds / 1000))
   r.foreach { metric =>
     val timeKey = metric.timeStamp / batchInterval * batchInterval
     println("%s %s %s %s".format(timeKey, metric.timeStamp, metric.name,
       metric.value))
   }
 }
 windowedStream.foreachRDD(outputFunc)

 On Jul 22, 2014, at 10:13 PM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 It could be related to this bug that is currently open.
 https://issues.apache.org/jira/browse/SPARK-1312

 Here is a workaround. Can you put an inputStream.foreachRDD(rdd => { }) and
 try these combos again?

 TD


 On Tue, Jul 22, 2014 at 6:01 PM, Alan Ngai a...@opsclarity.com wrote:

 I have a sample application pumping out records 1 per second.  The batch
 interval is set to 5 seconds.  Here’s a list of “observed window intervals”
 vs what was actually set

 window=25, slide=25 : observed-window=25, overlapped-batches=0
 window=25, slide=20 : observed-window=20, overlapped-batches=0
 window=25, slide=15 : observed-window=15, overlapped-batches=0
 window=25, slide=10 : observed-window=20, overlapped-batches=2
 window=25, slide=5 : observed-window=25, overlapped-batches=3

 can someone explain this behavior to me?  I’m trying to aggregate metrics
 by time batches, but want to skip partial batches.  Therefore, I’m trying to
 find a combination which results in 1 overlapped batch, but no combination I
 tried gets me there.

 Alan






Re: streaming window not behaving as advertised (v1.0.1)

2014-07-23 Thread Alan Ngai
foreachRDD is how I extracted values in the first place, so that’s not going to 
make a difference.  I don’t think it’s related to SPARK-1312 because I’m 
generating data every second in the first place and I’m using foreachRDD right 
after the window operation.  The code looks something like

val batchInterval = 5
val windowInterval = 25
val slideInterval = 15

val windowedStream = inputStream.window(Seconds(windowInterval), 
Seconds(slideInterval))

val outputFunc = (r: RDD[MetricEvent], t: Time) => {
  println("%s".format(t.milliseconds / 1000))
  r.foreach { metric =>
    val timeKey = metric.timeStamp / batchInterval * batchInterval
    println("%s %s %s %s".format(timeKey, metric.timeStamp, metric.name,
      metric.value))
  }
}
windowedStream.foreachRDD(outputFunc)

On Jul 22, 2014, at 10:13 PM, Tathagata Das tathagata.das1...@gmail.com wrote:

 It could be related to this bug that is currently open. 
 https://issues.apache.org/jira/browse/SPARK-1312
 
 Here is a workaround. Can you put an inputStream.foreachRDD(rdd => { }) and
 try these combos again?
 
 TD
 
 
 On Tue, Jul 22, 2014 at 6:01 PM, Alan Ngai a...@opsclarity.com wrote:
 I have a sample application pumping out records 1 per second.  The batch 
 interval is set to 5 seconds.  Here’s a list of “observed window intervals” 
 vs what was actually set
 
 window=25, slide=25 : observed-window=25, overlapped-batches=0
 window=25, slide=20 : observed-window=20, overlapped-batches=0
 window=25, slide=15 : observed-window=15, overlapped-batches=0
 window=25, slide=10 : observed-window=20, overlapped-batches=2
 window=25, slide=5 : observed-window=25, overlapped-batches=3
 
 can someone explain this behavior to me?  I’m trying to aggregate metrics by 
 time batches, but want to skip partial batches.  Therefore, I’m trying to 
 find a combination which results in 1 overlapped batch, but no combination I 
 tried gets me there.
 
 Alan
 
 



Re: streaming window not behaving as advertised (v1.0.1)

2014-07-23 Thread Alan Ngai
TD, it looks like your instincts were correct.  I misunderstood what you
meant.  If I force an eval on the input stream using foreachRDD, the
windowing will work correctly.  If I don’t do that, lazy eval somehow
screws with the window batches I eventually receive.  Any reason the bug is
categorized as minor?  It seems that anyone who uses the windowing
functionality would run into this bug.  I imagine this would include anyone
who wants to use Spark Streaming to aggregate data in fixed time batches,
which seems like a fairly common use case.

Alan


On Jul 22, 2014, at 11:30 PM, Alan Ngai a...@opsclarity.com wrote:

 foreachRDD is how I extracted values in the first place, so that’s not going 
 to make a difference.  I don’t think it’s related to SPARK-1312 because I’m 
 generating data every second in the first place and I’m using foreachRDD 
 right after the window operation.  The code looks something like
 
 val batchInterval = 5
 val windowInterval = 25
 val slideInterval = 15
 
 val windowedStream = inputStream.window(Seconds(windowInterval), 
 Seconds(slideInterval))
 
 val outputFunc = (r: RDD[MetricEvent], t: Time) => {
   println("%s".format(t.milliseconds / 1000))
   r.foreach { metric =>
     val timeKey = metric.timeStamp / batchInterval * batchInterval
     println("%s %s %s %s".format(timeKey, metric.timeStamp, metric.name,
       metric.value))
   }
 }
 windowedStream.foreachRDD(outputFunc)
 
 On Jul 22, 2014, at 10:13 PM, Tathagata Das tathagata.das1...@gmail.com 
 wrote:
 
 It could be related to this bug that is currently open. 
 https://issues.apache.org/jira/browse/SPARK-1312
 
 Here is a workaround. Can you put an inputStream.foreachRDD(rdd => { }) and
 try these combos again?
 
 TD
 
 
 On Tue, Jul 22, 2014 at 6:01 PM, Alan Ngai a...@opsclarity.com wrote:
 I have a sample application pumping out records 1 per second.  The batch 
 interval is set to 5 seconds.  Here’s a list of “observed window intervals” 
 vs what was actually set
 
 window=25, slide=25 : observed-window=25, overlapped-batches=0
 window=25, slide=20 : observed-window=20, overlapped-batches=0
 window=25, slide=15 : observed-window=15, overlapped-batches=0
 window=25, slide=10 : observed-window=20, overlapped-batches=2
 window=25, slide=5 : observed-window=25, overlapped-batches=3
 
 can someone explain this behavior to me?  I’m trying to aggregate metrics by 
 time batches, but want to skip partial batches.  Therefore, I’m trying to 
 find a combination which results in 1 overlapped batch, but no combination I 
 tried gets me there.
 
 Alan
 
 
 



Re: streaming window not behaving as advertised (v1.0.1)

2014-07-22 Thread Tathagata Das
It could be related to this bug that is currently open.
https://issues.apache.org/jira/browse/SPARK-1312

Here is a workaround. Can you put an inputStream.foreachRDD(rdd => { }) and
try these combos again?
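
That is, something like this right after the input stream is created (a
minimal sketch; the empty body is intentional, it just registers an output
operation so every batch is generated on schedule):

// A no-op output on the input stream, registered before any window
// operation, forces each batch to be generated on every interval.
inputStream.foreachRDD(rdd => {})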

TD


On Tue, Jul 22, 2014 at 6:01 PM, Alan Ngai a...@opsclarity.com wrote:

 I have a sample application pumping out records 1 per second.  The batch
 interval is set to 5 seconds.  Here’s a list of “observed window intervals”
 vs what was actually set

 window=25, slide=25 : observed-window=25, overlapped-batches=0
 window=25, slide=20 : observed-window=20, overlapped-batches=0
 window=25, slide=15 : observed-window=15, overlapped-batches=0
 window=25, slide=10 : observed-window=20, overlapped-batches=2
 window=25, slide=5 : observed-window=25, overlapped-batches=3
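
 The expected overlap follows from simple arithmetic (a sketch of that
 calculation, assuming the 5-second batch interval above):

 val batch = 5; val window = 25; val slide = 15
 val batchesPerWindow = window / batch             // 5 batches per window
 val overlappedBatches = (window - slide) / batch  // 2 batches shared by consecutive windows

 By that arithmetic, window=25 with slide=20 should yield exactly 1
 overlapped batch, yet the observed result above is 0.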

 can someone explain this behavior to me?  I’m trying to aggregate metrics
 by time batches, but want to skip partial batches.  Therefore, I’m trying
 to find a combination which results in 1 overlapped batch, but no
 combination I tried gets me there.

 Alan