GitHub user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22491068
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -319,102 +318,141 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
     +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
     -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
     -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
     -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
     -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
     +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    +      val file = new File(testDir, i.toString)
    +      Files.write(i + "\n", file, Charsets.UTF_8)
    +      assert(file.setLastModified(clock.currentTime()))
     +      // Check that the file's modification date is actually the value we wrote, since rounding or
    +      // truncation will break the test:
    +      assert(file.lastModified() === clock.currentTime())
         }
    -    logInfo("Output = " + outputStream.output.mkString(","))
     -    assert(outputStream.output.size > 0, "No files processed before restart")
    -    ssc.stop()
     
    -    // Verify whether files created have been recorded correctly or not
     -    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
     -    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    -
    -    // Create files while the master is down
    -    for (i <- Seq(4, 5, 6)) {
     -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    +    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
    +      val fileInputDStream =
     +        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
     +      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
         }
     
     -    // Recover context from checkpoint file and verify whether the files that were
    -    // recorded before failure were saved and successfully recovered
    -    logInfo("*********** RESTARTING ************")
    -    ssc = new StreamingContext(checkpointDir)
     -    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    +    try {
     +      // This is a var because it's re-assigned when we restart from a checkpoint
    +      var clock: ManualClock = null
     +      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
    +        ssc.checkpoint(checkpointDir)
    +        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val fileStream = ssc.textFileStream(testDir.toString)
     +        // Make value 3 take a long time to process, to ensure that the driver
    +        // shuts down in the middle of processing the 3rd batch
    +        val mappedStream = fileStream.map(s => {
    +          val i = s.toInt
    +          if (i == 3) Thread.sleep(4000)
    +          i
    +        })
    +
     +        // Reducing over a large window to ensure that recovery from driver failure
    +        // requires reprocessing of all the files seen before the failure
     +        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
     +        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
    +        outputStream.register()
    +        ssc.start()
    +
     +        // Advance half a batch so that the first file is created after the StreamingContext starts
    +        clock.addToTime(batchDuration.milliseconds / 2)
    +        // Create files and advance manual clock to process them
    +        for (i <- Seq(1, 2, 3)) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          if (i != 3) {
    +            // Since we want to shut down while the 3rd batch is processing
    +            eventually(timeout(batchDuration * 5)) {
    +              assert(waiter.getNumCompletedBatches === i)
    +            }
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        Thread.sleep(1000) // To wait for execution to actually begin
    +        eventually(timeout(batchDuration * 5)) {
    +          assert(waiter.getNumStartedBatches === 3)
    +        }
    +        assert(waiter.getNumCompletedBatches === 2)
     +        logInfo("Output after first start = " + outputStream.output.mkString("[", ", ", "]"))
     +        assert(outputStream.output.size > 0, "No files processed before restart")
    +        ssc.stop()
    +
    +        // Verify whether files created have been recorded correctly or not
    +        assert(recordedFiles(ssc) === Seq(1, 2, 3))
    +      }
     
    -    // Restart stream computation
    -    ssc.start()
    -    for (i <- Seq(7, 8, 9)) {
     -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    -    }
    -    Thread.sleep(1000)
    -    logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
     -    assert(outputStream.output.size > 0, "No files processed after restart")
    -    ssc.stop()
    +      // The original StreamingContext has now been stopped.
     
     -    // Verify whether files created while the driver was down have been recorded or not
    -    assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
    -
     -    // Verify whether new files created after recover have been recorded or not
    -    assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
    -
    -    // Append the new output to the old buffer
     -    outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
    -    outputBuffer ++= outputStream.output
    -
    -    val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
    -    logInfo("--------------------------------")
    -    logInfo("output, size = " + outputBuffer.size)
    -    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
    -    logInfo("expected output, size = " + expectedOutput.size)
    -    expectedOutput.foreach(x => logInfo("[" + x + "]"))
    -    logInfo("--------------------------------")
    -
    -    // Verify whether all the elements received are as expected
    -    val output = outputBuffer.flatMap(x => x)
     -    assert(output.contains(6))  // To ensure that the 3rd input (i.e., 3) was processed
     -    output.foreach(o =>         // To ensure all the inputs are correctly added cumulatively
     -      assert(expectedOutput.contains(o), "Expected value " + o + " not found")
    -    )
    -    // To ensure that all the inputs were received correctly
    -    assert(expectedOutput.last === output.last)
    -    Utils.deleteRecursively(testDir)
    +      // Create files while the streaming driver is down
    +      for (i <- Seq(4, 5, 6)) {
    +        writeFile(i, clock)
    +        clock.addToTime(1000)
    +      }
    +
     +      // Recover context from checkpoint file and verify whether the files that were
    +      // recorded before failure were saved and successfully recovered
    +      logInfo("*********** RESTARTING ************")
    +      withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
     +        // Copy over the time from the old clock so that we don't appear to have time-traveled
    +        clock = {
    +          val newClock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +          newClock.setTime(clock.currentTime())
    +          newClock
    +        }
    +        val waiter = new StreamingTestWaiter(ssc)
     +        val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
     +        // Check that we remember files that were recorded before the restart
    +        assert(recordedFiles(ssc) === Seq(1, 2, 3))
    +
    +        // Restart stream computation
    +        ssc.start()
    +        clock.addToTime(batchDuration.milliseconds)
    +        for ((i, index) <- Seq(7, 8, 9).zipWithIndex) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          eventually(timeout(batchDuration * 5)) {
    +            assert(waiter.getNumCompletedBatches === index + 1)
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
     +        logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]"))
     +        assert(outputStream.output.size > 0, "No files processed after restart")
    +        ssc.stop()
    +
     +        // Verify whether files created while the driver was down (4, 5, 6) and files created after
    +        // recovery (7, 8, 9) have been recorded
    +        assert(recordedFiles(ssc) === (1 to 9))
    +
    +        // Append the new output to the old buffer
    +        outputBuffer ++= outputStream.output
    +
    +        val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
    +        logInfo("--------------------------------")
    +        logInfo(s"output, size = ${outputBuffer.size}")
    +        outputBuffer.foreach(x => logInfo(s"[${x.mkString(",")}]"))
    +        logInfo(s"expected output, size = ${expectedOutput.size}")
    +        expectedOutput.foreach(x => logInfo(s"[$x]"))
    +        logInfo("--------------------------------")
    +
    +        // Verify whether all the elements received are as expected
    +        val output = outputBuffer.flatMap(x => x)
     +        assert(output.contains(6))  // To ensure that the 3rd input (i.e., 3) was processed
     +        output.foreach(o =>         // To ensure all the inputs are correctly added cumulatively
    --- End diff --
    
    Can use `toSet` and set comparison methods here.
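    
    For example, a rough sketch of how this check could read with sets (the names `output` and `expectedOutput` are the ones from this test; the failure message is only illustrative):
    
    ```scala
    // Set-based equivalent of the foreach/contains loop above: every
    // received running sum must be one of the expected running sums.
    assert(output.toSet.subsetOf(expectedOutput.toSet),
      s"Unexpected values in output: ${output.toSet -- expectedOutput.toSet}")
    ```
    
    This also reports all offending values at once rather than failing on the first mismatch.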

