GitHub user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22271205
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    --- End diff --
    
    I factored some of the common code out here.


