Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/12752#discussion_r62682896 --- Diff: streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala --- @@ -198,6 +198,68 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { testFileStream(newFilesOnly = false) } + test("file input stream - wildcard") { + var testDir: File = null + try { + val batchDuration = Seconds(2) + val testDir = Utils.createTempDir() + val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1") + val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2") + + // Create a file that exists before the StreamingContext is created: + val existingFile = new File(testDir, "0") + Files.write("0\n", existingFile, StandardCharsets.UTF_8) + assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000) + + val pathWithWildCard = testDir.toString + "/*/" + + // Set up the streaming context and input streams + withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + clock.setTime(existingFile.lastModified + batchDuration.milliseconds) + val batchCounter = new BatchCounter(ssc) + // monitor "testDir/*/" + val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat]( + pathWithWildCard).map(_._2.toString) + val outputQueue = new ConcurrentLinkedQueue[Seq[String]] + val outputStream = new TestOutputStream(fileStream, outputQueue) + outputStream.register() + ssc.start() + + // Advance the clock so that the files are created after StreamingContext starts, but + // not enough to trigger a batch + clock.advance(batchDuration.milliseconds / 2) + + def createFileAndAdvenceTime(data: Int, dir: File): Unit = { + val file = new File(testSubDir1, data.toString) + Files.write(data + "\n", file, StandardCharsets.UTF_8) + assert(file.setLastModified(clock.getTimeMillis())) + assert(file.lastModified === clock.getTimeMillis()) + logInfo("Created file " + file) + // Advance the clock after creating the file to avoid a race when + // setting its modification time + clock.advance(batchDuration.milliseconds) + eventually(eventuallyTimeout) { + assert(batchCounter.getNumCompletedBatches === data) + } + } + // Over time, create files in the temp directory 1 + val input1 = Seq(1, 2, 3, 4, 5) + input1.foreach(i => createFileAndAdvenceTime(i, testSubDir1 )) --- End diff -- Nit: remove space inside final parent on this and a line below
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org