zeroTime marks the time when the streaming job started, and the first batch of data covers zeroTime to zeroTime + slideDuration. The validity check that (time - zeroTime) is a multiple of slideDuration ensures that, for a given DStream, RDDs are generated at the right times. For example, say the batch size is 1 second. For an input DStream, the slideDuration will be 1 second, so it should generate an RDD of input data every 1 second. However, with window operations, the slideDuration of a DStream can be, say, 2 seconds (using window(10 seconds, 2 seconds)). In that case, RDDs should be generated every 2 seconds, and this check ensures that.
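As a rough sketch of the check described above (a hypothetical standalone helper, not Spark's actual `DStream.isValidTime` code, which uses `Time`/`Duration` objects rather than raw longs):

```java
public class ValidTimeCheck {
    // A batch time is valid for a DStream only if it is aligned to that
    // DStream's slideDuration, measured from zeroTime (all values in ms).
    static boolean isValidTime(long timeMs, long zeroTimeMs, long slideDurationMs) {
        return timeMs >= zeroTimeMs && (timeMs - zeroTimeMs) % slideDurationMs == 0;
    }

    public static void main(String[] args) {
        long zeroTime = 1403050485800L; // job start time from the log below
        long slide = 60000L;            // 60 s slideDuration, as in the log

        // 1403050486900 - 1403050485800 = 1100 ms, which is not a multiple
        // of 60000 ms -- exactly the "Time ... is invalid" message below.
        System.out.println(isValidTime(1403050486900L, zeroTime, slide)); // false
        System.out.println(isValidTime(zeroTime + 2 * slide, zeroTime, slide)); // true
    }
}
```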
The reduceByKeyAndWindow operation is a sliding window, so the RDDs generated by the windowed DStream contain data from (validTime - windowDuration) to validTime. The way it is implemented is that it unifies (RDD.union) the RDDs containing data from (validTime - slideDuration) to validTime, (validTime - 2 * slideDuration) to (validTime - slideDuration), and so on, until the trailing edge of the window (i.e. validTime - windowDuration). Hence, it is necessary that windowDuration be a multiple of slideDuration.

TD

On Wed, Jun 18, 2014 at 3:22 PM, Hatch M <hatchman1...@gmail.com> wrote:
> Ok that patch does fix the key lookup exception. However, curious about
> the time validity check..isValidTime (
> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L264
> )
>
> Why does (time - zeroTime) have to be a multiple of slide duration?
> Shouldn't reduceByKeyAndWindow aggregate every record in a given
> window (zeroTime to zeroTime + windowDuration)?
>
> On Tue, Jun 17, 2014 at 10:55 PM, Hatch M <hatchman1...@gmail.com> wrote:
>> Thanks! Will try to get the fix and retest.
>>
>> On Tue, Jun 17, 2014 at 5:30 PM, onpoq l <onpo...@gmail.com> wrote:
>>> There is a bug:
>>>
>>> https://github.com/apache/spark/pull/961#issuecomment-45125185
>>>
>>> On Tue, Jun 17, 2014 at 8:19 PM, Hatch M <hatchman1...@gmail.com> wrote:
>>> > Trying to aggregate over a sliding window, playing with the slide
>>> > duration. I can see the aggregation works, but it mostly fails with
>>> > the error below. The stream has records coming in at 100 ms.
>>> >
>>> > JavaPairDStream<String, AggregateObject> aggregatedDStream =
>>> >     pairsDStream.reduceByKeyAndWindow(aggFunc, new Duration(60000),
>>> >         new Duration(600000));
>>> >
>>> > 14/06/18 00:14:46 INFO dstream.ShuffledDStream: Time 1403050486900 ms
>>> > is invalid as zeroTime is 1403050485800 ms and slideDuration is
>>> > 60000 ms and difference is 1100 ms
>>> > 14/06/18 00:14:46 ERROR actor.OneForOneStrategy: key not found:
>>> > 1403050486900 ms
>>> > java.util.NoSuchElementException: key not found: 1403050486900 ms
>>> >     at scala.collection.MapLike$class.default(MapLike.scala:228)
>>> >
>>> > Any hints on what's going on here?
>>> > Thanks!
>>> > Hatch