Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14803#discussion_r85860069
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
    @@ -608,6 +614,81 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
     
       // =============== other tests ================
     
    +  test("read new files in partitioned table without globbing, should read 
partition data") {
    +    withTempDirs { case (dir, tmp) =>
    +      val partitionFooSubDir = new File(dir, "partition=foo")
    +      val partitionBarSubDir = new File(dir, "partition=bar")
    +
    +      val schema = new StructType().add("value", 
StringType).add("partition", StringType)
    +      val fileStream = createFileStream("json", 
s"${dir.getCanonicalPath}", Some(schema))
    +      val filtered = fileStream.filter($"value" contains "keep")
    +      testStream(filtered)(
    +        // Create new partition=foo sub dir and write to it
    +        AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", 
partitionFooSubDir, tmp),
    +        CheckAnswer(("keep2", "foo")),
    +
    +        // Append to same partition=foo sub dir
    +        AddTextFileData("{'value': 'keep3'}", partitionFooSubDir, tmp),
    +        CheckAnswer(("keep2", "foo"), ("keep3", "foo")),
    +
    +        // Create new partition sub dir and write to it
    +        AddTextFileData("{'value': 'keep4'}", partitionBarSubDir, tmp),
    +        CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")),
    +
    +        // Append to same partition=bar sub dir
    +        AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp),
    +        CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), 
("keep5", "bar"))
    +      )
    +    }
    +  }
    +
    +  test("when schema inference is turned on, should read partition data") {
    +    def createFile(content: String, src: File, tmp: File): Unit = {
    +      val tempFile = Utils.tempFileWith(new File(tmp, "text"))
    +      val finalFile = new File(src, tempFile.getName)
    +      src.mkdirs()
    +      require(stringToFile(tempFile, content).renameTo(finalFile))
    +    }
    +
    +    withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
    +      withTempDirs { case (dir, tmp) =>
    +        val partitionFooSubDir = new File(dir, "partition=foo")
    +        val partitionBarSubDir = new File(dir, "partition=bar")
    +
    +        // Create file in partition, so we can infer the schema.
    +        createFile("{'value': 'drop0'}", partitionFooSubDir, tmp)
    +
    +        val fileStream = createFileStream("json", 
s"${dir.getCanonicalPath}")
    +        val filtered = fileStream.filter($"value" contains "keep")
    +        testStream(filtered)(
    +          // Append to same partition=foo sub dir
    +          AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", 
partitionFooSubDir, tmp),
    +          CheckAnswer(("keep2", "foo")),
    +
    +          // Append to same partition=foo sub dir
    +          AddTextFileData("{'value': 'keep3'}", partitionFooSubDir, tmp),
    +          CheckAnswer(("keep2", "foo"), ("keep3", "foo")),
    +
    +          // Create new partition sub dir and write to it
    +          AddTextFileData("{'value': 'keep4'}", partitionBarSubDir, tmp),
    +          CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", 
"bar")),
    +
    +          // Append to same partition=bar sub dir
    +          AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp),
    +          CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", 
"bar"), ("keep5", "bar")),
    +
    +          // Delete the two partition dirs
    +          DeleteFile(partitionFooSubDir),
    --- End diff --
    
    @zsxwing I remember it was used to simulate the case where a partition is 
deleted and its data is re-inserted. Thanks for fixing this!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to