Hi All,

I found out why this problem exists. Consider the following scenario:
- A DStream is created from any source (I've checked with file and socket
sources).
- No actions are applied to this DStream.
- A sliding window operation is applied to this DStream, and an action is
applied to the windowed stream.
In this case, Spark will not even read the input stream in a batch whose time
is not a multiple of the slide interval. Put another way, it won't read the
input in batches where it doesn't have to compute the window. This happens
because all transformations in Spark are lazy.
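
A minimal pure-Java model of this scheduling rule may help. It is a simplified
sketch, not Spark's actual code, and it assumes (matching the detailed results
further down) that batch t_i ends at elapsed time (i + 1) x batch interval, and
that an output action on a stream with slide interval S only runs in batches
whose elapsed time is a multiple of S. The class and method names are
hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model (not Spark's implementation): batch t_i ends at
// elapsed time (i + 1) * batchInterval, and an output action attached to a stream
// with slide interval `slide` only generates a job when that elapsed time is a
// multiple of `slide`.
public class WindowJobModel {

    // True if an output action on a stream with the given slide interval runs in batch t_i.
    static boolean runsInBatch(int i, int batchInterval, int slide) {
        int elapsed = (i + 1) * batchInterval;
        return elapsed % slide == 0;
    }

    // Batch indices in [0, numBatches) in which the windowed output action runs.
    static List<Integer> activeBatches(int numBatches, int batchInterval, int slide) {
        List<Integer> active = new ArrayList<>();
        for (int i = 0; i < numBatches; i++) {
            if (runsInBatch(i, batchInterval, slide)) {
                active.add(i);
            }
        }
        return active;
    }

    public static void main(String[] args) {
        // The (4,3) case with a batch interval of 1: only t_2 and t_5 run a job.
        System.out.println(activeBatches(6, 1, 3)); // prints [2, 5]
    }
}
```

Under this model every batch that is not on a slide boundary runs no job at
all when the windowed stream is the only output, which lines up with the
"nothing" rows in the detailed results.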

How to fix or work around this (see the inputStream.print() line):
JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(1 * 60 * 1000));
JavaDStream<String> inputStream = stcObj.textFileStream("/Input");
inputStream.print(); // This is the workaround: an output action on the input stream itself runs in every batch
JavaDStream<String> objWindow = inputStream.window(new Duration(windowLen * 60 * 1000),
        new Duration(slideInt * 60 * 1000));
objWindow.dstream().saveAsTextFiles("/Output", "");
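
Why print() helps can be sketched with a small, hypothetical model (not
Spark's scheduler). It assumes each output action carries the slide interval
of the stream it is attached to (the batch interval for inputStream.print(),
the window's slide interval for the saved window), that batch t_i ends at
elapsed time (i + 1) x batch interval, and that a batch does work only if at
least one output action is due at that time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: each output action is described only by the slide interval
// of the stream it is attached to. A batch does work if the elapsed time
// (i + 1) * batchInterval is a multiple of at least one of those slide intervals.
public class WorkaroundModel {

    // Batch indices in [0, numBatches) in which at least one output action runs.
    static List<Integer> batchesWithJobs(int numBatches, int batchInterval, int... outputSlides) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < numBatches; i++) {
            int elapsed = (i + 1) * batchInterval;
            for (int slide : outputSlides) {
                if (elapsed % slide == 0) {
                    result.add(i);
                    break; // one due output action is enough for this batch under the model
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Only the windowed output, slide = 2 batches.
        System.out.println(batchesWithJobs(6, 1, 2));    // prints [1, 3, 5]
        // Windowed output plus inputStream.print(), whose slide is the batch interval (1).
        System.out.println(batchesWithJobs(6, 1, 2, 1)); // prints [0, 1, 2, 3, 4, 5]
    }
}
```

With an output action whose slide equals the batch interval, every batch is
due, so no batch of the input stream is skipped.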


The "Window operations" example in the streaming guide implies that Spark
reads the stream in every batch, which does not happen because the
transformations are lazy.
In most cases where a sliding window is used, no action is taken on the
pre-window stream, so my expectation was that Spark Streaming would read every
batch as long as some action is taken on the windowed stream.

Regards,
Sanjay



On Friday, 21 March 2014 8:06 PM, Sanjay Awatramani <sanjay_a...@yahoo.com> wrote:
Hi,

I want to run a map/reduce process over the last 5 seconds of data, every 4
seconds. This is quite similar to the sliding window diagram in the Window
Operations section of the streaming programming guide:
http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html

The RDDs returned by the window transformation are incorrect in my case. To
investigate further, I ran a series of examples with varying values of window
length and slide interval. Summary of the test results:
(window length, slide interval) -> result
(3,1) -> success
(4,2) -> success
(3,2) -> fail
(4,3) -> fail
(5,4) -> fail
(5,2) -> fail

The only condition mentioned in the docs is that both values (5 and 4) must be
multiples of the batch interval (1 in my case), and, as expected, I get a
runtime error if I violate this condition. Looking at my results, failures
seem to occur when the window length isn't a multiple of the slide interval.
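
Checking the six tested pairs shows that the successful runs, (3,1) and (4,2),
are exactly the ones where the window length is an exact multiple of the slide
interval; all four failing pairs are not. A small, hypothetical helper that
reproduces this classification (it only encodes the observed pattern, not a
documented Spark rule):

```java
// Hypothetical helper: classifies each tested (window length, slide interval) pair
// by whether the window length is an exact multiple of the slide interval.
public class WindowSlideCheck {

    static boolean windowIsMultipleOfSlide(int windowLen, int slideInt) {
        return windowLen % slideInt == 0;
    }

    public static void main(String[] args) {
        // The pairs from the summary above, in the same order.
        int[][] tested = {{3, 1}, {4, 2}, {3, 2}, {4, 3}, {5, 4}, {5, 2}};
        for (int[] pair : tested) {
            System.out.printf("(%d,%d) -> %s%n", pair[0], pair[1],
                    windowIsMultipleOfSlide(pair[0], pair[1]) ? "multiple" : "not a multiple");
        }
    }
}
```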

My code:
JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(1 * 60 * 1000));
JavaDStream<String> inputStream = stcObj.textFileStream("/Input");
JavaDStream<String> objWindow = inputStream.window(new Duration(windowLen * 60 * 1000),
        new Duration(slideInt * 60 * 1000));
objWindow.dstream().saveAsTextFiles("/Output", "");

Detailed results:
(3,1) -> success
@t_0: [inputStream's RDD@t_0]
@t_1: [inputStream's RDD@t_0,1]
@t_2: [inputStream's RDD@t_0,1,2]
@t_3: [inputStream's RDD@t_1,2,3]
@t_4: [inputStream's RDD@t_2,3,4]
@t_5: [inputStream's RDD@t_3,4,5]

(4,2) -> success
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: [inputStream's RDD@t_2,3,4,5]

(3,2) -> fail
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_2,3]    //(expected RDD@t_1,2,3)
@t_4: nothing
@t_5: [inputStream's RDD@t_4,5]    //(expected RDD@t_3,4,5)

(4,3) -> fail
@t_0: nothing
@t_1: nothing
@t_2: [inputStream's RDD@t_0,1,2]
@t_3: nothing
@t_4: nothing
@t_5: [inputStream's RDD@t_3,4,5]    //(expected RDD@t_2,3,4,5)

(5,4) -> fail
@t_0: nothing
@t_1: nothing
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: nothing
@t_6: nothing
@t_7: [inputStream's RDD@t_4,5,6,7]    //(expected RDD@t_3,4,5,6,7)

(5,2) -> fail
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: [inputStream's RDD@t_2,3,4,5]    //(expected RDD@t_1,2,3,4,5)
@t_6: nothing
@t_7: [inputStream's RDD@t_4,5,6,7]    //(expected RDD@t_3,4,5,6,7)
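
The "expected" windows above follow a simple rule: a window is emitted at batch
t whenever (t + 1) is a multiple of the slide interval, and it covers batches
max(0, t - windowLen + 1) through t. The sketch below prints those expected
windows so they can be compared with the observed output. It assumes a batch
interval of 1, with window length and slide interval measured in batches; the
class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the expected window contents, assuming batch t_i ends at elapsed
// time i + 1 (batch interval = 1) and a window is emitted whenever that elapsed
// time is a multiple of the slide interval. Lengths are measured in batches.
public class ExpectedWindows {

    // Batch indices covered by the window emitted at batch t.
    static List<Integer> windowAt(int t, int windowLen) {
        List<Integer> batches = new ArrayList<>();
        for (int i = Math.max(0, t - windowLen + 1); i <= t; i++) {
            batches.add(i);
        }
        return batches;
    }

    // Prints the expected output for the first numBatches batches.
    static void printExpected(int windowLen, int slideInt, int numBatches) {
        for (int t = 0; t < numBatches; t++) {
            boolean emits = (t + 1) % slideInt == 0;
            System.out.println("@t_" + t + ": " + (emits ? windowAt(t, windowLen).toString() : "nothing"));
        }
    }

    public static void main(String[] args) {
        printExpected(5, 2, 8); // the (5,2) case
    }
}
```

For the (5,2) case this prints [1, 2, 3, 4, 5] at t_5 and [3, 4, 5, 6, 7] at
t_7, matching the expected values in the comments rather than the observed
ones.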

I have run all the above examples twice to be sure!
I believe either my understanding of the sliding window mechanism is incorrect,
or there is a problem in the sliding window mechanism itself.

Regards,
Sanjay
