Re: Issue while trying to aggregate with a sliding window
zeroTime marks the time when the streaming job started, and the first batch of data covers zeroTime to zeroTime + slideDuration. The validity check that (time - zeroTime) is a multiple of slideDuration ensures that a given DStream generates RDDs at the right times.

For example, say the batch size is 1 second. For an input DStream, the slideDuration will be 1 second, so it should generate an RDD of input data every 1 second. However, with window ops, the slideDuration of a DStream can be, say, 2 seconds (using window(10 seconds, 2 seconds)). In that case, RDDs should be generated every 2 seconds, and this check ensures that.

The reduceByKeyAndWindow operation is a sliding window, so each RDD generated by the windowed DStream contains data from validTime - windowDuration to validTime. The way it is implemented, it unifies (RDD.union) the RDDs containing data from (validTime - slideDuration to validTime), (validTime - 2 * slideDuration to validTime - slideDuration), and so on, back to the trailing edge of the window (i.e. validTime - windowDuration). Hence, windowDuration must be a multiple of slideDuration.

TD

On Wed, Jun 18, 2014 at 3:22 PM, Hatch M <hatchman1...@gmail.com> wrote:
> Ok, that patch does fix the key lookup exception. However, I'm curious about the time validity check, isValidTime (https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L264). Why does (time - zeroTime) have to be a multiple of the slide duration? Shouldn't reduceByKeyAndWindow aggregate every record in a given window (zeroTime to zeroTime + windowDuration)?
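TD's description of how the windowed RDD is assembled can be sketched in plain Java. This is a hypothetical illustration, not Spark's implementation: WindowSketch and sliceIntervals are made-up names, times and durations are assumed to be millisecond longs, and intervals are returned newest first. It lists the slide-sized intervals whose RDDs would be unioned for a window ending at validTime, and rejects a windowDuration that is not a multiple of slideDuration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Spark code): the slide-sized intervals that make up
// a window ending at validTime, following the description of reduceByKeyAndWindow above.
final class WindowSketch {

    // Returns {start, end} pairs, newest first, stepping back from validTime
    // by slideDuration until the trailing edge (validTime - windowDuration) is reached.
    static List<long[]> sliceIntervals(long validTime, long windowDuration, long slideDuration) {
        if (slideDuration <= 0 || windowDuration % slideDuration != 0) {
            // The intervals could not tile the window exactly.
            throw new IllegalArgumentException(
                "windowDuration (" + windowDuration + " ms) must be a multiple of slideDuration ("
                    + slideDuration + " ms)");
        }
        List<long[]> intervals = new ArrayList<>();
        long trailingEdge = validTime - windowDuration;
        for (long end = validTime; end > trailingEdge; end -= slideDuration) {
            intervals.add(new long[] {end - slideDuration, end});
        }
        return intervals;
    }

    public static void main(String[] args) {
        // window(10 seconds, 2 seconds) from the example above, with validTime at 20 s.
        for (long[] iv : sliceIntervals(20_000L, 10_000L, 2_000L)) {
            System.out.println(iv[0] + " to " + iv[1] + " ms");
        }
    }
}
```

With validTime at 20 s this prints five intervals, from 18000 to 20000 ms back to 10000 to 12000 ms. A 10 s window with a 3 s slide would throw instead, since 10 s cannot be split into whole 3 s pieces.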
Re: Issue while trying to aggregate with a sliding window
Ok, that patch does fix the key lookup exception. However, I'm curious about the time validity check, isValidTime (https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L264). Why does (time - zeroTime) have to be a multiple of the slide duration? Shouldn't reduceByKeyAndWindow aggregate every record in a given window (zeroTime to zeroTime + windowDuration)?

On Tue, Jun 17, 2014 at 10:55 PM, Hatch M <hatchman1...@gmail.com> wrote:
> Thanks! Will try to get the fix and retest.
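The check asked about here can be mirrored in a few lines of plain Java. This is a hypothetical sketch, not Spark's method: ValidTimeSketch and isValidTime are illustrative names, times and durations are assumed to be millisecond longs, and rejecting times at or before zeroTime is an assumption based on the first batch ending at zeroTime + slideDuration. It plugs in the zeroTime, time and 6 ms slideDuration reported in the log of the original message.

```java
// Hypothetical sketch (not Spark's code) of the validity check discussed in this thread:
// an RDD is only expected at times a whole number of slideDurations after zeroTime.
final class ValidTimeSketch {

    static boolean isValidTime(long timeMs, long zeroTimeMs, long slideDurationMs) {
        long difference = timeMs - zeroTimeMs;
        // Assumption: times at or before zeroTime are rejected too.
        return slideDurationMs > 0 && difference > 0 && difference % slideDurationMs == 0;
    }

    public static void main(String[] args) {
        long zeroTime = 1403050485800L; // zeroTime from the log in the original message
        long time = 1403050486900L;     // difference is 1100 ms
        // 1100 is not a multiple of 6, so this time is rejected, as in the INFO line of the log.
        System.out.println(isValidTime(time, zeroTime, 6L));   // false
        // With a hypothetical 100 ms slide duration, the same time would be valid.
        System.out.println(isValidTime(time, zeroTime, 100L)); // true
    }
}
```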
Re: Issue while trying to aggregate with a sliding window
There is a bug: https://github.com/apache/spark/pull/961#issuecomment-45125185

On Tue, Jun 17, 2014 at 8:19 PM, Hatch M <hatchman1...@gmail.com> wrote:
> I'm trying to aggregate over a sliding window, playing with the slide duration. Playing around with the slide interval, I can see the aggregation work, but it mostly fails with the error below. The stream has records coming in at 100 ms.
>
> JavaPairDStream<String, AggregateObject> aggregatedDStream = pairsDStream.reduceByKeyAndWindow(aggFunc, new Duration(6), new Duration(60));
>
> 14/06/18 00:14:46 INFO dstream.ShuffledDStream: Time 1403050486900 ms is invalid as zeroTime is 1403050485800 ms and slideDuration is 6 ms and difference is 1100 ms
> 14/06/18 00:14:46 ERROR actor.OneForOneStrategy: key not found: 1403050486900 ms
> java.util.NoSuchElementException: key not found: 1403050486900 ms
>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>
> Any hints on what's going on here?
>
> Thanks!
> Hatch
Re: Issue while trying to aggregate with a sliding window
Thanks! Will try to get the fix and retest.

On Tue, Jun 17, 2014 at 5:30 PM, onpoq l <onpo...@gmail.com> wrote:
> There is a bug: https://github.com/apache/spark/pull/961#issuecomment-45125185