[ 
https://issues.apache.org/jira/browse/SPARK-21206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16062284#comment-16062284
 ] 

Fei Shao edited comment on SPARK-21206 at 7/1/17 8:38 AM:
----------------------------------------------------------

[~srowen]

I am sorry, I did not give enough information about this issue.

For my test code:

lines.countByValueAndWindow(Seconds(2), Seconds(8)).foreachRDD( s => {
<=== here the windowDuration is 2 seconds and the slideDuration is 8 seconds.


===========log begin ============
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Current window = [1498383085000 ms, 1498383086000 ms]
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Previous window = [1498383077000 ms, 1498383078000 ms]
17/06/25 17:31:26 INFO ShuffledDStream: Slicing from 1498383077000 ms to 1498383084000 ms (aligned to 1498383077000 ms and 1498383084000 ms)
<=== here, the old RDDs are sliced from 1498383077000 ms to 1498383084000 ms, which spans 8 seconds. It should span only 2 seconds (the previous window).
===========log end============

===========the code in ReducedWindowedDStream.scala begin============
  override def compute(validTime: Time): Option[RDD[(K, V)]] = {
    val reduceF = reduceFunc
    val invReduceF = invReduceFunc

    val currentTime = validTime
    val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration,
      currentTime)
    val previousWindow = currentWindow - slideDuration

    logDebug("Window time = " + windowDuration)
    logDebug("Slide time = " + slideDuration)
    logDebug("Zero time = " + zeroTime)
    logDebug("Current window = " + currentWindow)
    logDebug("Previous window = " + previousWindow)

    //  _____________________________
    // |  previous window   _________|___________________
    // |___________________|       current window        |  --------------> Time
    //                     |_____________________________|
    //
    // |________ _________|          |________ _________|
    //          |                             |
    //          V                             V
    //       old RDDs                     new RDDs
    //

    // Get the RDDs of the reduced values in "old time steps"
    val oldRDDs =
      reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)

    // Get the RDDs of the reduced values in "new time steps"
    val newRDDs =
      reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
    logDebug("# new RDDs = " + newRDDs.size)

===========code in ReducedWindowedDStream.scala  end============
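
To make the mismatch concrete, the arithmetic can be replayed outside Spark. The sketch below is a stand-alone model, not Spark code: it uses plain Long milliseconds in place of Time and Interval, and the names SliceArithmetic and oldSlice are made up for illustration. It plugs the values from the log (valid time 1498383086000 ms, window 2000 ms, slide 8000 ms, parent slide 1000 ms) into the oldRDDs expression.

```scala
object SliceArithmetic {
  // Returns the inclusive (from, to) bounds that the quoted code passes to
  // reducedStream.slice for oldRDDs. All values are milliseconds.
  def oldSlice(validTime: Long, window: Long, slide: Long, parentSlide: Long): (Long, Long) = {
    val currentBegin = validTime - window + parentSlide // currentWindow.beginTime
    val previousBegin = currentBegin - slide            // previousWindow.beginTime
    (previousBegin, currentBegin - parentSlide)
  }

  def main(args: Array[String]): Unit = {
    val (from, to) = oldSlice(1498383086000L, 2000L, 8000L, 1000L)
    // Both bounds are inclusive, so count one batch per parent slide step.
    val batches = (to - from) / 1000L + 1
    println(s"old slice: $from ms to $to ms, $batches batches")
  }
}
```

With these inputs the bounds come out as 1498383077000 ms and 1498383084000 ms, the same range the ShuffledDStream log line reports, which covers eight 1-second batches rather than the two in the previous window.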

Imagine the case below (the slideDuration is greater than the windowDuration):
    //  _____________________________
    // |  previous window            |       ____________________________
    // |_____________________________|------|      current window        |  --------------> Time
    //                                      |____________________________|
    //
    // |____________  _______________|      |________ ___________________|
    //               |                                    |
    //               V                                    V
    //            old RDDs                             new RDDs
        
I think we can change the expressions for oldRDDs and newRDDs to fix this issue.
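
One possible shape for such a change, sketched under assumptions rather than taken from Spark: clamp the old slice so it never runs past the end of the previous window, and clamp the new slice so it never starts before the current window. The model below uses plain Long milliseconds; ClampedSliceSketch, Span and slices are hypothetical names, and the clamping rule is only an illustration of the idea, not a reviewed patch.

```scala
object ClampedSliceSketch {
  // Inclusive time range in milliseconds; a hypothetical stand-in for Spark's Interval.
  final case class Span(begin: Long, end: Long)

  // Returns (old, fresh) slice bounds. clamp = false follows the expressions
  // quoted from ReducedWindowedDStream; clamp = true is the assumed adjustment.
  def slices(validTime: Long, window: Long, slide: Long, parentSlide: Long,
             clamp: Boolean): (Span, Span) = {
    val current = Span(validTime - window + parentSlide, validTime)
    val previous = Span(current.begin - slide, current.end - slide)
    val oldEnd = current.begin - parentSlide
    val freshBegin = previous.end + parentSlide
    if (clamp) {
      // Keep the old slice inside the previous window and the new slice
      // inside the current window, even when the windows do not overlap.
      (Span(previous.begin, math.min(oldEnd, previous.end)),
        Span(math.max(freshBegin, current.begin), current.end))
    } else {
      (Span(previous.begin, oldEnd), Span(freshBegin, current.end))
    }
  }

  def main(args: Array[String]): Unit = {
    val t = 1498383086000L
    println(slices(t, 2000L, 8000L, 1000L, clamp = false))
    println(slices(t, 2000L, 8000L, 1000L, clamp = true))
  }
}
```

For the values in the log, the clamped variant yields an old slice of 1498383077000 ms to 1498383078000 ms and a new slice of 1498383085000 ms to 1498383086000 ms. When the slide is no larger than the window, both variants return the same bounds, so the overlapping case is unaffected.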




> the window slice of Dstream is wrong
> ------------------------------------
>
>                 Key: SPARK-21206
>                 URL: https://issues.apache.org/jira/browse/SPARK-21206
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.1.0
>            Reporter: Fei Shao
>
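> The code is:
>     import org.apache.spark.SparkConf
>     import org.apache.spark.streaming.{Seconds, StreamingContext}
>
>     val conf = new SparkConf().setAppName("testDstream").setMaster("local[4]")
>     val ssc = new StreamingContext(conf, Seconds(1))  // 1-second batch interval
>     ssc.checkpoint("path")
>     val lines = ssc.socketTextStream("IP", PORT)
>     // windowDuration = 2 seconds, slideDuration = 8 seconds
>     lines.countByValueAndWindow(Seconds(2), Seconds(8)).foreachRDD( s => {
>       println("RDD ID IS : " + s.id)
>       s.foreach( e => println("data is " + e._1 + " :" + e._2))
>       println()
>     })
>     ssc.start()
>     ssc.awaitTermination()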
> The result is wrong.
> I checked the log, which showed:
> 17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Time 1498383086000 ms is valid
> 17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Window time = 2000 ms
> 17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Slide time = 8000 ms
> 17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Zero time = 1498383078000 ms
> 17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Current window = [1498383085000 ms, 1498383086000 ms]
> 17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Previous window = [1498383077000 ms, 1498383078000 ms]
> 17/06/25 17:31:26 INFO ShuffledDStream: Slicing from 1498383077000 ms to 1498383084000 ms (aligned to 1498383077000 ms and 1498383084000 ms)
> 17/06/25 17:31:26 INFO ShuffledDStream: Time 1498383078000 ms is invalid as zeroTime is 1498383078000 ms , slideDuration is 1000 ms and difference is 0 ms
> 17/06/25 17:31:26 DEBUG ShuffledDStream: Time 1498383079000 ms is valid
> 17/06/25 17:31:26 DEBUG MappedDStream: Time 1498383079000 ms is valid
> The slice time is wrong.
> [BTW]: Team members,
> If this is a bug, please don't fix it. I would like to try to fix it myself. Thanks :)


