Wesley Miao created SPARK-7326:
----------------------------------

             Summary: Performing window() on a WindowedDStream doesn't work all the time
                 Key: SPARK-7326
                 URL: https://issues.apache.org/jira/browse/SPARK-7326
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.3.1
            Reporter: Wesley Miao


Someone reported a similar issue before but got no response:
http://apache-spark-user-list.1001560.n3.nabble.com/Windows-of-windowed-streams-not-displaying-the-expected-results-td466.html

I recently ran into the same issue, and it can be reproduced in 1.3.1 with the
following code:

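import scala.collection.mutable
import scala.util.Random

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object WindowOnWindowedDStream {

  def main(args: Array[String]): Unit = {

    val batchInterval = Milliseconds(1234)
    val sparkConf = new SparkConf()
      .setAppName("WindowOnWindowedDStream")
      .setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, batchInterval)
    ssc.checkpoint("checkpoint")

    // Builds one batch of 1000 (word, 1) pairs drawn from five distinct words
    def createRDD(i: Int): RDD[(String, Int)] = {
      val count = 1000
      val rawLogs = (1 to count).map { _ =>
        // nextInt(5) avoids the negative result that Int.MinValue.abs produces
        val word = "word" + Random.nextInt(5)
        (word, 1)
      }
      ssc.sparkContext.parallelize(rawLogs)
    }

    // Feed 300 pre-built batches through a queue-backed input stream
    val rddQueue = mutable.Queue[RDD[(String, Int)]]()
    val rawLogStream = ssc.queueStream(rddQueue)

    (1 to 300) foreach { i =>
      rddQueue.enqueue(createRDD(i))
    }

    // l1: window and slide of 5 source batches
    val l1 = rawLogStream.window(batchInterval * 5, batchInterval * 5).reduceByKey(_ + _)

    // l2: window and slide of 15 source batches (3 of l1's slides)
    val l2 = l1.window(batchInterval * 15, batchInterval * 15).reduceByKey(_ + _)

    l1.print()
    l2.print()

    ssc.start()
    ssc.awaitTermination()
  }
}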

Here we have two windowed DStream instances, l1 and l2.

l1 is the DStream obtained by calling window() on the source DStream, with both
the window and slide durations set to 5 times the batch interval of the source
stream.

l2 is the DStream obtained by calling window() on l1, with both the window and
slide durations set to 3 times l1's slide duration (which acts as l1's batch
interval), i.e. 15 times the batch interval of the source stream.
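To make the durations concrete, the arithmetic below computes the slide durations of l1 and l2 for the 1234 ms batch interval used above and lists the first few times at which each stream produces output. It is only a sketch: times are modeled as plain millisecond Longs rather than Spark's Time/Duration classes, the zeroTime of 1234 * 1001 ms is a hypothetical value, and the object and method names are illustrative.

```scala
object WindowArithmetic {
  val batchMs: Long = 1234L
  // l1: window and slide are both 5 source batches
  val l1SlideMs: Long = batchMs * 5
  // l2: window and slide are both 3 of l1's slides, i.e. 15 source batches
  val l2SlideMs: Long = l1SlideMs * 3

  // Hypothetical zeroTime, chosen as a multiple of the batch interval
  val zeroTimeMs: Long = batchMs * 1001

  // A stream with the given slide produces output at zeroTime + n * slide, for n >= 1
  def outputTimes(slideMs: Long, count: Int): Seq[Long] =
    (1 to count).map(n => zeroTimeMs + n * slideMs)

  def main(args: Array[String]): Unit = {
    println(s"l1 slide = $l1SlideMs ms, l2 slide = $l2SlideMs ms")
    println(s"l1 outputs at: ${outputTimes(l1SlideMs, 6).mkString(", ")}")
    println(s"l2 outputs at: ${outputTimes(l2SlideMs, 2).mkString(", ")}")
  }
}
```

Every l2 output time is also an l1 output time, so in principle l2 should always find l1 RDDs to combine.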

From the output of this simple streaming app, I only see data printed from l1;
no data is printed from l2.

Diving into the source code, I found that the problem most likely resides in
the DStream.slice() implementation, shown below.

  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
    if (!isInitialized) {
      throw new SparkException(this + " has not been initialized")
    }
    if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("toTime (" + toTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    // floor() rounds down to a multiple of slideDuration counted from time 0,
    // not from zeroTime
    val alignedToTime = toTime.floor(slideDuration)
    val alignedFromTime = fromTime.floor(slideDuration)

    logInfo("Slicing from " + fromTime + " to " + toTime +
      " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")

    alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
      if (time >= zeroTime) getOrCompute(time) else None
    })
  }

Here, after floor() is applied to both fromTime and toTime, the differences
(alignedFromTime - zeroTime) and (alignedToTime - zeroTime) may no longer be
multiples of slideDuration, because floor() aligns to multiples of
slideDuration counted from time 0 rather than from zeroTime. This makes the
isTimeValid check fail for all of the remaining computation.
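A small worked example may help show why the alignment breaks. The sketch below reproduces, on plain millisecond Longs, the floor() arithmetic that Time provides in 1.3.1 (round down to a multiple of the duration counted from time 0) and checks the result against the condition isTimeValid relies on: a time strictly after zeroTime whose offset from zeroTime is a multiple of the slide duration. The zeroTime of 1234 * 1001 ms is hypothetical; since 1001 is not a multiple of 5, it is not a multiple of l1's 6170 ms slide, which would explain why the problem only appears some of the time. All names are illustrative.

```scala
object MisalignedFloor {
  // Round down to a multiple of durMs counted from time 0, like Time.floor(Duration)
  def floorFromZero(timeMs: Long, durMs: Long): Long = (timeMs / durMs) * durMs

  // Validity condition: strictly after zeroTime and on a slide boundary
  def isValid(timeMs: Long, zeroTimeMs: Long, slideMs: Long): Boolean =
    timeMs > zeroTimeMs && (timeMs - zeroTimeMs) % slideMs == 0

  val zeroTimeMs: Long = 1234L * 1001 // hypothetical zeroTime
  val l1SlideMs: Long = 1234L * 5     // 6170 ms

  def main(args: Array[String]): Unit = {
    // A time at which l1 legitimately has an RDD: two slides after zeroTime
    val requested = zeroTimeMs + 2 * l1SlideMs
    val aligned = floorFromZero(requested, l1SlideMs)
    println(s"requested=$requested valid=${isValid(requested, zeroTimeMs, l1SlideMs)}")
    println(s"aligned=$aligned valid=${isValid(aligned, zeroTimeMs, l1SlideMs)}")
  }
}
```

The requested time is valid, but flooring it from time 0 moves it to 1246340, whose offset from zeroTime (11106 ms) is not a multiple of 6170 ms.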

The fix would be to add a new floor() overload in Time.scala that respects
zeroTime when flooring:

  // Rounds down to the nearest zeroTime + k * that (for times at or after zeroTime)
  def floor(that: Duration, zeroTime: Time): Time = {
    val t = that.milliseconds
    new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
  }
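For checking the proposed floor() without a Spark build, here is a standalone sketch of the same arithmetic on plain millisecond Longs. It is not Spark's Time class, and the object and method names are hypothetical. One deliberate difference: it uses java.lang.Math.floorDiv, which rounds toward negative infinity, so a time earlier than zeroTime is rounded down rather than toward zeroTime as plain `/` would do. In slice(), times at or before zeroTime never yield an RDD anyway, so this mainly keeps the function a true floor.

```scala
object ZeroAlignedFloor {
  /**
   * Largest time <= timeMs of the form zeroTimeMs + k * durMs (k may be negative).
   * floorDiv rounds toward negative infinity, so this is a true floor even when
   * timeMs < zeroTimeMs.
   */
  def floor(timeMs: Long, durMs: Long, zeroTimeMs: Long): Long = {
    require(durMs > 0, "duration must be positive")
    java.lang.Math.floorDiv(timeMs - zeroTimeMs, durMs) * durMs + zeroTimeMs
  }

  def main(args: Array[String]): Unit = {
    val zeroTimeMs = 1234L * 1001 // hypothetical zeroTime
    val slideMs = 1234L * 5       // l1's slide duration
    for (t <- Seq(zeroTimeMs - 500, zeroTimeMs + 12340, zeroTimeMs + 12341, zeroTimeMs + 18509)) {
      val aligned = floor(t, slideMs, zeroTimeMs)
      println(s"$t -> $aligned (offset from zeroTime = ${aligned - zeroTimeMs})")
    }
  }
}
```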

Then change DStream.slice() to call this new floor(), passing in its zeroTime:

    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

This way, alignedToTime and alignedFromTime are *really* aligned with respect
to zeroTime, whose value is generally not 0.
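To check the claim end to end, the sketch below models the time enumeration in slice() (align both ends, step by slideDuration, skip times before zeroTime) on plain millisecond Longs, once with a floor counted from time 0 and once with a floor counted from zeroTime, and keeps only the times that pass an isTimeValid-style check. The request is assumed to cover the three l1 slide boundaries inside the first l2 window, i.e. from t - 2 * 6170 to t for the first l2 output time t; the zeroTime and all names are hypothetical.

```scala
object SliceAlignment {
  val zeroTimeMs: Long = 1234L * 1001 // hypothetical zeroTime
  val slideMs: Long = 1234L * 5       // l1's slide duration

  // Floor counted from time 0 (the existing behavior)
  def floorFromZero(t: Long, d: Long): Long = (t / d) * d
  // Floor counted from zeroTime (the proposed behavior)
  def floorFromZeroTime(t: Long, d: Long): Long =
    java.lang.Math.floorDiv(t - zeroTimeMs, d) * d + zeroTimeMs

  // Strictly after zeroTime and on one of l1's slide boundaries
  def isValid(t: Long): Boolean =
    t > zeroTimeMs && (t - zeroTimeMs) % slideMs == 0

  // Times slice(from, to) would pass to getOrCompute, for a given alignment function
  def sliceTimes(from: Long, to: Long, align: (Long, Long) => Long): Seq[Long] = {
    val alignedFrom = align(from, slideMs)
    val alignedTo = align(to, slideMs)
    (alignedFrom to alignedTo by slideMs).filter(_ >= zeroTimeMs)
  }

  def main(args: Array[String]): Unit = {
    val l2Time = zeroTimeMs + 1234L * 15 // first l2 output time
    val from = l2Time - 2 * slideMs
    val before = sliceTimes(from, l2Time, floorFromZero)
    val after = sliceTimes(from, l2Time, floorFromZeroTime)
    println(s"floor from 0: ${before.count(isValid)} of ${before.size} times valid")
    println(s"floor from zeroTime: ${after.count(isValid)} of ${after.size} times valid")
  }
}
```

With the floor counted from time 0, none of the three enumerated times is valid; with the floor counted from zeroTime, all three land exactly on l1's output times.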
 




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
