Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/12616#discussion_r62739836 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala --- @@ -444,6 +445,79 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } } + test("read new files in nested directories with globbing") { + withTempDirs { case (dir, tmp) => + + // src/*/* should consider all the files and directories that matches that glob. + // So any files that matches the glob as well as any files in directories that matches + // this glob should be read. + val fileStream = createFileStream("text", s"${dir.getCanonicalPath}/*/*") + val filtered = fileStream.filter($"value" contains "keep") + val subDir = new File(dir, "subdir") + val subSubDir = new File(subDir, "subsubdir") + val subSubSubDir = new File(subSubDir, "subsubsubdir") + + require(!subDir.exists()) + require(!subSubDir.exists()) + + testStream(filtered)( + // Create new dir/subdir and write to it, should read + AddTextFileData("drop1\nkeep2", subDir, tmp), + CheckAnswer("keep2"), + + // Add files to dir/subdir, should read + AddTextFileData("keep3", subDir, tmp), + CheckAnswer("keep2", "keep3"), + + // Create new dir/subdir/subsubdir and write to it, should read + AddTextFileData("keep4", subSubDir, tmp), + CheckAnswer("keep2", "keep3", "keep4"), + + // Add files to dir/subdir/subsubdir, should read + AddTextFileData("keep5", subSubDir, tmp), + CheckAnswer("keep2", "keep3", "keep4", "keep5"), + + // 1. Add file to src dir, should not read as globbing src/*/* does not capture files in + // dir, only captures files in dir/subdir/ + // 2. Add files to dir/subDir/subsubdir/subsubsubdir, should not read as src/*/* should + // not capture those files + AddTextFileData("keep6", dir, tmp), + AddTextFileData("keep7", subSubSubDir, tmp), + AddTextFileData("keep8", subDir, tmp), // needed to make query detect new data + CheckAnswer("keep2", "keep3", "keep4", "keep5", "keep8") + ) + } + } + + test("read new files in partitioned table with globbing, should not read partition data") { + withTempDirs { case (dir, tmp) => + val partitionFooSubDir = new File(dir, "partition=foo") + val partitionBarSubDir = new File(dir, "partition=bar") + + val schema = new StructType().add("value", StringType).add("partition", StringType) + val fileStream = createFileStream("json", s"${dir.getCanonicalPath}/*/*", Some(schema)) + val filtered = fileStream.filter($"value" contains "keep") + val nullStr = null.asInstanceOf[String] + testStream(filtered)( + // Create new partition=foo sub dir and write to it, should read only value, not partition + AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", partitionFooSubDir, tmp), + CheckAnswer(("keep2", nullStr)), --- End diff -- why do we have a `nullStr` at here?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org