You can probably do it in a simpler but somewhat hacky way! If your window duration is W and sliding interval S, you can do some math to figure out how many of the first windows are actually partial windows. It should be math.ceil(W/S) - 1 (with the division done in floating point). So in a windowDStream.foreachRDD() you can increment a global counter to count how many RDDs have been generated and ignore those first few RDDs.
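[Editor's note: the partial-window arithmetic above can be sketched in plain Scala. No Spark is required; the helper name numPartialWindows is illustrative, not a Spark API.]

```scala
// Number of leading partial-window RDDs for a windowed stream with
// window duration w and sliding interval s (both in seconds, w >= s > 0).
// The first complete window can only be emitted once w seconds of data
// have arrived; every window emitted strictly before that is partial.
def numPartialWindows(w: Int, s: Int): Int =
  math.ceil(w.toDouble / s).toInt - 1

// Example: a (10s, 5s) window produces exactly one partial RDD,
// while a (5s, 5s) window is never partial.
assert(numPartialWindows(10, 5) == 1)
assert(numPartialWindows(5, 5) == 0)
```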
windowDStream.foreachRDD { rdd =>
  Global.counter += 1
  if (Global.counter < math.ceil(W.toDouble / S)) {
    // ignore: still a partial window
  } else {
    // do something awesome
  }
}

On Tue, Mar 25, 2014 at 7:29 AM, Adrian Mocanu <amoc...@verticalscope.com> wrote:

> Let me rephrase that,
> Do you think it is possible to use an accumulator to skip the first few
> incomplete RDDs?
>
> -----Original Message-----
> From: Adrian Mocanu [mailto:amoc...@verticalscope.com]
> Sent: March-25-14 9:57 AM
> To: user@spark.apache.org
> Cc: u...@spark.incubator.apache.org
> Subject: RE: [bug?] streaming window unexpected behaviour
>
> Thanks TD!
> Is it possible to perhaps add another window method that does not
> generate partial windows? Or, is it possible to remove the first few
> partial windows? I'm thinking of using an accumulator to count how many
> windows there are.
>
> -A
>
> -----Original Message-----
> From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
> Sent: March-24-14 6:55 PM
> To: user@spark.apache.org
> Cc: u...@spark.incubator.apache.org
> Subject: Re: [bug?] streaming window unexpected behaviour
>
> Yes, I believe that is the current behavior. Essentially, the first few RDDs
> will be partial windows (assuming window duration > sliding interval).
>
> TD
>
>
> On Mon, Mar 24, 2014 at 1:12 PM, Adrian Mocanu <amoc...@verticalscope.com> wrote:
> > I have what I would call unexpected behaviour when using window on a
> > stream.
> >
> > I have 2 windowed streams with a 5s batch interval. One windowed stream
> > is (5s, 5s) = smallWindow and the other (10s, 5s) = bigWindow.
> >
> > What I've noticed is that the 1st RDD produced by bigWindow is
> > incorrect and is of size 5s, not 10s. So instead of waiting 10s and
> > producing one RDD of size 10s, Spark produced the 1st 10s RDD at size 5s.
> >
> > Why is this happening? To me it looks like a bug; Matei or TD, can you
> > verify that this is the correct behaviour?
> >
> > I have the following code
> >
> > val ssc = new StreamingContext(conf, Seconds(5))
> >
> > val smallWindowStream = ssc.queueStream(smallWindowRddQueue)
> > val bigWindowStream = ssc.queueStream(bigWindowRddQueue)
> >
> > val smallWindow = smallWindowReshapedStream.window(Seconds(5), Seconds(5))
> >   .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
> > val bigWindow = bigWindowReshapedStream.window(Seconds(10), Seconds(5))
> >   .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
> >
> > -Adrian
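[Editor's note: the (10s, 5s) behaviour TD confirms above can be reproduced with a small timeline simulation in plain Scala. No Spark is needed; windowSizes and the timeline bound are illustrative only.]

```scala
// Simulate the end times of the RDDs a (w, s) windowed stream emits and
// how many seconds of data each one actually covers. A window ending at
// time t covers (t - w, t], so it holds min(t, w) seconds of data and is
// partial while t < w.
def windowSizes(w: Int, s: Int, upTo: Int): Seq[(Int, Int)] =
  (s to upTo by s).map(t => (t, math.min(t, w)))

// bigWindow = (10s, 5s): the first RDD covers only 5s of data,
// exactly the behaviour Adrian observed.
// windowSizes(10, 5, 20) == Seq((5, 5), (10, 10), (15, 10), (20, 10))
```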