zeroTime marks the time when the streaming job started, and the first batch
of data is from zeroTime to zeroTime + slideDuration. The validity check that
(time - zeroTime) is a multiple of slideDuration ensures that a given
DStream generates RDDs at the right times. For example, say the
batch size is 1 second. For an input DStream, the slideDuration will be 1
second, so it should generate an RDD of input data every 1 second. However,
with window ops, the slideDuration of a DStream can be, say, 2 seconds
(using window(10 seconds, 2 seconds)). In that case, RDDs should be
generated every 2 seconds. This check ensures that.
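
To make the check concrete, here is a rough sketch in plain Java. It is a simplified model of the rule described above, not Spark's actual code: the class name SlideCheck and the method signature are made up for illustration, and it assumes a time is valid only if it is strictly after zeroTime. The numbers come from the log in the original report.

```java
// Simplified model of the validity check described above; not Spark's actual code.
// SlideCheck and isValidTime(long, long, long) are hypothetical names.
final class SlideCheck {

    /** True when timeMs lies on a slide boundary counted from zeroTimeMs. */
    static boolean isValidTime(long timeMs, long zeroTimeMs, long slideMs) {
        long diff = timeMs - zeroTimeMs;
        // Assumption: the first valid time is zeroTime + slideDuration, so diff must be positive.
        return diff > 0 && diff % slideMs == 0;
    }

    public static void main(String[] args) {
        long zeroTime = 1403050485800L; // zeroTime from the log in the original report
        long slide = 60000L;            // slideDuration from the same log

        // Difference is 1100 ms, which is not a multiple of 60000 ms.
        System.out.println(isValidTime(1403050486900L, zeroTime, slide)); // prints false

        // Exactly two slides after zeroTime.
        System.out.println(isValidTime(zeroTime + 2 * slide, zeroTime, slide)); // prints true
    }
}
```

With a 1 second batch and a 2 second slide, the same function would accept every second batch time and reject the ones in between.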

The reduceByKeyAndWindow operation uses a sliding window, so the RDDs
generated by the windowed DStream will contain data between (validTime -
windowDuration) and validTime. The way it is implemented is that it
unions (RDD.union) the RDDs containing data from (validTime -
slideDuration to validTime), (validTime - 2 * slideDuration to validTime -
slideDuration), ... down to the trailing edge of the window (i.e. validTime
- windowDuration). Hence, it is necessary that the windowDuration is a
multiple of the slideDuration.
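
The slicing described above can be sketched in plain Java as follows. Again this only models the description in this message, not the real implementation: WindowSlices and slices(...) are hypothetical names, and each slice is represented as a simple {start, end} pair in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

// Models how a window is covered by slide-sized slices, per the description above.
// WindowSlices and slices(...) are hypothetical names, not Spark APIs.
final class WindowSlices {

    /** Returns {start, end} pairs, newest first, from validTime back to validTime - window. */
    static List<long[]> slices(long validTimeMs, long windowMs, long slideMs) {
        if (windowMs % slideMs != 0) {
            // Otherwise the last slice would overshoot the trailing edge of the window.
            throw new IllegalArgumentException(
                "windowDuration must be a multiple of slideDuration");
        }
        List<long[]> out = new ArrayList<>();
        for (long end = validTimeMs; end > validTimeMs - windowMs; end -= slideMs) {
            out.add(new long[] {end - slideMs, end});
        }
        return out;
    }

    public static void main(String[] args) {
        // window(10 seconds, 2 seconds) evaluated at validTime = 10000 ms.
        for (long[] s : slices(10000L, 10000L, 2000L)) {
            System.out.println(s[0] + " to " + s[1]);
        }
    }
}
```

For window(10 seconds, 2 seconds) this yields five slices that end exactly at validTime - windowDuration. With a 3 second slide the slices could not line up with the trailing edge, which is why the sketch rejects that case.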


TD


On Wed, Jun 18, 2014 at 3:22 PM, Hatch M <hatchman1...@gmail.com> wrote:

> Ok that patch does fix the key lookup exception. However, curious about
> the time validity check..isValidTime (
> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L264
> )
>
> Why does (time - zerotime) have to be a multiple of slide duration ?
> Shouldn't the reduceByKeyAndWindow aggregate every record in a given
> window (zeroTime to zeroTime+windowDuration)?
>
>
> On Tue, Jun 17, 2014 at 10:55 PM, Hatch M <hatchman1...@gmail.com> wrote:
>
>> Thanks! Will try to get the fix and retest.
>>
>>
>> On Tue, Jun 17, 2014 at 5:30 PM, onpoq l <onpo...@gmail.com> wrote:
>>
>>> There is a bug:
>>>
>>> https://github.com/apache/spark/pull/961#issuecomment-45125185
>>>
>>>
>>> On Tue, Jun 17, 2014 at 8:19 PM, Hatch M <hatchman1...@gmail.com> wrote:
>>> > Trying to aggregate over a sliding window, playing with the slide
>>> > duration.
>>> > Playing around with the slide interval I can see the aggregation works
>>> > but mostly fails with the below error. The stream has records coming in
>>> > at 100ms.
>>> >
>>> > JavaPairDStream<String, AggregateObject> aggregatedDStream =
>>> > pairsDStream.reduceByKeyAndWindow(aggFunc, new Duration(60000), new
>>> > Duration(600000));
>>> >
>>> > 14/06/18 00:14:46 INFO dstream.ShuffledDStream: Time 1403050486900 ms is
>>> > invalid as zeroTime is 1403050485800 ms and slideDuration is 60000 ms and
>>> > difference is 1100 ms
>>> > 14/06/18 00:14:46 ERROR actor.OneForOneStrategy: key not found:
>>> > 1403050486900 ms
>>> > java.util.NoSuchElementException: key not found: 1403050486900 ms
>>> > at scala.collection.MapLike$class.default(MapLike.scala:228)
>>> >
>>> > Any hints on whats going on here?
>>> > Thanks!
>>> > Hatch
>>> >
>>>
>>
>>
>
