Hello Sanjay,

Yes, your understanding of lazy semantics is correct. But ideally every batch should be read based on the batch interval provided in the StreamingContext. Can you open a JIRA on this?
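[Editor's note: the transformation/action split behind this laziness can be seen outside Spark with plain java.util.stream, which behaves analogously: a map() stage does no work until a terminal operation runs. A minimal self-contained sketch; the class and method names are illustrative only, not Spark APIs.]

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazyDemo {
    // Counts how many elements the map() stage actually processes.
    static int readsTriggeredBy(boolean runAction) {
        AtomicInteger reads = new AtomicInteger();
        // map() is lazy, like a DStream transformation: it records work
        // to be done but processes nothing by itself.
        Stream<Integer> s = Stream.of(1, 2, 3)
                .map(x -> { reads.incrementAndGet(); return x * 2; });
        if (runAction) {
            s.forEach(x -> { });  // terminal op (the "action") forces evaluation
        }
        return reads.get();
    }

    public static void main(String[] args) {
        System.out.println("without action: " + readsTriggeredBy(false)); // 0
        System.out.println("with action:    " + readsTriggeredBy(true));  // 3
    }
}
```

This mirrors Sanjay's finding below: with no action attached to inputStream, nothing forces the per-batch read, so adding inputStream.print() (an action) works around it.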
On Mon, Mar 24, 2014 at 7:45 AM, Sanjay Awatramani <sanjay_a...@yahoo.com> wrote:
> Hi All,
>
> I found out why this problem exists. Consider the following scenario:
> - a DStream is created from any source. (I've checked with file and socket)
> - No actions are applied to this DStream
> - A sliding window operation is applied to this DStream and an action is applied to the sliding window.
> In this case, Spark will not even read the input stream in the batches for which no window output is due. Put another way, it won't read the input when it doesn't have to apply the window function. This is happening because all transformations in Spark are lazy.
>
> How to fix this or work around it (see line #3 of the code below):
> JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(1 * 60 * 1000));
> JavaDStream<String> inputStream = stcObj.textFileStream("/Input");
> inputStream.print(); // This is the workaround
> JavaDStream<String> objWindow = inputStream.window(new Duration(windowLen*60*1000), new Duration(slideInt*60*1000));
> objWindow.dstream().saveAsTextFiles("/Output", "");
>
> The "Window operations" example in the streaming guide implies that Spark will read the stream in every batch, which is not happening because of the lazy transformations.
> Wherever a sliding window is used, in most cases no actions will be taken on the pre-window batches, hence my gut feeling was that Streaming would read every batch as long as some action is taken on the windowed stream.
>
> Regards,
> Sanjay
>
>
> On Friday, 21 March 2014 8:06 PM, Sanjay Awatramani <sanjay_a...@yahoo.com> wrote:
> Hi,
>
> I want to run a map/reduce process over the last 5 seconds of data, every 4 seconds. This is quite similar to the sliding window pictorial example under the Window Operations section on http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html .
>
> The RDDs returned by the window transformation are incorrect in my case. To investigate this further, I ran a series of examples with varying values of window length & slide interval. Summary of the test results:
> (window length, slide interval) -> result
> (3,1) -> success
> (4,2) -> success
> (3,2) -> fail
> (4,3) -> fail
> (5,4) -> fail
> (5,2) -> fail
>
> The only condition mentioned in the doc is that the two values (5 & 4) should be multiples of the batch interval (1 in my case), and obviously I get a run time error if I attempt to violate this condition. Looking at my results, it seems that failures result when the window length isn't a multiple of the slide interval.
>
> My code:
> JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(1 * 60 * 1000));
> JavaDStream<String> inputStream = stcObj.textFileStream("/Input");
> JavaDStream<String> objWindow = inputStream.window(new Duration(windowLen*60*1000), new Duration(slideInt*60*1000));
> objWindow.dstream().saveAsTextFiles("/Output", "");
>
> Detailed results:
> (3,1) -> success
> @t_0: [inputStream's RDD@t_0]
> @t_1: [inputStream's RDD@t_0,1]
> @t_2: [inputStream's RDD@t_0,1,2]
> @t_3: [inputStream's RDD@t_1,2,3]
> @t_4: [inputStream's RDD@t_2,3,4]
> @t_5: [inputStream's RDD@t_3,4,5]
>
> (4,2) -> success
> @t_0: nothing
> @t_1: [inputStream's RDD@t_0,1]
> @t_2: nothing
> @t_3: [inputStream's RDD@t_0,1,2,3]
> @t_4: nothing
> @t_5: [inputStream's RDD@t_2,3,4,5]
>
> (3,2) -> fail
> @t_0: nothing
> @t_1: [inputStream's RDD@t_0,1]
> @t_2: nothing
> @t_3: [inputStream's RDD@t_2,3] //(expected RDD@t_1,2,3)
> @t_4: nothing
> @t_5: [inputStream's RDD@t_4,5] //(expected RDD@t_3,4,5)
>
> (4,3) -> fail
> @t_0: nothing
> @t_1: nothing
> @t_2: [inputStream's RDD@t_0,1,2]
> @t_3: nothing
> @t_4: nothing
> @t_5: [inputStream's RDD@t_3,4,5] //(expected RDD@t_2,3,4,5)
>
> (5,4) -> fail
> @t_0: nothing
> @t_1: nothing
> @t_2: nothing
> @t_3: [inputStream's RDD@t_0,1,2,3]
> @t_4: nothing
> @t_5: nothing
> @t_6: nothing
> @t_7: [inputStream's RDD@t_4,5,6,7] //(expected RDD@t_3,4,5,6,7)
>
> (5,2) -> fail
> @t_0: nothing
> @t_1: [inputStream's RDD@t_0,1]
> @t_2: nothing
> @t_3: [inputStream's RDD@t_0,1,2,3]
> @t_4: nothing
> @t_5: [inputStream's RDD@t_2,3,4,5] //(expected RDD@t_1,2,3,4,5)
> @t_6: nothing
> @t_7: [inputStream's RDD@t_4,5,6,7] //(expected RDD@t_3,4,5,6,7)
>
> I have run all the above examples twice to be sure!
> I believe either my understanding of the sliding window mechanism is incorrect or there is a problem in the sliding window mechanism.
>
> Regards,
> Sanjay
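[Editor's note: the "expected" window contents in the tables above follow from simple batch-index arithmetic: a window of length W that fires at batch t should contain batches max(0, t-W+1)..t, and with the thread's zero-based batch labels a window with slide S fires whenever t+1 is a multiple of S. A minimal plain-Java sketch of that arithmetic; the class and method names are illustrative, not Spark APIs.]

```java
import java.util.ArrayList;
import java.util.List;

public class WindowMembership {
    // Batches are labeled t_0, t_1, ... as in the tables above. A window of
    // windowLen batches firing at batch t should contain the last windowLen
    // batches, i.e. [t - windowLen + 1, t], clipped at 0 for early windows.
    static List<Integer> windowAt(int t, int windowLen) {
        List<Integer> batches = new ArrayList<>();
        for (int i = Math.max(0, t - windowLen + 1); i <= t; i++) {
            batches.add(i);
        }
        return batches;
    }

    // With zero-based labels, a window of slide slideInt fires at batch t
    // whenever t + 1 is a multiple of slideInt (e.g. t_1, t_3, t_5 for S=2).
    static boolean firesAt(int t, int slideInt) {
        return (t + 1) % slideInt == 0;
    }

    public static void main(String[] args) {
        // Reproduce the expected (not observed) results for the (3,2) case:
        for (int t = 0; t <= 5; t++) {
            if (firesAt(t, 2)) {
                System.out.println("t_" + t + ": " + windowAt(t, 3));
            }
        }
        // prints t_1: [0, 1] / t_3: [1, 2, 3] / t_5: [3, 4, 5]
    }
}
```

Running this for (3,2) yields [1, 2, 3] at t_3 and [3, 4, 5] at t_5, matching the "expected" annotations; the observed [2, 3] and [4, 5] are the batches that were actually read, consistent with the lazy-read explanation in the reply above.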