HeartSaVioR commented on a change in pull request #27620: URL: https://github.com/apache/spark/pull/27620#discussion_r411779675
########## File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ########## @@ -1935,6 +1928,120 @@ class FileStreamSourceSuite extends FileStreamSourceTest { assert(expectedDir.exists()) assert(expectedDir.list().exists(_.startsWith(filePrefix))) } + + private def withCountListingLocalFileSystemAsLocalFileSystem(body: => Unit): Unit = { + val optionKey = s"fs.${CountListingLocalFileSystem.scheme}.impl" + val originClassForLocalFileSystem = spark.conf.getOption(optionKey) + try { + spark.conf.set(optionKey, classOf[CountListingLocalFileSystem].getName) + body + } finally { + originClassForLocalFileSystem match { + case Some(fsClazz) => spark.conf.set(optionKey, fsClazz) + case _ => spark.conf.unset(optionKey) + } + } + } + + test("Caches and leverages unread files") { + withCountListingLocalFileSystemAsLocalFileSystem { + withThreeTempDirs { case (src, meta, tmp) => + val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "5") + val scheme = CountListingLocalFileSystem.scheme + val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text", + StructType(Nil), Seq.empty, meta.getCanonicalPath, options) + val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog")) + val metadataLog = source invokePrivate _metadataLog() + + def verifyBatch( + offset: FileStreamSourceOffset, + expectedBatchId: Long, + inputFiles: Seq[File], + expectedListingCount: Int): Unit = { + val batchId = offset.logOffset + assert(batchId === expectedBatchId) + + val files = metadataLog.get(batchId).getOrElse(Array.empty[FileEntry]) + assert(files.forall(_.batchId == batchId)) + + val actualInputFiles = files.map { p => new Path(p.path).toUri.getPath } + val expectedInputFiles = inputFiles.slice(batchId.toInt * 5, batchId.toInt * 5 + 5) + .map(_.getCanonicalPath) + assert(actualInputFiles === expectedInputFiles) + + assert(expectedListingCount === CountListingLocalFileSystem.pathToNumListStatusCalled 
+ .get(src.getCanonicalPath).map(_.get()).getOrElse(0)) + } + + // provide 20 files in src, with sequential "last modified" to guarantee ordering + var lastModified = 0 + val inputFiles = (0 to 19).map { idx => + val f = createFile(idx.toString, new File(src, idx.toString), tmp) + f.setLastModified(lastModified) + lastModified += 10000 + f + } + + // 4 batches will be available for 20 input files + (0 to 3).foreach { batchId => + val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5)) + .asInstanceOf[FileStreamSourceOffset] + verifyBatch(offsetBatch, expectedBatchId = batchId, inputFiles, expectedListingCount = 1) + } + + val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5)) + .asInstanceOf[FileStreamSourceOffset] + // latestOffset returns the offset for previous batch which means no new batch is presented + assert(3 === offsetBatch.logOffset) + // listing should be performed after the list of unread files are exhausted + assert(2 === CountListingLocalFileSystem.pathToNumListStatusCalled + .get(src.getCanonicalPath).map(_.get()).getOrElse(0)) + } + } + } + + test("Don't cache unread files when latestFirst is true") { + withCountListingLocalFileSystemAsLocalFileSystem { + withThreeTempDirs { case (src, meta, tmp) => + val options = Map("latestFirst" -> "true", "maxFilesPerTrigger" -> "5") + val scheme = CountListingLocalFileSystem.scheme + val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text", + StructType(Nil), Seq.empty, meta.getCanonicalPath, options) + + // provide 20 files in src, with sequential "last modified" to guarantee ordering + var lastModified = 0 + (0 to 19).map { idx => + val f = createFile(idx.toString, new File(src, idx.toString), tmp) + f.setLastModified(lastModified) + lastModified += 10000 + f + } + + source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5)) + .asInstanceOf[FileStreamSourceOffset] + assert(1 === 
CountListingLocalFileSystem.pathToNumListStatusCalled Review comment: I'm not sure we want to verify the whole behavior of the file stream source in this PR. This test only makes sure that the calls listing the input directory (and input files as well) happen as expected; other checks are redundant and error-prone. E.g., suppose the file stream source changes something on its read side for an unrelated reason; then this test would fail unintentionally. EDIT: the same might be true for input files as well, but that may be one of the important things we want to watch. (And we check it in the other test I've added.) Other paths are relatively less important. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org