Re: Issue while trying to aggregate with a sliding window

2014-06-19 Thread Tathagata Das
zeroTime marks the time when the streaming job started, and the first batch
of data is from zeroTime to zeroTime + slideDuration. The validity check of
(time - zeroTime) being a multiple of slideDuration ensures that a given
DStream generates RDDs at the right times. For example, say the batch size
is 1 second. For an input DStream, the slideDuration will be 1 second, and
it should generate an RDD of input data every 1 second. However, with
window operations, the slideDuration of a DStream can be, say, 2 seconds
(using window(10 seconds, 2 seconds)). In that case, RDDs should be
generated every 2 seconds, and this check ensures that.
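
The check described above can be sketched in a few lines. This is only an
illustrative reconstruction (the names and signature are mine, not Spark's
actual internals; see DStream.isValidTime in the linked source for the real
thing):

```java
// Illustrative sketch of the batch-time validity check described above:
// a time is valid for a DStream iff it is after zeroTime and the offset
// from zeroTime is an exact multiple of the DStream's slideDuration.
public class ValidTimeCheck {
    static boolean isValidTime(long timeMs, long zeroTimeMs, long slideDurationMs) {
        return timeMs > zeroTimeMs && (timeMs - zeroTimeMs) % slideDurationMs == 0;
    }

    public static void main(String[] args) {
        long zeroTime = 0L;
        // With a 2000 ms slideDuration, only every other 1000 ms batch boundary is valid.
        System.out.println(isValidTime(2000L, zeroTime, 2000L)); // true
        System.out.println(isValidTime(3000L, zeroTime, 2000L)); // false
    }
}
```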

The reduceByKeyAndWindow operation is a sliding window, so the RDDs
generated by the windowed DStream will contain data from (validTime -
windowDuration) to validTime. The way it is implemented is that it unifies
(RDD.union) the RDDs containing data from (validTime - slideDuration) to
validTime, from (validTime - 2 * slideDuration) to (validTime -
slideDuration), and so on until the trailing edge of the window (i.e.
validTime - windowDuration). Hence, it is necessary that the windowDuration
is a multiple of the slideDuration.
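
The union-of-slices construction above can be sketched as follows (the
names are illustrative, not Spark's internals). It also shows why a window
that is not a multiple of the slide cannot be covered exactly by whole
slide-sized slices:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of assembling one windowed RDD from slide-sized slices: the window
// (validTime - windowDuration, validTime] is covered by unioning
// windowDuration / slideDuration consecutive slices.
public class WindowSlices {
    // Returns the start times of the slide-sized slices that make up the window.
    static List<Long> sliceStarts(long validTimeMs, long windowMs, long slideMs) {
        if (windowMs % slideMs != 0) {
            throw new IllegalArgumentException(
                "windowDuration must be a multiple of slideDuration");
        }
        List<Long> starts = new ArrayList<>();
        for (long t = validTimeMs - slideMs; t >= validTimeMs - windowMs; t -= slideMs) {
            starts.add(t);
        }
        return starts;
    }

    public static void main(String[] args) {
        // window(10 seconds, 2 seconds) at validTime = 10 s unions five 2 s slices.
        System.out.println(sliceStarts(10_000L, 10_000L, 2_000L));
        // prints [8000, 6000, 4000, 2000, 0]
    }
}
```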


TD


On Wed, Jun 18, 2014 at 3:22 PM, Hatch M hatchman1...@gmail.com wrote:

 Ok, that patch does fix the key lookup exception. However, I am curious
 about the time validity check, isValidTime (
 https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L264
 )

 Why does (time - zeroTime) have to be a multiple of the slide duration?
 Shouldn't reduceByKeyAndWindow aggregate every record in a given window
 (zeroTime to zeroTime + windowDuration)?



Re: Issue while trying to aggregate with a sliding window

2014-06-18 Thread Hatch M
Ok, that patch does fix the key lookup exception. However, I am curious about
the time validity check, isValidTime (
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L264
)

Why does (time - zeroTime) have to be a multiple of the slide duration?
Shouldn't reduceByKeyAndWindow aggregate every record in a given window
(zeroTime to zeroTime + windowDuration)?



Re: Issue while trying to aggregate with a sliding window

2014-06-17 Thread onpoq l
There is a bug:

https://github.com/apache/spark/pull/961#issuecomment-45125185


On Tue, Jun 17, 2014 at 8:19 PM, Hatch M hatchman1...@gmail.com wrote:
 Trying to aggregate over a sliding window, experimenting with the slide
 duration. Varying the slide interval, I can see the aggregation sometimes
 works but mostly fails with the error below. The stream has records coming
 in every 100 ms.

 JavaPairDStream<String, AggregateObject> aggregatedDStream =
 pairsDStream.reduceByKeyAndWindow(aggFunc, new Duration(6), new
 Duration(60));

 14/06/18 00:14:46 INFO dstream.ShuffledDStream: Time 1403050486900 ms is
 invalid as zeroTime is 1403050485800 ms and slideDuration is 6 ms and
 difference is 1100 ms
 14/06/18 00:14:46 ERROR actor.OneForOneStrategy: key not found:
 1403050486900 ms
 java.util.NoSuchElementException: key not found: 1403050486900 ms
 at scala.collection.MapLike$class.default(MapLike.scala:228)

 Any hints on what's going on here?
 Thanks!
 Hatch
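
[The arithmetic in the log above can be checked directly: the batch time is
rejected because its offset from zeroTime, 1100 ms, is not a multiple of the
DStream's slideDuration. The 60000 ms value below is only an illustrative
slide duration, not a value taken from the log.]

```java
// Reproducing the validity arithmetic from the log above.
public class LogArithmetic {
    public static void main(String[] args) {
        long time = 1403050486900L;     // "Time 1403050486900 ms" from the log
        long zeroTime = 1403050485800L; // "zeroTime is 1403050485800 ms"
        long diff = time - zeroTime;
        System.out.println(diff);              // prints 1100
        // With, say, a 60000 ms slideDuration, 1100 is not a multiple,
        // so the batch time is reported as invalid:
        System.out.println(diff % 60000 == 0); // prints false
    }
}
```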



Re: Issue while trying to aggregate with a sliding window

2014-06-17 Thread Hatch M
Thanks! Will try to get the fix and retest.

