Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12752#discussion_r62682896
  
    --- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala ---
    @@ -198,6 +198,68 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
         testFileStream(newFilesOnly = false)
       }
     
    +  test("file input stream - wildcard") {
    +    var testDir: File = null
    +    try {
    +      val batchDuration = Seconds(2)
    +      val testDir = Utils.createTempDir()
    +      val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1")
    +      val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2")
    +
    +      // Create a file that exists before the StreamingContext is created:
    +      val existingFile = new File(testDir, "0")
    +      Files.write("0\n", existingFile, StandardCharsets.UTF_8)
    +      assert(existingFile.setLastModified(10000) && 
existingFile.lastModified === 10000)
    +
    +      val pathWithWildCard = testDir.toString + "/*/"
    +
    +      // Set up the streaming context and input streams
    +      withStreamingContext(new StreamingContext(conf, batchDuration)) { 
ssc =>
    +        val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        clock.setTime(existingFile.lastModified + 
batchDuration.milliseconds)
    +        val batchCounter = new BatchCounter(ssc)
    +        // monitor "testDir/*/"
    +        val fileStream = ssc.fileStream[LongWritable, Text, 
TextInputFormat](
    +          pathWithWildCard).map(_._2.toString)
    +        val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
    +        val outputStream = new TestOutputStream(fileStream, outputQueue)
    +        outputStream.register()
    +        ssc.start()
    +
    +        // Advance the clock so that the files are created after 
StreamingContext starts, but
    +        // not enough to trigger a batch
    +        clock.advance(batchDuration.milliseconds / 2)
    +
    +        def createFileAndAdvenceTime(data: Int, dir: File): Unit = {
    +          val file = new File(testSubDir1, data.toString)
    +          Files.write(data + "\n", file, StandardCharsets.UTF_8)
    +          assert(file.setLastModified(clock.getTimeMillis()))
    +          assert(file.lastModified === clock.getTimeMillis())
    +          logInfo("Created file " + file)
    +          // Advance the clock after creating the file to avoid a race when
    +          // setting its modification time
    +          clock.advance(batchDuration.milliseconds)
    +          eventually(eventuallyTimeout) {
    +            assert(batchCounter.getNumCompletedBatches === data)
    +          }
    +        }
    +        // Over time, create files in the temp directory 1
    +        val input1 = Seq(1, 2, 3, 4, 5)
    +        input1.foreach(i => createFileAndAdvenceTime(i, testSubDir1 ))
    --- End diff --
    
    Nit: remove space inside final parent on this and a line below


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to