[ 
https://issues.apache.org/jira/browse/SPARK-20568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717095#comment-16717095
 ] 

ASF GitHub Bot commented on SPARK-20568:
----------------------------------------

gaborgsomogyi commented on a change in pull request #22952: [SPARK-20568][SS] 
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r240573844
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ##########
 @@ -1494,6 +1512,287 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       newSource.getBatch(None, FileStreamSourceOffset(1))
     }
   }
+
+  test("remove completed files when remove option is enabled") {
+    def assertFileIsRemoved(files: Array[String], fileName: String): Unit = {
+      assert(!files.exists(_.startsWith(fileName)))
+    }
+
+    def assertFileIsNotRemoved(files: Array[String], fileName: String): Unit = {
+      assert(files.exists(_.startsWith(fileName)))
+    }
+
+    withTempDirs { case (src, tmp) =>
+      withSQLConf(
+        SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+        // Force deleting the old logs
+        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+      ) {
+        val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
+          "cleanSource" -> "delete")
+
+        val fileStream = createFileStream("text", src.getCanonicalPath, options = option)
+        val filtered = fileStream.filter($"value" contains "keep")
+
+        testStream(filtered)(
+          AddTextFileData("keep1", src, tmp, tmpFilePrefix = "keep1"),
+          CheckAnswer("keep1"),
+          AssertOnQuery("input file removed") { _: StreamExecution =>
+            // it doesn't remove any file yet
+            assertFileIsNotRemoved(src.list(), "keep1")
+            true
+          },
+          AddTextFileData("keep2", src, tmp, tmpFilePrefix = "ke ep2 %"),
+          CheckAnswer("keep1", "keep2"),
+          AssertOnQuery("input file removed") { _: StreamExecution =>
+            val files = src.list()
+
+            // it removes the input file for the first batch, but not for the second batch yet
+            assertFileIsRemoved(files, "keep1")
+            assertFileIsNotRemoved(files, "ke ep2 %")
+
+            true
+          },
+          AddTextFileData("keep3", src, tmp, tmpFilePrefix = "keep3"),
+          CheckAnswer("keep1", "keep2", "keep3"),
+          AssertOnQuery("input file renamed") { _: StreamExecution =>
+            val files = src.list()
+
+            // it removes the input file for the second batch, but not the third batch yet
+            assertFileIsRemoved(files, "ke ep2 %")
+            assertFileIsNotRemoved(files, "keep3")
+
+            true
+          }
+        )
+      }
+    }
+  }
+
+  test("move completed files to archive directory when archive option is 
enabled") {
+
+    withThreeTempDirs { case (src, tmp, archiveDir) =>
+      withSQLConf(
+        SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+        // Force deleting the old logs
+        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+      ) {
+        val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
+          "cleanSource" -> "archive", "sourceArchiveDir" -> 
archiveDir.getAbsolutePath)
+
+        val fileStream = createFileStream("text", s"${src.getCanonicalPath}/*/*",
+          options = option)
+        val filtered = fileStream.filter($"value" contains "keep")
+
+        // src/k %1
+        // file: src/k %1/keep1
+        val dirForKeep1 = new File(src, "k %1")
+        // src/k %1/k 2
+        // file: src/k %1/k 2/keep2
+        val dirForKeep2 = new File(dirForKeep1, "k 2")
+        // src/k3
+        // file: src/k3/keep3
+        val dirForKeep3 = new File(src, "k3")
+
+        val expectedMovedDir1 = new File(archiveDir.getAbsolutePath + dirForKeep1.toURI.getPath)
+        val expectedMovedDir2 = new File(archiveDir.getAbsolutePath + dirForKeep2.toURI.getPath)
+        val expectedMovedDir3 = new File(archiveDir.getAbsolutePath + dirForKeep3.toURI.getPath)
+
+        testStream(filtered)(
+          AddTextFileData("keep1", dirForKeep1, tmp, tmpFilePrefix = "keep1"),
+          CheckAnswer("keep1"),
+          AssertOnQuery("input file archived") { _: StreamExecution =>
+            // it doesn't move any file yet
+            assertFileIsNotMoved(dirForKeep1, expectedMovedDir1, "keep1")
+            true
+          },
+          AddTextFileData("keep2", dirForKeep2, tmp, tmpFilePrefix = "keep2 
%"),
+          CheckAnswer("keep1", "keep2"),
+          AssertOnQuery("input file archived") { _: StreamExecution =>
+            // it moves the input file for the first batch, but not for the second batch yet
+            assertFileIsMoved(dirForKeep1, expectedMovedDir1, "keep1")
+            assertFileIsNotMoved(dirForKeep2, expectedMovedDir2, "keep2 %")
+            true
+          },
+          AddTextFileData("keep3", dirForKeep3, tmp, tmpFilePrefix = "keep3"),
+          CheckAnswer("keep1", "keep2", "keep3"),
+          AssertOnQuery("input file archived") { _: StreamExecution =>
+            // it moves the input file for the second batch, but not the third batch yet
+            assertFileIsMoved(dirForKeep2, expectedMovedDir2, "keep2 %")
+            assertFileIsNotMoved(dirForKeep3, expectedMovedDir3, "keep3")
+
+            true
+          },
+          AddTextFileData("keep4", dirForKeep3, tmp, tmpFilePrefix = "keep4"),
+          CheckAnswer("keep1", "keep2", "keep3", "keep4"),
+          AssertOnQuery("input file archived") { _: StreamExecution =>
+            // it moves the input file for the third batch, but not the fourth batch yet
+            assertFileIsMoved(dirForKeep3, expectedMovedDir3, "keep3")
+            assertFileIsNotMoved(dirForKeep3, expectedMovedDir3, "keep4")
+
+            true
+          }
+        )
+      }
+    }
+  }
+
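+  // A test-only FileSystem stub that records every exists/mkdirs/rename request,
+  // so the cleaner test below can verify which paths would be touched without
+  // performing any real I/O.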
+  class FakeFileSystem extends FileSystem {
+    val requestsExists = new mutable.MutableList[Path]()
+    val requestsMkdirs = new mutable.MutableList[Path]()
+    val requestsRename = new mutable.MutableList[(Path, Path)]()
+
+    override def exists(f: Path): Boolean = {
+      requestsExists += f
+      true
+    }
+
+    override def mkdirs(f: Path, permission: FsPermission): Boolean = {
+      requestsMkdirs += f
+      true
+    }
+
+    override def rename(src: Path, dst: Path): Boolean = {
+      requestsRename += ((src, dst))
+      true
+    }
+
+    def clearRecords(): Unit = {
+      requestsExists.clear()
+      requestsMkdirs.clear()
+      requestsRename.clear()
+    }
+
+    override def getUri: URI = throw new NotImplementedError
+
+    override def open(f: Path, bufferSize: Int): FSDataInputStream = throw new NotImplementedError
+
+    override def create(
+        f: Path,
+        permission: FsPermission,
+        overwrite: Boolean,
+        bufferSize: Int,
+        replication: Short,
+        blockSize: Long,
+        progress: Progressable): FSDataOutputStream = throw new NotImplementedError
+
+    override def append(f: Path, bufferSize: Int, progress: Progressable): FSDataOutputStream =
+      throw new NotImplementedError
+
+    override def delete(f: Path, recursive: Boolean): Boolean = throw new NotImplementedError
+
+    override def listStatus(f: Path): Array[FileStatus] = throw new NotImplementedError
+
+    override def setWorkingDirectory(new_dir: Path): Unit = throw new NotImplementedError
+
+    override def getWorkingDirectory: Path = throw new NotImplementedError
+
+    override def getFileStatus(f: Path): FileStatus = throw new NotImplementedError
+  }
+
+  test("FileStreamSourceCleaner - archive - destinations match against source 
pattern") {
+
+    def assertNoMove(fs: FakeFileSystem, sourcePath: Path, expectedArchivePath: Path): Unit = {
+      assert(fs.requestsExists.isEmpty)
+      assert(fs.requestsMkdirs.isEmpty)
+      assert(fs.requestsRename.isEmpty)
+    }
+
+    def assertMoveFile(fs: FakeFileSystem, sourcePath: Path, expectedArchivePath: Path): Unit = {
+      assert(fs.requestsExists.nonEmpty)
+      assert(fs.requestsExists.head === expectedArchivePath.getParent)
+      assert(fs.requestsMkdirs.isEmpty)
+      assert(fs.requestsRename.nonEmpty)
+      assert(fs.requestsRename.head === ((sourcePath, expectedArchivePath)))
+    }
+
+    // source pattern: /hello*/h{e,f}ll?
 
 Review comment:
   Nit: I think it's redundant since the values are a couple of lines below.


> Delete files after processing in structured streaming
> -----------------------------------------------------
>
>                 Key: SPARK-20568
>                 URL: https://issues.apache.org/jira/browse/SPARK-20568
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>    Affects Versions: 2.1.0, 2.2.1
>            Reporter: Saul Shanabrook
>            Priority: Major
>
> It would be great to be able to delete files after processing them with 
> structured streaming.
> For example, I am reading in a bunch of JSON files and converting them into 
> Parquet. If the JSON files are not deleted after they are processed, it 
> quickly fills up my hard drive. I originally [posted this on Stack 
> Overflow|http://stackoverflow.com/q/43671757/907060] and was recommended to 
> make a feature request for it. 
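
For illustration, a minimal sketch (not verbatim from the PR) of how the cleanup
options exercised in the test above might be used: "cleanSource" and
"sourceArchiveDir" are the option names from pull request #22952, while the
paths, the schema value, and the surrounding stream setup here are hypothetical.

    // Stream JSON files in, removing (or archiving) each input file after its
    // batch has been processed.
    val df = spark.readStream
      .format("json")
      .schema(jsonSchema)                            // hypothetical schema
      .option("cleanSource", "delete")               // or "archive"
      .option("sourceArchiveDir", "/data/archived")  // required only for "archive"
      .load("/data/incoming")

    // Convert the records to Parquet, as in the reporter's use case.
    val query = df.writeStream
      .format("parquet")
      .option("path", "/data/parquet")
      .option("checkpointLocation", "/data/checkpoints")
      .start()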


