Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12616#discussion_r62739836
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
    @@ -444,6 +445,79 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest with SharedSQLContext {
         }
       }
     
    +  test("read new files in nested directories with globbing") {
    +    withTempDirs { case (dir, tmp) =>
    +
    +      // src/*/* should consider all the files and directories that 
matches that glob.
    +      // So any files that matches the glob as well as any files in 
directories that matches
    +      // this glob should be read.
    +      val fileStream = createFileStream("text", 
s"${dir.getCanonicalPath}/*/*")
    +      val filtered = fileStream.filter($"value" contains "keep")
    +      val subDir = new File(dir, "subdir")
    +      val subSubDir = new File(subDir, "subsubdir")
    +      val subSubSubDir = new File(subSubDir, "subsubsubdir")
    +
    +      require(!subDir.exists())
    +      require(!subSubDir.exists())
    +
    +      testStream(filtered)(
    +        // Create new dir/subdir and write to it, should read
    +        AddTextFileData("drop1\nkeep2", subDir, tmp),
    +        CheckAnswer("keep2"),
    +
    +        // Add files to dir/subdir, should read
    +        AddTextFileData("keep3", subDir, tmp),
    +        CheckAnswer("keep2", "keep3"),
    +
    +        // Create new dir/subdir/subsubdir and write to it, should read
    +        AddTextFileData("keep4", subSubDir, tmp),
    +        CheckAnswer("keep2", "keep3", "keep4"),
    +
    +        // Add files to dir/subdir/subsubdir, should read
    +        AddTextFileData("keep5", subSubDir, tmp),
    +        CheckAnswer("keep2", "keep3", "keep4", "keep5"),
    +
    +        // 1. Add file to src dir, should not read as globbing src/*/* 
does not capture files in
    +        //    dir, only captures files in dir/subdir/
    +        // 2. Add files to dir/subDir/subsubdir/subsubsubdir, should not 
read as src/*/* should
    +        //    not capture those files
    +        AddTextFileData("keep6", dir, tmp),
    +        AddTextFileData("keep7", subSubSubDir, tmp),
    +        AddTextFileData("keep8", subDir, tmp), // needed to make query 
detect new data
    +        CheckAnswer("keep2", "keep3", "keep4", "keep5", "keep8")
    +      )
    +    }
    +  }
    +
    +  test("read new files in partitioned table with globbing, should not read 
partition data") {
    +    withTempDirs { case (dir, tmp) =>
    +      val partitionFooSubDir = new File(dir, "partition=foo")
    +      val partitionBarSubDir = new File(dir, "partition=bar")
    +
    +      val schema = new StructType().add("value", 
StringType).add("partition", StringType)
    +      val fileStream = createFileStream("json", 
s"${dir.getCanonicalPath}/*/*", Some(schema))
    +      val filtered = fileStream.filter($"value" contains "keep")
    +      val nullStr = null.asInstanceOf[String]
    +      testStream(filtered)(
    +        // Create new partition=foo sub dir and write to it, should read 
only value, not partition
    +        AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", 
partitionFooSubDir, tmp),
    +        CheckAnswer(("keep2", nullStr)),
    --- End diff --
    
    Why do we have a `nullStr` here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to