Repository: spark
Updated Branches:
  refs/heads/master 451546aa6 -> a6394bc2c


[SPARK-1600] Refactor FileInputStream tests to remove Thread.sleep() calls and SystemClock usage

This patch refactors Spark Streaming's FileInputStream tests to remove uses of 
Thread.sleep() and SystemClock, which should hopefully resolve some 
longstanding flakiness in these tests (see SPARK-1600).

Key changes:

- Modify FileInputDStream to use the scheduler's Clock instead of 
System.currentTimeMillis(); this allows it to be tested using ManualClock.
- Fix a synchronization issue in ManualClock's `currentTime` method.
- Add a StreamingTestWaiter class which allows callers to block until a certain 
number of batches have finished.
- Change the FileInputStream tests so that files' modification times are set
manually based on ManualClock; this eliminates many Thread.sleep() calls.
- Update these tests to use the withStreamingContext fixture (a sketch of the
resulting test pattern follows below).
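
Illustrative sketch (not part of the patch itself): with these helpers in place,
a file-stream test inside a TestSuiteBase suite can drive everything off
ManualClock. The names conf, batchDuration, testDir and outputBuffer below are
assumed to come from TestSuiteBase / the surrounding test; BatchCounter,
withStreamingContext, eventuallyTimeout, TestOutputStream and ManualClock are the
helpers touched by this patch.

    import java.io.File
    import com.google.common.base.Charsets
    import com.google.common.io.Files
    import org.scalatest.concurrent.Eventually._
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.util.ManualClock

    withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
      val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
      val batchCounter = new BatchCounter(ssc)
      val fileStream = ssc.textFileStream(testDir.toString)
      new TestOutputStream(fileStream, outputBuffer).register()
      ssc.start()

      // Write a file whose modification time comes from ManualClock (not the
      // system clock), then advance the clock to trigger the next batch.
      val file = new File(testDir, "1")
      Files.write("1\n", file, Charsets.UTF_8)
      assert(file.setLastModified(clock.currentTime()))
      clock.addToTime(batchDuration.milliseconds)

      // Block until the batch has actually completed instead of sleeping.
      eventually(eventuallyTimeout) {
        assert(batchCounter.getNumCompletedBatches === 1)
      }
    }

Because time only advances when the test advances it, there is no need for
Thread.sleep() to wait for file listings or batch completion, which is what made
the old tests slow and flaky.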

Author: Josh Rosen <joshro...@databricks.com>

Closes #3801 from JoshRosen/SPARK-1600 and squashes the following commits:

e4494f4 [Josh Rosen] Address a potential race when setting file modification times
8340bd0 [Josh Rosen] Use set comparisons for output.
0b9c252 [Josh Rosen] Fix some ManualClock usage problems.
1cc689f [Josh Rosen] ConcurrentHashMap -> SynchronizedMap
db26c3a [Josh Rosen] Use standard timeout in ScalaTest `eventually` blocks.
3939432 [Josh Rosen] Rename StreamingTestWaiter to BatchCounter
0b9c3a1 [Josh Rosen] Wait for checkpoint to complete
863d71a [Josh Rosen] Remove Thread.sleep that was used to make task run slowly
b4442c3 [Josh Rosen] batchTimeToSelectedFiles should be thread-safe
15b48ee [Josh Rosen] Replace several TestWaiter methods w/ ScalaTest eventually.
fffc51c [Josh Rosen] Revert "Remove last remaining sleep() call"
dbb8247 [Josh Rosen] Remove last remaining sleep() call
566a63f [Josh Rosen] Fix log message and comment typos
da32f3f [Josh Rosen] Fix log message and comment typos
3689214 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-1600
c8f06b1 [Josh Rosen] Remove Thread.sleep calls in FileInputStream CheckpointSuite test.
d4f2d87 [Josh Rosen] Refactor file input stream tests to not rely on SystemClock.
dda1403 [Josh Rosen] Add StreamingTestWaiter class.
3c3efc3 [Josh Rosen] Synchronize `currentTime` in ManualClock
a95ddc4 [Josh Rosen] Modify FileInputDStream to use Clock class.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6394bc2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6394bc2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6394bc2

Branch: refs/heads/master
Commit: a6394bc2c094c6c662237236c2effa2dabe67910
Parents: 451546a
Author: Josh Rosen <joshro...@databricks.com>
Authored: Tue Jan 6 00:31:19 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Jan 6 00:31:19 2015 -0800

----------------------------------------------------------------------
 .../streaming/dstream/FileInputDStream.scala    |  16 +-
 .../org/apache/spark/streaming/util/Clock.scala |   6 +-
 .../spark/streaming/BasicOperationsSuite.scala  |   2 +-
 .../spark/streaming/CheckpointSuite.scala       | 248 +++++++++++--------
 .../spark/streaming/InputStreamsSuite.scala     |  69 +++---
 .../apache/spark/streaming/TestSuiteBase.scala  |  46 +++-
 6 files changed, 251 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a6394bc2/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 5f13fdc..e7c5639 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming.dstream
 
 import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable
 import scala.reflect.ClassTag
@@ -74,12 +75,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     newFilesOnly: Boolean = true)
   extends InputDStream[(K, V)](ssc_) {
 
+  // This is a def so that it works during checkpoint recovery:
+  private def clock = ssc.scheduler.clock
+
   // Data to be saved as part of the streaming checkpoints
   protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
 
   // Initial ignore threshold based on which old, existing files in the directory (at the time of
   // starting the streaming application) will be ignored or considered
-  private val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L
+  private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.currentTime() else 0L
 
   /*
   * Make sure that the information of files selected in the last few batches are remembered.
@@ -91,8 +95,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   remember(durationToRemember)
 
   // Map of batch-time to selected file info for the remembered batches
+  // This is a concurrent map because it's also accessed in unit tests
   @transient private[streaming] var batchTimeToSelectedFiles =
-    new mutable.HashMap[Time, Array[String]]
+    new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
 
   // Set of files that were selected in the remembered batches
   @transient private var recentlySelectedFiles = new mutable.HashSet[String]()
@@ -151,7 +156,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
    */
   private def findNewFiles(currentTime: Long): Array[String] = {
     try {
-      lastNewFileFindingTime = System.currentTimeMillis
+      lastNewFileFindingTime = clock.currentTime()
 
       // Calculate ignore threshold
       val modTimeIgnoreThreshold = math.max(
@@ -164,7 +169,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
       }
       val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
-      val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
+      val timeTaken = clock.currentTime() - lastNewFileFindingTime
       logInfo("Finding new files took " + timeTaken + " ms")
       logDebug("# cached file times = " + fileToModTime.size)
       if (timeTaken > slideDuration.milliseconds) {
@@ -267,7 +272,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] ()
-    batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]()
+    batchTimeToSelectedFiles =
+      new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
     recentlySelectedFiles = new mutable.HashSet[String]()
     fileToModTime = new TimeStampedHashMap[String, Long](true)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6394bc2/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
index 7cd867c..d6d96d7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
@@ -59,9 +59,11 @@ class SystemClock() extends Clock {
 private[streaming]
 class ManualClock() extends Clock {
 
-  var time = 0L
+  private var time = 0L
 
-  def currentTime() = time
+  def currentTime() = this.synchronized {
+    time
+  }
 
   def setTime(timeToSet: Long) = {
     this.synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/a6394bc2/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 199f5e7..e8f4a77 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -638,7 +638,7 @@ class BasicOperationsSuite extends TestSuiteBase {
       if (rememberDuration != null) ssc.remember(rememberDuration)
       val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
       val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-      assert(clock.time === Seconds(10).milliseconds)
+      assert(clock.currentTime() === Seconds(10).milliseconds)
       assert(output.size === numExpectedOutput)
       operatedStream
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6394bc2/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 5d232c6..8f8bc61 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -18,17 +18,18 @@
 package org.apache.spark.streaming
 
 import java.io.File
-import java.nio.charset.Charset
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
 import scala.reflect.ClassTag
 
+import com.google.common.base.Charsets
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{IntWritable, Text}
 import org.apache.hadoop.mapred.TextOutputFormat
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
@@ -45,8 +46,6 @@ class CheckpointSuite extends TestSuiteBase {
 
   override def batchDuration = Milliseconds(500)
 
-  override def actuallyWait = true // to allow checkpoints to be written
-
   override def beforeFunction() {
     super.beforeFunction()
     Utils.deleteRecursively(new File(checkpointDir))
@@ -143,7 +142,6 @@ class CheckpointSuite extends TestSuiteBase {
     ssc.start()
     advanceTimeWithRealDelay(ssc, 4)
     ssc.stop()
-    System.clearProperty("spark.streaming.manualClock.jump")
     ssc = null
   }
 
@@ -312,109 +310,161 @@ class CheckpointSuite extends TestSuiteBase {
     testCheckpointedOperation(input, operation, output, 7)
   }
 
-
   // This tests whether file input stream remembers what files were seen before
   // the master failure and uses them again to process a large window operation.
   // It also tests whether batches, whose processing was incomplete due to the
   // failure, are re-processed or not.
   test("recovery with file input stream") {
     // Set up the streaming context and input streams
+    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
     val testDir = Utils.createTempDir()
-    var ssc = new StreamingContext(master, framework, Seconds(1))
-    ssc.checkpoint(checkpointDir)
-    val fileStream = ssc.textFileStream(testDir.toString)
-    // Making value 3 take large time to process, to ensure that the master
-    // shuts down in the middle of processing the 3rd batch
-    val mappedStream = fileStream.map(s => {
-      val i = s.toInt
-      if (i == 3) Thread.sleep(2000)
-      i
-    })
-
-    // Reducing over a large window to ensure that recovery from master failure
-    // requires reprocessing of all the files seen before the failure
-    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
-    val outputBuffer = new ArrayBuffer[Seq[Int]]
-    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
-    outputStream.register()
-    ssc.start()
-
-    // Create files and advance manual clock to process them
-    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    Thread.sleep(1000)
-    for (i <- Seq(1, 2, 3)) {
-      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
-      // wait to make sure that the file is written such that it gets shown in the file listings
-      Thread.sleep(1000)
+    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
+
+    /**
+     * Writes a file named `i` (which contains the number `i`) to the test directory and sets its
+     * modification time to `clock`'s current time.
+     */
+    def writeFile(i: Int, clock: ManualClock): Unit = {
+      val file = new File(testDir, i.toString)
+      Files.write(i + "\n", file, Charsets.UTF_8)
+      assert(file.setLastModified(clock.currentTime()))
+      // Check that the file's modification date is actually the value we wrote, since rounding or
+      // truncation will break the test:
+      assert(file.lastModified() === clock.currentTime())
     }
-    logInfo("Output = " + outputStream.output.mkString(","))
-    assert(outputStream.output.size > 0, "No files processed before restart")
-    ssc.stop()
 
-    // Verify whether files created have been recorded correctly or not
-    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
-    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
-
-    // Create files while the master is down
-    for (i <- Seq(4, 5, 6)) {
-      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
-      Thread.sleep(1000)
+    /**
+     * Returns ids that identify which files have been recorded by the file input stream.
+     */
+    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
+      val fileInputDStream =
+        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
+      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
+      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
     }
 
-    // Recover context from checkpoint file and verify whether the files that were
-    // recorded before failure were saved and successfully recovered
-    logInfo("*********** RESTARTING ************")
-    ssc = new StreamingContext(checkpointDir)
-    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+    try {
+      // This is a var because it's re-assigned when we restart from a checkpoint
+      var clock: ManualClock = null
+      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+        ssc.checkpoint(checkpointDir)
+        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        val batchCounter = new BatchCounter(ssc)
+        val fileStream = ssc.textFileStream(testDir.toString)
+        // Make value 3 take a large time to process, to ensure that the driver
+        // shuts down in the middle of processing the 3rd batch
+        CheckpointSuite.batchThreeShouldBlockIndefinitely = true
+        val mappedStream = fileStream.map(s => {
+          val i = s.toInt
+          if (i == 3) {
+            while (CheckpointSuite.batchThreeShouldBlockIndefinitely) {
+              Thread.sleep(Long.MaxValue)
+            }
+          }
+          i
+        })
+
+        // Reducing over a large window to ensure that recovery from driver failure
+        // requires reprocessing of all the files seen before the failure
+        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
+        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
+        outputStream.register()
+        ssc.start()
+
+        // Advance half a batch so that the first file is created after the StreamingContext starts
+        clock.addToTime(batchDuration.milliseconds / 2)
+        // Create files and advance manual clock to process them
+        for (i <- Seq(1, 2, 3)) {
+          writeFile(i, clock)
+          // Advance the clock after creating the file to avoid a race when
+          // setting its modification time
+          clock.addToTime(batchDuration.milliseconds)
+          if (i != 3) {
+            // Since we want to shut down while the 3rd batch is processing
+            eventually(eventuallyTimeout) {
+              assert(batchCounter.getNumCompletedBatches === i)
+            }
+          }
+        }
+        clock.addToTime(batchDuration.milliseconds)
+        eventually(eventuallyTimeout) {
+          // Wait until all files have been recorded and all batches have started
+          assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3)
+        }
+        // Wait for a checkpoint to be written
+        val fs = new Path(checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration)
+        eventually(eventuallyTimeout) {
+          assert(Checkpoint.getCheckpointFiles(checkpointDir, fs).size === 6)
+        }
+        ssc.stop()
+        // Check that we shut down while the third batch was being processed
+        assert(batchCounter.getNumCompletedBatches === 2)
+        assert(outputStream.output.flatten === Seq(1, 3))
+      }
 
-    // Restart stream computation
-    ssc.start()
-    for (i <- Seq(7, 8, 9)) {
-      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
-      Thread.sleep(1000)
-    }
-    Thread.sleep(1000)
-    logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
-    assert(outputStream.output.size > 0, "No files processed after restart")
-    ssc.stop()
+      // The original StreamingContext has now been stopped.
+      CheckpointSuite.batchThreeShouldBlockIndefinitely = false
 
-    // Verify whether files created while the driver was down have been recorded or not
-    assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
-
-    // Verify whether new files created after recover have been recorded or not
-    assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
-
-    // Append the new output to the old buffer
-    outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
-    outputBuffer ++= outputStream.output
-
-    val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
-    logInfo("--------------------------------")
-    logInfo("output, size = " + outputBuffer.size)
-    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-    logInfo("expected output, size = " + expectedOutput.size)
-    expectedOutput.foreach(x => logInfo("[" + x + "]"))
-    logInfo("--------------------------------")
-
-    // Verify whether all the elements received are as expected
-    val output = outputBuffer.flatMap(x => x)
-    assert(output.contains(6))  // To ensure that the 3rd input (i.e., 3) was processed
-    output.foreach(o =>         // To ensure all the inputs are correctly added cumulatively
-      assert(expectedOutput.contains(o), "Expected value " + o + " not found")
-    )
-    // To ensure that all the inputs were received correctly
-    assert(expectedOutput.last === output.last)
-    Utils.deleteRecursively(testDir)
+      // Create files while the streaming driver is down
+      for (i <- Seq(4, 5, 6)) {
+        writeFile(i, clock)
+        // Advance the clock after creating the file to avoid a race when
+        // setting its modification time
+        clock.addToTime(batchDuration.milliseconds)
+      }
+
+      // Recover context from checkpoint file and verify whether the files that were
+      // recorded before failure were saved and successfully recovered
+      logInfo("*********** RESTARTING ************")
+      withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
+        // So that the restarted StreamingContext's clock has gone forward in time since failure
+        ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration * 3).milliseconds.toString)
+        val oldClockTime = clock.currentTime()
+        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        val batchCounter = new BatchCounter(ssc)
+        val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
+        // Check that we remember files that were recorded before the restart
+        assert(recordedFiles(ssc) === Seq(1, 2, 3))
+
+        // Restart stream computation
+        ssc.start()
+        // Verify that the clock has traveled forward to the expected time
+        eventually(eventuallyTimeout) {
+          clock.currentTime() === oldClockTime
+        }
+        // Wait for pre-failure batch to be recomputed (3 while SSC was down plus last batch)
+        val numBatchesAfterRestart = 4
+        eventually(eventuallyTimeout) {
+          assert(batchCounter.getNumCompletedBatches === numBatchesAfterRestart)
+        }
+        for ((i, index) <- Seq(7, 8, 9).zipWithIndex) {
+          writeFile(i, clock)
+          // Advance the clock after creating the file to avoid a race when
+          // setting its modification time
+          clock.addToTime(batchDuration.milliseconds)
+          eventually(eventuallyTimeout) {
+            assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1)
+          }
+        }
+        clock.addToTime(batchDuration.milliseconds)
+        logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]"))
+        assert(outputStream.output.size > 0, "No files processed after restart")
+        ssc.stop()
+
+        // Verify whether files created while the driver was down (4, 5, 6) and files created after
+        // recovery (7, 8, 9) have been recorded
+        assert(recordedFiles(ssc) === (1 to 9))
+
+        // Append the new output to the old buffer
+        outputBuffer ++= outputStream.output
+
+        // Verify whether all the elements received are as expected
+        val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
+        assert(outputBuffer.flatten.toSet === expectedOutput.toSet)
+      }
+    } finally {
+      Utils.deleteRecursively(testDir)
+    }
   }
 
 
@@ -471,12 +521,12 @@ class CheckpointSuite extends TestSuiteBase {
    */
   def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    logInfo("Manual clock before advancing = " + clock.time)
+    logInfo("Manual clock before advancing = " + clock.currentTime())
     for (i <- 1 to numBatches.toInt) {
       clock.addToTime(batchDuration.milliseconds)
       Thread.sleep(batchDuration.milliseconds)
     }
-    logInfo("Manual clock after advancing = " + clock.time)
+    logInfo("Manual clock after advancing = " + clock.currentTime())
     Thread.sleep(batchDuration.milliseconds)
 
     val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
@@ -485,3 +535,7 @@ class CheckpointSuite extends TestSuiteBase {
     outputStream.output.map(_.flatten)
   }
 }
+
+private object CheckpointSuite extends Serializable {
+  var batchThreeShouldBlockIndefinitely: Boolean = true
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/a6394bc2/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 307052a..bddf51e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -28,7 +28,6 @@ import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue}
-import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.io.Files
@@ -234,45 +233,57 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   }
 
   def testFileStream(newFilesOnly: Boolean) {
-    var ssc: StreamingContext = null
     val testDir: File = null
     try {
+      val batchDuration = Seconds(2)
       val testDir = Utils.createTempDir()
+      // Create a file that exists before the StreamingContext is created:
       val existingFile = new File(testDir, "0")
       Files.write("0\n", existingFile, Charset.forName("UTF-8"))
+      assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)
 
-      Thread.sleep(1000)
       // Set up the streaming context and input streams
-      val newConf = conf.clone.set(
-        "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
-      ssc = new StreamingContext(newConf, batchDuration)
-      val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
-        testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
-      val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-      val outputStream = new TestOutputStream(fileStream, outputBuffer)
-      outputStream.register()
-      ssc.start()
-
-      // Create files in the directory
-      val input = Seq(1, 2, 3, 4, 5)
-      input.foreach { i =>
-        Thread.sleep(batchDuration.milliseconds)
-        val file = new File(testDir, i.toString)
-        Files.write(i + "\n", file, Charset.forName("UTF-8"))
-        logInfo("Created file " + file)
-      }
+      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+        val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        // This `setTime` call ensures that the clock is past the creation time of `existingFile`
+        clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
+        val batchCounter = new BatchCounter(ssc)
+        val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
+          testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
+        val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+        val outputStream = new TestOutputStream(fileStream, outputBuffer)
+        outputStream.register()
+        ssc.start()
+
+        // Advance the clock so that the files are created after StreamingContext starts, but
+        // not enough to trigger a batch
+        clock.addToTime(batchDuration.milliseconds / 2)
+
+        // Over time, create files in the directory
+        val input = Seq(1, 2, 3, 4, 5)
+        input.foreach { i =>
+          val file = new File(testDir, i.toString)
+          Files.write(i + "\n", file, Charset.forName("UTF-8"))
+          assert(file.setLastModified(clock.currentTime()))
+          assert(file.lastModified === clock.currentTime)
+          logInfo("Created file " + file)
+          // Advance the clock after creating the file to avoid a race when
+          // setting its modification time
+          clock.addToTime(batchDuration.milliseconds)
+          eventually(eventuallyTimeout) {
+            assert(batchCounter.getNumCompletedBatches === i)
+          }
+        }
 
-      // Verify that all the files have been read
-      val expectedOutput = if (newFilesOnly) {
-        input.map(_.toString).toSet
-      } else {
-        (Seq(0) ++ input).map(_.toString).toSet
-      }
-      eventually(timeout(maxWaitTimeMillis milliseconds), interval(100 milliseconds)) {
+        // Verify that all the files have been read
+        val expectedOutput = if (newFilesOnly) {
+          input.map(_.toString).toSet
+        } else {
+          (Seq(0) ++ input).map(_.toString).toSet
+        }
         assert(outputBuffer.flatten.toSet === expectedOutput)
       }
     } finally {
-      if (ssc != null) ssc.stop()
       if (testDir != null) Utils.deleteRecursively(testDir)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6394bc2/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 52972f6..7d82c3e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -21,11 +21,16 @@ import java.io.{ObjectInputStream, IOException}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.SynchronizedBuffer
+import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.time.{Span, Seconds => ScalaTestSeconds}
+import org.scalatest.concurrent.Eventually.timeout
+import org.scalatest.concurrent.PatienceConfiguration
 
 import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream}
+import org.apache.spark.streaming.scheduler.{StreamingListenerBatchStarted, StreamingListenerBatchCompleted, StreamingListener}
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.rdd.RDD
@@ -104,6 +109,40 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
 }
 
 /**
+ * An object that counts the number of started / completed batches. This is implemented using a
+ * StreamingListener. Constructing a new instance automatically registers a StreamingListener on
+ * the given StreamingContext.
+ */
+class BatchCounter(ssc: StreamingContext) {
+
+  // All access to this state should be guarded by `BatchCounter.this.synchronized`
+  private var numCompletedBatches = 0
+  private var numStartedBatches = 0
+
+  private val listener = new StreamingListener {
+    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit =
+      BatchCounter.this.synchronized {
+        numStartedBatches += 1
+        BatchCounter.this.notifyAll()
+      }
+    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
+      BatchCounter.this.synchronized {
+        numCompletedBatches += 1
+        BatchCounter.this.notifyAll()
+      }
+  }
+  ssc.addStreamingListener(listener)
+
+  def getNumCompletedBatches: Int = this.synchronized {
+    numCompletedBatches
+  }
+
+  def getNumStartedBatches: Int = this.synchronized {
+    numStartedBatches
+  }
+}
+
+/**
  * This is the base trait for Spark Streaming testsuites. This provides basic functionality
  * to run user-defined set of input on user-defined stream operations, and verify the output.
  */
@@ -142,6 +181,9 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
     .setMaster(master)
     .setAppName(framework)
 
+  // Timeout for use in ScalaTest `eventually` blocks
+  val eventuallyTimeout: PatienceConfiguration.Timeout = timeout(Span(10, ScalaTestSeconds))
+
   // Default before function for any streaming test suite. Override this
   // if you want to add your stuff to "before" (i.e., don't call before { } )
   def beforeFunction() {
@@ -291,7 +333,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
 
       // Advance manual clock
       val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-      logInfo("Manual clock before advancing = " + clock.time)
+      logInfo("Manual clock before advancing = " + clock.currentTime())
       if (actuallyWait) {
         for (i <- 1 to numBatches) {
           logInfo("Actually waiting for " + batchDuration)
@@ -301,7 +343,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
       } else {
         clock.addToTime(numBatches * batchDuration.milliseconds)
       }
-      logInfo("Manual clock after advancing = " + clock.time)
+      logInfo("Manual clock after advancing = " + clock.currentTime())
 
       // Wait until expected number of output items have been generated
       val startTime = System.currentTimeMillis()

