[ https://issues.apache.org/jira/browse/SPARK-21206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16062284#comment-16062284 ]
Fei Shao edited comment on SPARK-21206 at 6/25/17 10:57 AM:
------------------------------------------------------------

Hi Sean Owen,

I am sorry, I did not give enough information about this issue.

My test code:

lines.countByValueAndWindow( Seconds(2), Seconds(8)).foreachRDD( s => {
《=== here the windowDuration is 2 seconds and the slideDuration is 8 seconds.

===========log begin ============
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Current window = [1498383085000 ms, 1498383086000 ms]
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Previous window = [1498383077000 ms, 1498383078000 ms]
17/06/25 17:31:26 INFO ShuffledDStream: Slicing from 1498383077000 ms to 1498383084000 ms (aligned to 1498383077000 ms and 1498383084000 ms)
《=== here, the old RDDs are sliced from 1498383077000 to 1498383084000. That is 8 seconds. Actually it should be 2 seconds.
===========log end============

===========code in ReducedWindowedDStream.scala begin============
  override def compute(validTime: Time): Option[RDD[(K, V)]] = {
    val reduceF = reduceFunc
    val invReduceF = invReduceFunc

    val currentTime = validTime
    val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration,
      currentTime)
    val previousWindow = currentWindow - slideDuration

    logDebug("Window time = " + windowDuration)
    logDebug("Slide time = " + slideDuration)
    logDebug("Zero time = " + zeroTime)
    logDebug("Current window = " + currentWindow)
    logDebug("Previous window = " + previousWindow)

    //  _____________________________
    // |  previous window   _________|___________________
    // |___________________|       current window        |  --------------> Time
    //                     |_____________________________|
    //
    // |________ _________|          |________ _________|
    //          |                             |
    //          V                             V
    //       old RDDs                     new RDDs
    //

    // Get the RDDs of the reduced values in "old time steps"
    val oldRDDs =
      reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
    《== I think this line should be "reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime + windowDuration - parent.slideDuration)"
    logDebug("# old RDDs = " + oldRDDs.size)

    // Get the RDDs of the reduced values in "new time steps"
    val newRDDs =
      reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
    《== and this line should be "reducedStream.slice(previousWindow.endTime + windowDuration - parent.slideDuration, currentWindow.endTime)"
    logDebug("# new RDDs = " + newRDDs.size)
===========code in ReducedWindowedDStream.scala end============

> the window slice of Dstream is wrong
> ------------------------------------
>
> Key: SPARK-21206
> URL: https://issues.apache.org/jira/browse/SPARK-21206
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.1.0
> Reporter: Fei Shao
>
> The code is:
> val conf = new SparkConf().setAppName("testDstream").setMaster("local[4]")
> val ssc = new StreamingContext(conf, Seconds(1))
> ssc.checkpoint( "path")
> val lines = ssc.socketTextStream("IP", PORT)
> lines.countByValueAndWindow( Seconds(2), Seconds(8)).foreachRDD( s => {
>   println( "RDD ID IS : " + s.id)
>   s.foreach( e => println("data is " + e._1 + " :" + e._2))
>   println()
> })
>
> The result is wrong.
> I checked the log; it showed:
> 17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Time 1498383086000 ms is valid
> 17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Window time = 2000 ms
> 17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Slide time = 8000 ms
> 17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Zero time = 1498383078000 ms
> 17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Current window = [1498383085000 ms, 1498383086000 ms]
> 17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Previous window = [1498383077000 ms, 1498383078000 ms]
> 17/06/25 17:31:26 INFO ShuffledDStream: Slicing from 1498383077000 ms to 1498383084000 ms (aligned to 1498383077000 ms and 1498383084000 ms)
> 17/06/25 17:31:26 INFO ShuffledDStream: Time 1498383078000 ms is invalid as zeroTime is 1498383078000 ms , slideDuration is 1000 ms and difference is 0 ms
> 17/06/25 17:31:26 DEBUG ShuffledDStream: Time 1498383079000 ms is valid
> 17/06/25 17:31:26 DEBUG MappedDStream: Time 1498383079000 ms is valid
>
> The slice time is wrong.
>
> [BTW]: Team members,
> If it is a bug, please don't fix it. I will try to fix it myself. Thanks :)
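
The interval arithmetic behind the logged values can be checked by hand. The following is only a minimal sketch, assuming plain Long millisecond values in place of Spark's Time, Duration and Interval classes; the object name WindowSliceSketch and all field names are hypothetical and not part of Spark. It repeats the subtraction and addition from the quoted compute method with the numbers from the log (valid time 1498383086000 ms, window 2000 ms, slide 8000 ms, parent slide 1000 ms), and counts batches on the assumption that slice includes both end points.

```scala
// Hypothetical stand-alone sketch: plain Long milliseconds, no Spark dependency.
object WindowSliceSketch {
  // Values taken from the log in the report.
  val validTime: Long = 1498383086000L
  val windowDuration: Long = 2000L       // Seconds(2)
  val slideDuration: Long = 8000L        // Seconds(8)
  val parentSlideDuration: Long = 1000L  // batch interval, Seconds(1)

  // Same arithmetic as the quoted compute() method.
  val currentBegin: Long = validTime - windowDuration + parentSlideDuration
  val currentEnd: Long = validTime
  val previousBegin: Long = currentBegin - slideDuration
  val previousEnd: Long = currentEnd - slideDuration

  // Range passed to reducedStream.slice for the "old" RDDs.
  val oldSliceFrom: Long = previousBegin
  val oldSliceTo: Long = currentBegin - parentSlideDuration

  // Number of batch times in the old slice, counting both end points,
  // compared with the number of batches one window covers.
  val oldBatchCount: Long = (oldSliceTo - oldSliceFrom) / parentSlideDuration + 1
  val windowBatchCount: Long = windowDuration / parentSlideDuration

  def main(args: Array[String]): Unit = {
    println(s"Current window  = [$currentBegin ms, $currentEnd ms]")
    println(s"Previous window = [$previousBegin ms, $previousEnd ms]")
    println(s"Old slice       = $oldSliceFrom ms to $oldSliceTo ms")
    println(s"Old slice covers $oldBatchCount batches; one window covers $windowBatchCount")
  }
}
```

With these inputs the sketch yields the same window boundaries and the same slice range (1498383077000 ms to 1498383084000 ms) as the log, so the old slice covers 8 batch times while a single window covers only 2. This happens because the slide duration is larger than the window duration, so the previous and current windows do not overlap.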