Wesley Miao created SPARK-7326:
----------------------------------

             Summary: Performing window() on a WindowedDStream doesn't work all the time
                 Key: SPARK-7326
                 URL: https://issues.apache.org/jira/browse/SPARK-7326
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.3.1
            Reporter: Wesley Miao

Someone reported a similar issue before but got no response:
http://apache-spark-user-list.1001560.n3.nabble.com/Windows-of-windowed-streams-not-displaying-the-expected-results-td466.html

I ran into the same issue recently, and it can be reproduced in 1.3.1 with the following piece of code:

    import scala.collection.mutable
    import scala.util.Random

    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}

    object WindowOnWindowedDStream {
      def main(args: Array[String]) {
        val batchInterval = "1234"
        val sparkConf = new SparkConf()
          .setAppName("WindowOnWindowedDStream")
          .setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Milliseconds(batchInterval.toInt))
        ssc.checkpoint("checkpoint")

        // Build one RDD of 1000 (word, 1) pairs drawn from five distinct words.
        def createRDD(i: Int): RDD[(String, Int)] = {
          val count = 1000
          val rawLogs = (1 to count).map { _ =>
            val word = "word" + Random.nextInt(5)
            (word, 1)
          }
          ssc.sparkContext.parallelize(rawLogs)
        }

        // Feed 300 pre-built RDDs into a queue-backed source stream.
        val rddQueue = mutable.Queue[RDD[(String, Int)]]()
        val rawLogStream = ssc.queueStream(rddQueue)
        (1 to 300) foreach { i =>
          rddQueue.enqueue(createRDD(i))
        }

        // l1: window and slide are both 5 batch intervals of the source stream.
        val l1 = rawLogStream.window(Milliseconds(batchInterval.toInt) * 5,
          Milliseconds(batchInterval.toInt) * 5).reduceByKey(_ + _)
        // l2: window and slide are both 15 batch intervals of the source stream.
        val l2 = l1.window(Milliseconds(batchInterval.toInt) * 15,
          Milliseconds(batchInterval.toInt) * 15).reduceByKey(_ + _)

        l1.print()
        l2.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

Here we have two windowed DStream instances, l1 and l2.

l1 is the DStream that results from performing a window() on the source DStream, with both window and sliding duration set to 5 times the batch interval of the source stream.

l2 is the DStream that results from performing a window() on l1, with both window and sliding duration set to 3 times l1's slide interval, which is 15 times the batch interval of the source stream.

In the output of this simple streaming app, I can only see data printed from l1 and no data printed from l2.

Diving into the source code, I found that the problem most likely resides in the DStream.slice() implementation, as shown below.

    def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
      if (!isInitialized) {
        throw new SparkException(this + " has not been initialized")
      }
      if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
        logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
          + slideDuration + ")")
      }
      if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
        logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
          + slideDuration + ")")
      }
      val alignedToTime = toTime.floor(slideDuration)
      val alignedFromTime = fromTime.floor(slideDuration)

      logInfo("Slicing from " + fromTime + " to " + toTime +
        " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")

      alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
        if (time >= zeroTime) getOrCompute(time) else None
      })
    }

Here floor() rounds fromTime and toTime down to a multiple of slideDuration counted from time 0 rather than from zeroTime. As a result, (alignedFromTime - zeroTime) and (alignedToTime - zeroTime) may no longer be multiples of slideDuration, which makes the isTimeValid check fail for all the remaining computation.

The fix would be to add a new floor() function in Time.scala that respects zeroTime while performing the floor:

    def floor(that: Duration, zeroTime: Time): Time = {
      val t = that.milliseconds
      new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
    }

And then change DStream.slice to call this new floor function, passing in its zeroTime:

    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

This way alignedToTime and alignedFromTime are *really* aligned with respect to zeroTime, whose value is not actually 0.
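
To make the alignment argument concrete, here is a minimal, self-contained sketch of the two floor variants. The Time and Duration classes below are simplified stand-ins written for this example, not Spark's own classes, and the values (zeroTime = 1000 ms, slide = 300 ms, toTime = 1600 ms) and the isAlignedTo helper are made up purely for illustration.

```scala
// Hypothetical stand-ins for illustration only; these are not Spark's Time and Duration.
final case class Duration(milliseconds: Long)

final case class Time(milliseconds: Long) {
  // Floor to a multiple of `that` counted from time 0.
  def floor(that: Duration): Time = {
    val t = that.milliseconds
    Time((milliseconds / t) * t)
  }

  // Floor to a multiple of `that` counted from zeroTime (the proposed fix).
  def floor(that: Duration, zeroTime: Time): Time = {
    val t = that.milliseconds
    Time(((milliseconds - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
  }

  // Illustrative helper: true when the offset from zeroTime is a whole number of slides.
  def isAlignedTo(slide: Duration, zeroTime: Time): Boolean =
    (milliseconds - zeroTime.milliseconds) % slide.milliseconds == 0
}

object FloorSketch {
  // Made-up values: the stream starts at 1000 ms and slides every 300 ms.
  val zeroTime = Time(1000L)
  val slide = Duration(300L)
  // Exactly two slides after zeroTime, so this is already a valid time.
  val toTime = Time(1600L)

  val epochAligned = toTime.floor(slide)
  val zeroAligned = toTime.floor(slide, zeroTime)

  def main(args: Array[String]): Unit = {
    println("floor from time 0:   " + epochAligned.milliseconds +
      ", aligned to zeroTime: " + epochAligned.isAlignedTo(slide, zeroTime))
    println("floor from zeroTime: " + zeroAligned.milliseconds +
      ", aligned to zeroTime: " + zeroAligned.isAlignedTo(slide, zeroTime))
  }
}
```

With these numbers the floor counted from time 0 moves a valid time (1600 ms) to 1500 ms, which is 500 ms after zeroTime and therefore not a whole number of slides, while the zeroTime-relative floor leaves it at 1600 ms.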