[ https://issues.apache.org/jira/browse/SPARK-7326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14525815#comment-14525815 ]
Sean Owen commented on SPARK-7326:
----------------------------------

Out of curiosity, why do you have a window + slide of 5x the batch duration? That is simply the same as making an underlying stream with 5x the batch duration. Maybe this is just a toy example. I know it's not directly relevant to what you're reporting.

> Performing window() on a WindowedDStream doesn't work all the time
> ------------------------------------------------------------------
>
>                 Key: SPARK-7326
>                 URL: https://issues.apache.org/jira/browse/SPARK-7326
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.1
>            Reporter: Wesley Miao
>
> Someone reported similar issues before but got no response.
> http://apache-spark-user-list.1001560.n3.nabble.com/Windows-of-windowed-streams-not-displaying-the-expected-results-td466.html
> And I met the same issue recently and it can be reproduced in 1.3.1 by the
> following piece of code:
> def main(args: Array[String]) {
>   val batchInterval = "1234"
>   val sparkConf = new SparkConf()
>     .setAppName("WindowOnWindowedDStream")
>     .setMaster("local[2]")
>   val ssc = new StreamingContext(sparkConf,
>     Milliseconds(batchInterval.toInt))
>   ssc.checkpoint("checkpoint")
>   def createRDD(i: Int) : RDD[(String, Int)] = {
>     val count = 1000
>     val rawLogs = (1 to count).map{ _ =>
>       val word = "word" + Random.nextInt.abs % 5
>       (word, 1)
>     }
>     ssc.sparkContext.parallelize(rawLogs)
>   }
>   val rddQueue = mutable.Queue[RDD[(String, Int)]]()
>   val rawLogStream = ssc.queueStream(rddQueue)
>   (1 to 300) foreach { i =>
>     rddQueue.enqueue(createRDD(i))
>   }
>   val l1 = rawLogStream.window(Milliseconds(batchInterval.toInt) * 5,
>     Milliseconds(batchInterval.toInt) * 5).reduceByKey(_ + _)
>   val l2 = l1.window(Milliseconds(batchInterval.toInt) * 15,
>     Milliseconds(batchInterval.toInt) * 15).reduceByKey(_ + _)
>   l1.print()
>   l2.print()
>   ssc.start()
>   ssc.awaitTermination()
> }
> Here we have two windowed DStream instances, l1 and l2.
> l1 is the DStream that results from performing a window() on the source
> DStream, with both window and sliding duration 5 times the batch interval
> of the source stream.
> l2 is the DStream that results from performing a window() on l1, with both
> window and sliding duration 3 times l1's batch interval, which is 15 times
> that of the source stream.
> From the output of this simple streaming app, I can only see data printed
> from l1 and no data printed from l2.
> Diving into the source code, I found the problem most likely resides in the
> DStream.slice() implementation, as shown below.
> def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
>   if (!isInitialized) {
>     throw new SparkException(this + " has not been initialized")
>   }
>   if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
>     logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
>       + slideDuration + ")")
>   }
>   if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
>     logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
>       + slideDuration + ")")
>   }
>   val alignedToTime = toTime.floor(slideDuration, zeroTime)
>   val alignedFromTime = fromTime.floor(slideDuration, zeroTime)
>   logInfo("Slicing from " + fromTime + " to " + toTime +
>     " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
>   alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
>     if (time >= zeroTime) getOrCompute(time) else None
>   })
> }
> Here, after performing floor() on both fromTime and toTime, the results
> (alignedFromTime - zeroTime) and (alignedToTime - zeroTime) may no longer be
> multiples of slideDuration, thus making the isTimeValid() check fail for
> all the remaining computation.
> The fix would be to add a new floor() function in Time.scala that respects
> zeroTime while performing the floor:
> def floor(that: Duration, zeroTime: Time): Time = {
>   val t = that.milliseconds
>   new Time(((this.millis - zeroTime.milliseconds) / t) * t +
>     zeroTime.milliseconds)
> }
> And then change DStream.slice to call this new floor function, passing
> in its zeroTime.
> val alignedToTime = toTime.floor(slideDuration, zeroTime)
> val alignedFromTime = fromTime.floor(slideDuration, zeroTime)
> This way alignedToTime and alignedFromTime are *really* aligned with
> respect to zeroTime, whose value is not really 0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
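
The alignment problem described in the report can be checked with plain arithmetic. The following is only a sketch, not Spark code: times and durations are modeled as Long milliseconds, the names FloorSketch, floorToEpoch, floorToZero and isAligned are made up for illustration, and the zeroTime value is hypothetical. It also assumes that a floor() without a zeroTime argument rounds down relative to time 0.

```scala
// Sketch only: times and durations are plain Long milliseconds,
// not Spark's Time/Duration classes.
object FloorSketch {
  // Assumed behaviour of a floor() that ignores zeroTime:
  // round down to a multiple of slide counted from time 0.
  def floorToEpoch(t: Long, slide: Long): Long = (t / slide) * slide

  // Floor relative to zeroTime, mirroring the arithmetic proposed in the report.
  def floorToZero(t: Long, slide: Long, zeroTime: Long): Long =
    ((t - zeroTime) / slide) * slide + zeroTime

  // True when (t - zeroTime) is an exact multiple of slide.
  def isAligned(t: Long, slide: Long, zeroTime: Long): Boolean =
    (t - zeroTime) % slide == 0

  def main(args: Array[String]): Unit = {
    val batch = 1234L
    val slide = batch * 5              // 6170 ms, l1's slide duration
    val zeroTime = batch * 7           // 8638 ms, hypothetical; not a multiple of slide
    val t = zeroTime + 3 * slide + 100 // 27248 ms, slightly past a slide boundary

    val naive = floorToEpoch(t, slide)
    val fixed = floorToZero(t, slide, zeroTime)
    println(s"epoch floor: $naive, aligned = ${isAligned(naive, slide, zeroTime)}")
    println(s"zero floor:  $fixed, aligned = ${isAligned(fixed, slide, zeroTime)}")
  }
}
```

With these numbers the epoch-relative floor gives 24680, which is 3702 ms away from any boundary counted from zeroTime, while the zeroTime-relative floor gives 27148, which is exactly zeroTime + 3 * slide.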