This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 25431d7  [SPARK-29953][SS] Don't clean up source files for 
FileStreamSource if the files belong to the output of FileStreamSink
25431d7 is described below

commit 25431d79f7daf2a68298701154eb505c2a4add80
Author: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
AuthorDate: Thu Dec 5 21:46:28 2019 -0800

    [SPARK-29953][SS] Don't clean up source files for FileStreamSource if the 
files belong to the output of FileStreamSink
    
    ### What changes were proposed in this pull request?
    
    This patch prevents the cleanup operation in FileStreamSource if the source 
files belong to the output of FileStreamSink. This is needed because the output 
of FileStreamSink can be read by multiple Spark queries, and those queries read 
the files based on the metadata log, which won't reflect the cleanup.
    
    To simplify the logic, the patch only handles the case where the source 
path, without a glob pattern, refers to the output directory of 
FileStreamSink. It detects this case by checking whether FileStreamSource uses 
the metadata directory to list the source files.
    
    ### Why are the changes needed?
    
    Without this patch, if end users turn on the cleanup option with a path 
that is the output of FileStreamSink, the metadata and the available files may 
get out of sync, which may break other queries reading the path.
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added UT.
    
    Closes #26590 from HeartSaVioR/SPARK-29953.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
---
 docs/structured-streaming-programming-guide.md     |  2 +-
 .../sql/execution/streaming/FileStreamSource.scala | 17 ++++-
 .../sql/streaming/FileStreamSourceSuite.scala      | 83 +++++++++++++++++-----
 3 files changed, 81 insertions(+), 21 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 01679e5..b91b930 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -551,7 +551,7 @@ Here are the details of all the sources in Spark.
         When "archive" is provided, additional option 
<code>sourceArchiveDir</code> must be provided as well. The value of 
"sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater 
than 2). e.g. <code>/archived/here</code>. This will ensure archived files are 
never included as new source files.<br/>
         Spark will move source files respecting their own path. For example, 
if the path of source file is <code>/a/b/dataset.txt</code> and the path of 
archive directory is <code>/archived/here</code>, file will be moved to 
<code>/archived/here/a/b/dataset.txt</code>.<br/>
         NOTE: Both archiving (via moving) or deleting completed files will 
introduce overhead (slow down) in each micro-batch, so you need to understand 
the cost for each operation in your file system before enabling this option. On 
the other hand, enabling this option will reduce the cost to list source files 
which can be an expensive operation.<br/>
-        NOTE 2: The source path should not be used from multiple sources or 
queries when enabling this option.<br/>
+        NOTE 2: The source path should not be used from multiple sources or 
queries when enabling this option. Similarly, you must ensure the source path 
doesn't match to any files in output directory of file stream sink.<br/>
         NOTE 3: Both delete and move actions are best effort. Failing to 
delete or move files will not fail the streaming query.
         <br/><br/>
         For file-format-specific options, see the related methods in 
<code>DataStreamReader</code>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 35d486c..f31fb32 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -206,6 +206,17 @@ class FileStreamSource(
       CaseInsensitiveMap(options), None).allFiles()
   }
 
+  private def setSourceHasMetadata(newValue: Option[Boolean]): Unit = newValue 
match {
+    case Some(true) =>
+      if (sourceCleaner.isDefined) {
+        throw new UnsupportedOperationException("Clean up source files is not 
supported when" +
+          " reading from the output directory of FileStreamSink.")
+      }
+      sourceHasMetadata = Some(true)
+    case _ =>
+      sourceHasMetadata = newValue
+  }
+
   /**
    * Returns a list of files found, sorted by their timestamp.
    */
@@ -216,7 +227,7 @@ class FileStreamSource(
     sourceHasMetadata match {
       case None =>
         if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, 
sparkSession.sessionState.conf)) {
-          sourceHasMetadata = Some(true)
+          setSourceHasMetadata(Some(true))
           allFiles = allFilesUsingMetadataLogFileIndex()
         } else {
           allFiles = allFilesUsingInMemoryFileIndex()
@@ -228,10 +239,10 @@ class FileStreamSource(
             // metadata log and data files are only generated after the 
previous
             // `FileStreamSink.hasMetadata` check
             if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, 
sparkSession.sessionState.conf)) {
-              sourceHasMetadata = Some(true)
+              setSourceHasMetadata(Some(true))
               allFiles = allFilesUsingMetadataLogFileIndex()
             } else {
-              sourceHasMetadata = Some(false)
+              setSourceHasMetadata(Some(false))
               // `allFiles` have already been fetched using InMemoryFileIndex 
in this round
             }
           }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 1ef0ae8..b8dac13 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.util.Progressable
 import org.scalatest.PrivateMethodTester
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
@@ -149,6 +150,20 @@ abstract class FileStreamSourceTest
     }
   }
 
+  case class AddFilesToFileStreamSinkLog(
+      fs: FileSystem,
+      srcDir: Path,
+      sinkLog: FileStreamSinkLog,
+      batchId: Int)(
+      pathFilter: Path => Boolean) extends ExternalAction {
+    override def runAction(): Unit = {
+      val statuses = fs.listStatus(srcDir, new PathFilter {
+        override def accept(path: Path): Boolean = pathFilter(path)
+      })
+      sinkLog.add(batchId, statuses.map(SinkFileStatus(_)))
+    }
+  }
+
   /** Use `format` and `path` to create FileStreamSource via DataFrameReader */
   def createFileStream(
       format: String,
@@ -1617,14 +1632,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest 
{
   }
 
   test("remove completed files when remove option is enabled") {
-    def assertFileIsRemoved(files: Array[String], fileName: String): Unit = {
-      assert(!files.exists(_.startsWith(fileName)))
-    }
-
-    def assertFileIsNotRemoved(files: Array[String], fileName: String): Unit = 
{
-      assert(files.exists(_.startsWith(fileName)))
-    }
-
     withTempDirs { case (src, tmp) =>
       withSQLConf(
         SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
@@ -1642,28 +1649,24 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
           CheckAnswer("keep1"),
           AssertOnQuery("input file removed") { _: StreamExecution =>
             // it doesn't rename any file yet
-            assertFileIsNotRemoved(src.list(), "keep1")
+            assertFileIsNotRemoved(src, "keep1")
             true
           },
           AddTextFileData("keep2", src, tmp, tmpFilePrefix = "ke ep2 %"),
           CheckAnswer("keep1", "keep2"),
           AssertOnQuery("input file removed") { _: StreamExecution =>
-            val files = src.list()
-
             // it renames input file for first batch, but not for second batch 
yet
-            assertFileIsRemoved(files, "keep1")
-            assertFileIsNotRemoved(files, "ke ep2 %")
+            assertFileIsRemoved(src, "keep1")
+            assertFileIsNotRemoved(src, "ke ep2 %")
 
             true
           },
           AddTextFileData("keep3", src, tmp, tmpFilePrefix = "keep3"),
           CheckAnswer("keep1", "keep2", "keep3"),
           AssertOnQuery("input file renamed") { _: StreamExecution =>
-            val files = src.list()
-
             // it renames input file for second batch, but not third batch yet
-            assertFileIsRemoved(files, "ke ep2 %")
-            assertFileIsNotRemoved(files, "keep3")
+            assertFileIsRemoved(src, "ke ep2 %")
+            assertFileIsNotRemoved(src, "keep3")
 
             true
           }
@@ -1739,6 +1742,44 @@ class FileStreamSourceSuite extends FileStreamSourceTest 
{
     }
   }
 
+  Seq("delete", "archive").foreach { cleanOption =>
+    test(s"Throw UnsupportedOperationException on configuring $cleanOption 
when source path" +
+      " refers the output dir of FileStreamSink") {
+      withThreeTempDirs { case (src, tmp, archiveDir) =>
+        withSQLConf(
+          SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+          // Force deleting the old logs
+          SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+        ) {
+          val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> 
"1",
+            "cleanSource" -> cleanOption, "sourceArchiveDir" -> 
archiveDir.getAbsolutePath)
+
+          val fileStream = createFileStream("text", src.getCanonicalPath, 
options = option)
+          val filtered = fileStream.filter($"value" contains "keep")
+
+          // create FileStreamSinkLog under source directory
+          val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
+            new File(src, FileStreamSink.metadataDir).getCanonicalPath)
+          val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
+          val srcPath = new Path(src.getCanonicalPath)
+          val fileSystem = srcPath.getFileSystem(hadoopConf)
+
+          // Here we will just check whether the source file is removed or 
not, as we cover
+          // functionality test of "archive" in other UT.
+          testStream(filtered)(
+            AddTextFileData("keep1", src, tmp, tmpFilePrefix = "keep1"),
+            AddFilesToFileStreamSinkLog(fileSystem, srcPath, sinkLog, 0) { 
path =>
+              path.getName.startsWith("keep1")
+            },
+            ExpectFailure[UnsupportedOperationException](
+              t => assert(t.getMessage.startsWith("Clean up source files is 
not supported")),
+              isFatalError = false)
+          )
+        }
+      }
+    }
+  }
+
   class FakeFileSystem(scheme: String) extends FileSystem {
     override def exists(f: Path): Boolean = true
 
@@ -1797,6 +1838,14 @@ class FileStreamSourceSuite extends FileStreamSourceTest 
{
     }
   }
 
+  private def assertFileIsRemoved(sourceDir: File, fileName: String): Unit = {
+    assert(!sourceDir.list().exists(_.startsWith(fileName)))
+  }
+
+  private def assertFileIsNotRemoved(sourceDir: File, fileName: String): Unit 
= {
+    assert(sourceDir.list().exists(_.startsWith(fileName)))
+  }
+
   private def assertFileIsNotMoved(sourceDir: File, expectedDir: File, 
filePrefix: String): Unit = {
     assert(sourceDir.exists())
     assert(sourceDir.list().exists(_.startsWith(filePrefix)))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to